文章参考了以下内容
1. 蓝牙bluez5的开发方法及入门教程_bluez蓝牙配网demo-CSDN博客(文章浏览阅读1w次,点赞15次,收藏99次)
   链接:https://blog.csdn.net/huohongpeng/article/details/115786953
   摘要:这篇文章的主要目的是告诉大家应该如何使用bluez进行开发。由于bluez的文档实在太少,入门门槛实在太高,很多人无从下手,作者准备写一个专题记录自己学习bluez的过程,分享bluez的学习方法,让大家少走一些弯路。作者用的平台是君正x2000,蓝牙库使用的是bluez5.54,bluez库是通过buildroot编译出来的。通过BLUEZ,作者主要开发了一个BLE的串口服务端demo。这个程序已经上传到了 https://download.csdn.net/download/huoho (原摘要在此处被截断)
2. D-Bus学习(二):基本概念_freedesktop 学习-CSDN博客(文章浏览阅读1.3w次)
   链接:https://blog.csdn.net/flowingflying/article/details/5411124
   摘要:D-Bus的方式在移动手机操作系统中非常重要,包括Maemo、Moblin等以Linux为基础的操作系统,估计Android也大量使用。D-Bus的相关学习资料见:http://www.freedesktop.org/wiki/Software/dbus ,在网上也有大量的学习资料,在 http://blog.chinaunix.net/u3/111961/ (原摘要在此处被截断)
BLUEZ源码路径
GitHub - bluez/bluez: Main BlueZ tree
我需要搭建一个对端读写的一对GATT Service,所以参照bluez-master\test\example-gatt-client和bluez-master\test\example-gatt-server,python源码内容如下:
client
#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1-or-later
#
# Heart Rate GATT client: module imports, D-Bus names, UUIDs and the
# module-level state shared by the callbacks below.

import dbus
try:
    from gi.repository import GObject
except ImportError:
    # Fall back to the legacy static binding when PyGObject is unavailable.
    import gobject as GObject
import sys

from dbus.mainloop.glib import DBusGMainLoop

# Both are assigned in main(); callbacks reach them as module globals.
bus = None
mainloop = None

# Well-known bus name and D-Bus interface names used by this client.
BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'

# UUIDs of the Heart Rate service and the characteristics this client
# looks for.
HR_SVC_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'

# The objects that we interact with.
hr_service = None
hr_msrmt_chrc = None
body_snsr_loc_chrc = None
hr_ctrl_pt_chrc = None

# Body Sensor Location codes understood by body_sensor_val_to_str().
_BODY_SENSOR_LOCATIONS = {
    0: 'Other',
    1: 'Chest',
    2: 'Wrist',
    3: 'Finger',
    4: 'Hand',
    5: 'Ear Lobe',
    6: 'Foot',
}

# Sensor Contact status codes understood by sensor_contact_val_to_str().
_SENSOR_CONTACT_STATUS = {
    0: 'not supported',
    1: 'not supported',
    2: 'no contact detected',
    3: 'contact detected',
}


def generic_error_cb(error):
    """Report a failed asynchronous D-Bus call and stop the main loop."""
    print('D-Bus call failed: ' + str(error))
    mainloop.quit()


def body_sensor_val_to_str(val):
    """Return the display name for a Body Sensor Location code.

    Codes 0-6 map to a location name; anything else yields
    'Reserved value'.
    """
    return _BODY_SENSOR_LOCATIONS.get(val, 'Reserved value')


def sensor_contact_val_to_str(val):
    """Return the display text for a 2-bit Sensor Contact status code.

    Values outside 0-3 yield 'invalid value'.
    """
    return _SENSOR_CONTACT_STATUS.get(val, 'invalid value')


def body_sensor_val_cb(value):
    """Reply handler for the Body Sensor Location read.

    The value is expected to be exactly one byte; anything else is
    reported as invalid.
    """
    if len(value) != 1:
        print('Invalid body sensor location value: ' + repr(value))
        return

    print('Body sensor location value: ' + body_sensor_val_to_str(value[0]))


def hr_msrmt_start_notify_cb():
    """Reply handler confirming that StartNotify succeeded."""
    print('HR Measurement notifications enabled')


def decode_hr_measurement(value):
    """Decode a Heart Rate Measurement value into its fields.

    Layout, as produced by the example server: byte 0 is a flags byte
    (bit 0: heart rate is 16-bit, bits 1-2: sensor contact status,
    bit 3: Energy Expended present), followed by the heart rate (one
    byte, or two bytes little-endian) and, when flagged, a two-byte
    little-endian Energy Expended field.

    Args:
        value: sequence of integer byte values.

    Returns:
        A tuple ``(heart_rate, sensor_contact_status, energy_expended)``
        where ``energy_expended`` is ``None`` when the flag bit is clear.

    Raises:
        ValueError: if ``value`` is shorter than its flags require.
    """
    if not value:
        raise ValueError('empty Heart Rate Measurement value')

    flags = value[0]
    hr_is_16bit = flags & 0x01
    sc_status = (flags >> 1) & 0x03
    ee_present = flags & 0x08

    hr_size = 2 if hr_is_16bit else 1
    needed = 1 + hr_size + (2 if ee_present else 0)
    if len(value) < needed:
        raise ValueError(
            'Heart Rate Measurement value too short: expected at least '
            + str(needed) + ' bytes, got ' + str(len(value)))

    if hr_is_16bit:
        hr_msrmt = value[1] | (value[2] << 8)
    else:
        hr_msrmt = value[1]
    next_ind = 1 + hr_size

    energy_expended = None
    if ee_present:
        # Energy Expended is 16 bits, low byte first; reading only
        # value[next_ind] would drop the high byte.
        energy_expended = int(value[next_ind] | (value[next_ind + 1] << 8))

    return int(hr_msrmt), int(sc_status), energy_expended


def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
    """PropertiesChanged handler for the Heart Rate Measurement object.

    Ignores signals for other interfaces and signals that carry no new
    'Value'; otherwise decodes and prints the measurement.
    """
    if iface != GATT_CHRC_IFACE:
        return

    if not changed_props:
        return

    value = changed_props.get('Value', None)
    if not value:
        return

    print('New HR Measurement')

    try:
        hr_msrmt, sc_status, energy_expended = decode_hr_measurement(value)
    except ValueError as err:
        # A truncated value would otherwise raise IndexError inside the
        # signal handler.
        print('\tInvalid HR Measurement value: ' + str(err))
        return

    print('\tHR: ' + str(hr_msrmt))
    print('\tSensor Contact status: ' +
          sensor_contact_val_to_str(sc_status))

    if energy_expended is not None:
        print('\tEnergy Expended: ' + str(energy_expended))


def start_client():
    """Issue the initial read and subscribe to heart rate notifications.

    process_chrc() only fills in the characteristic globals for UUIDs it
    actually saw, so either one may still be None here.
    """
    if hr_msrmt_chrc is None:
        print('Heart Rate Measurement characteristic not found')
        sys.exit(1)

    if body_snsr_loc_chrc is not None:
        # Read the Body Sensor Location value and print it asynchronously.
        body_snsr_loc_chrc[0].ReadValue({}, reply_handler=body_sensor_val_cb,
                                        error_handler=generic_error_cb,
                                        dbus_interface=GATT_CHRC_IFACE)

    # Listen to PropertiesChanged signals from the Heart Measurement
    # Characteristic.
    hr_msrmt_prop_iface = dbus.Interface(hr_msrmt_chrc[0], DBUS_PROP_IFACE)
    hr_msrmt_prop_iface.connect_to_signal("PropertiesChanged",
                                          hr_msrmt_changed_cb)

    # Subscribe to Heart Rate Measurement notifications.
    hr_msrmt_chrc[0].StartNotify(reply_handler=hr_msrmt_start_notify_cb,
                                 error_handler=generic_error_cb,
                                 dbus_interface=GATT_CHRC_IFACE)


def process_chrc(chrc_path):
    """Fetch one characteristic's properties and remember it by UUID.

    Stores a ``(proxy, properties)`` tuple in the matching module global.
    Always returns True.
    """
    global hr_msrmt_chrc, body_snsr_loc_chrc, hr_ctrl_pt_chrc

    chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
    chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
                             dbus_interface=DBUS_PROP_IFACE)

    uuid = chrc_props['UUID']

    if uuid == HR_MSRMT_UUID:
        hr_msrmt_chrc = (chrc, chrc_props)
    elif uuid == BODY_SNSR_LOC_UUID:
        body_snsr_loc_chrc = (chrc, chrc_props)
    elif uuid == HR_CTRL_PT_UUID:
        hr_ctrl_pt_chrc = (chrc, chrc_props)
    else:
        print('Unrecognized characteristic: ' + uuid)

    return True


def process_hr_service(service_path, chrc_paths):
    """Check whether ``service_path`` is the Heart Rate service.

    On a match, processes every path in ``chrc_paths``, records the
    service in the ``hr_service`` global as
    ``(proxy, properties, path)`` and returns True; otherwise returns
    False without touching any state.
    """
    global hr_service

    service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
    service_props = service.GetAll(GATT_SERVICE_IFACE,
                                   dbus_interface=DBUS_PROP_IFACE)

    uuid = service_props['UUID']

    if uuid != HR_SVC_UUID:
        return False

    print('Heart Rate Service found: ' + service_path)

    # Process the characteristics.
    for chrc_path in chrc_paths:
        process_chrc(chrc_path)

    hr_service = (service, service_props, service_path)

    return True


def interfaces_removed_cb(object_path, interfaces):
    """InterfacesRemoved handler: quit once the tracked service is gone."""
    if not hr_service:
        return

    if object_path == hr_service[2]:
        print('Service was removed')
        mainloop.quit()


def main():
    """Find the Heart Rate service on the system bus and run the client."""
    global bus, mainloop

    # Set up the main loop.
    DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    mainloop = GObject.MainLoop()

    om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
    om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)

    print('Getting objects...')
    objects = om.GetManagedObjects()

    # List characteristics found
    chrcs = [path for path, interfaces in objects.items()
             if GATT_CHRC_IFACE in interfaces]

    # List services found
    for path, interfaces in objects.items():
        if GATT_SERVICE_IFACE not in interfaces:
            continue

        # Characteristics belonging to a service live below its path.
        chrc_paths = [d for d in chrcs if d.startswith(path + "/")]

        if process_hr_service(path, chrc_paths):
            break

    if not hr_service:
        print('No Heart Rate Service found')
        sys.exit(1)

    start_client()

    mainloop.run()


if __name__ == '__main__':
    main()
server
#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1-or-later
#
# Example GATT server: module imports and the D-Bus names used by the
# service, characteristic and descriptor classes below.

import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service

import array
try:
    from gi.repository import GObject
except ImportError:
    # Fall back to the legacy static binding when PyGObject is unavailable.
    import gobject as GObject
import sys

from random import randint

# Assigned in main(); register_app_error_cb() uses it to stop the loop.
mainloop = None

# Well-known bus name and D-Bus interface names used by this server.
BLUEZ_SERVICE_NAME = 'org.bluez'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'


class InvalidArgsException(dbus.exceptions.DBusException):
    """D-Bus error raised when GetAll is asked for an unknown interface."""
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'


class NotSupportedException(dbus.exceptions.DBusException):
    """D-Bus error raised by the default, unimplemented GATT methods."""
    _dbus_error_name = 'org.bluez.Error.NotSupported'


class NotPermittedException(dbus.exceptions.DBusException):
    """D-Bus error raised when a write is refused."""
    _dbus_error_name = 'org.bluez.Error.NotPermitted'


class InvalidValueLengthException(dbus.exceptions.DBusException):
    """D-Bus error raised when a written value has the wrong length."""
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'


class FailedException(dbus.exceptions.DBusException):
    """Generic D-Bus failure error."""
    _dbus_error_name = 'org.bluez.Error.Failed'


class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation.

    Exported at '/', it owns the three example services and answers
    GetManagedObjects with every service, characteristic and descriptor.
    """
    def __init__(self, bus):
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        self.add_service(HeartRateService(bus, 0))
        self.add_service(BatteryService(bus, 1))
        self.add_service(TestService(bus, 2))

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        """Append ``service`` to the list reported by GetManagedObjects."""
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return a map of object path -> properties for the whole tree."""
        response = {}
        print('GetManagedObjects')

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            for chrc in service.get_characteristics():
                response[chrc.get_path()] = chrc.get_properties()
                for desc in chrc.get_descriptors():
                    response[desc.get_path()] = desc.get_properties()

        return response


class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation.

    Exported at PATH_BASE followed by ``index``.
    """
    PATH_BASE = '/org/bluez/example/service'

    def __init__(self, bus, index, uuid, primary):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the GattService1 properties keyed by interface name."""
        return {
            GATT_SERVICE_IFACE: {
                'UUID': self.uuid,
                'Primary': self.primary,
                'Characteristics': dbus.Array(
                    self.get_characteristic_paths(),
                    signature='o')
            }
        }

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        """Attach ``characteristic`` to this service."""
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        """Return the object paths of all attached characteristics."""
        return [chrc.get_path() for chrc in self.characteristics]

    def get_characteristics(self):
        """Return the list of attached characteristic objects."""
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the service properties; only GattService1 is accepted."""
        if interface != GATT_SERVICE_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_SERVICE_IFACE]


class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation.

    Exported below its service at '/char' followed by ``index``. The
    read, write and notify methods here all raise NotSupportedException;
    subclasses override the ones they support.
    """
    def __init__(self, bus, index, uuid, flags, service):
        self.path = service.path + '/char' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the GattCharacteristic1 properties keyed by interface."""
        return {
            GATT_CHRC_IFACE: {
                'Service': self.service.get_path(),
                'UUID': self.uuid,
                'Flags': self.flags,
                'Descriptors': dbus.Array(
                    self.get_descriptor_paths(),
                    signature='o')
            }
        }

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        """Attach ``descriptor`` to this characteristic."""
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        """Return the object paths of all attached descriptors."""
        return [desc.get_path() for desc in self.descriptors]

    def get_descriptors(self):
        """Return the list of attached descriptor objects."""
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the properties; only GattCharacteristic1 is accepted."""
        if interface != GATT_CHRC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_CHRC_IFACE]

    @dbus.service.method(GATT_CHRC_IFACE,
                         in_signature='a{sv}',
                         out_signature='ay')
    def ReadValue(self, options):
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        print('Default StartNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        print('Default StopNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE,
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        # Signal stub: calling it emits PropertiesChanged on the bus.
        pass


class Descriptor(dbus.service.Object):
    """
    org.bluez.GattDescriptor1 interface implementation.

    Exported below its characteristic at '/desc' followed by ``index``.
    ReadValue and WriteValue raise NotSupportedException unless
    overridden.
    """
    def __init__(self, bus, index, uuid, flags, characteristic):
        self.path = characteristic.path + '/desc' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the GattDescriptor1 properties keyed by interface."""
        return {
            GATT_DESC_IFACE: {
                'Characteristic': self.chrc.get_path(),
                'UUID': self.uuid,
                'Flags': self.flags,
            }
        }

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the properties; only GattDescriptor1 is accepted."""
        if interface != GATT_DESC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_DESC_IFACE]

    @dbus.service.method(GATT_DESC_IFACE,
                         in_signature='a{sv}',
                         out_signature='ay')
    def ReadValue(self, options):
        print ('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise NotSupportedException()


class HeartRateService(Service):
    """
    Fake Heart Rate Service that simulates a fake heart beat and control
    point behavior.
    """
    HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.HR_UUID, True)
        self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self))
        self.add_characteristic(BodySensorLocationChrc(bus, 1, self))
        self.add_characteristic(HeartRateControlPointChrc(bus, 2, self))
        # Shared counter: incremented by the measurement characteristic,
        # reset by the control point characteristic.
        self.energy_expended = 0


class HeartRateMeasurementChrc(Characteristic):
    """Notify-only characteristic emitting a random heart rate each second."""
    HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
            self, bus, index,
            self.HR_MSRMT_UUID,
            ['notify'],
            service)
        self.notifying = False
        self.hr_ee_count = 0
        # True while a GObject timeout for hr_msrmt_cb is scheduled, so a
        # quick StopNotify/StartNotify cannot stack a second timer.
        self._timer_active = False

    def hr_msrmt_cb(self):
        """Timer callback: emit one measurement.

        Returns True to keep the timer alive, or False (without emitting
        anything) once notifications have been stopped.
        """
        if not self.notifying:
            self._timer_active = False
            return False

        # Flags byte 0x06: 8-bit heart rate, sensor contact status 3.
        value = [dbus.Byte(0x06), dbus.Byte(randint(90, 130))]

        # Every tenth measurement also carries the 16-bit Energy Expended
        # field, low byte first, announced by flag bit 0x08.
        if self.hr_ee_count % 10 == 0:
            value[0] = dbus.Byte(value[0] | 0x08)
            value.append(dbus.Byte(self.service.energy_expended & 0xff))
            value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff))

        # Saturate at the largest value the 16-bit field can carry.
        self.service.energy_expended = min(
            0xffff, self.service.energy_expended + 1)
        self.hr_ee_count += 1

        print('Updating value: ' + repr(value))

        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, [])

        return True

    def _update_hr_msrmt_simulation(self):
        """Start the 1-second measurement timer if it is needed."""
        print('Update HR Measurement Simulation')

        if not self.notifying:
            return

        if self._timer_active:
            # The previous timer has not yet seen the stop; it keeps going.
            return

        self._timer_active = True
        GObject.timeout_add(1000, self.hr_msrmt_cb)

    def StartNotify(self):
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self._update_hr_msrmt_simulation()

    def StopNotify(self):
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False
        self._update_hr_msrmt_simulation()


class BodySensorLocationChrc(Characteristic):
    """Read-only characteristic returning a fixed sensor location."""
    BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
            self, bus, index,
            self.BODY_SNSR_LOC_UUID,
            ['read'],
            service)

    def ReadValue(self, options):
        # Return 'Chest' as the sensor location.
        return [ 0x01 ]


class HeartRateControlPointChrc(Characteristic):
    """Write-only characteristic that resets the Energy Expended counter."""
    HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
            self, bus, index,
            self.HR_CTRL_PT_UUID,
            ['write'],
            service)

    def WriteValue(self, value, options):
        """Accept the single byte 1; reject any other length or value."""
        print('Heart Rate Control Point WriteValue called')

        if len(value) != 1:
            raise InvalidValueLengthException()

        byte = value[0]
        print('Control Point value: ' + repr(byte))

        if byte != 1:
            raise FailedException("0x80")

        print('Energy Expended field reset!')
        self.service.energy_expended = 0


class BatteryService(Service):
    """
    Fake Battery service that emulates a draining battery.
    """
    BATTERY_UUID = '180f'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.BATTERY_UUID, True)
        self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self))


class BatteryLevelCharacteristic(Characteristic):
    """
    Fake Battery Level characteristic. While notifications are enabled,
    the battery level is drained by 2 points every 5 seconds.
    """
    BATTERY_LVL_UUID = '2a19'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
            self, bus, index,
            self.BATTERY_LVL_UUID,
            ['read', 'notify'],
            service)
        self.notifying = False
        self.battery_lvl = 100
        GObject.timeout_add(5000, self.drain_battery)

    def notify_battery_level(self):
        """Emit the current level if notifications are enabled."""
        if not self.notifying:
            return
        self.PropertiesChanged(
            GATT_CHRC_IFACE,
            { 'Value': [dbus.Byte(self.battery_lvl)] }, [])

    def drain_battery(self):
        """Timer callback; always returns True so the timer keeps running."""
        if not self.notifying:
            return True
        if self.battery_lvl > 0:
            self.battery_lvl -= 2
            if self.battery_lvl < 0:
                self.battery_lvl = 0
        print('Battery Level drained: ' + repr(self.battery_lvl))
        self.notify_battery_level()
        return True

    def ReadValue(self, options):
        print('Battery Level read: ' + repr(self.battery_lvl))
        return [dbus.Byte(self.battery_lvl)]

    def StartNotify(self):
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self.notify_battery_level()

    def StopNotify(self):
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False


class TestService(Service):
    """
    Dummy test service that provides characteristics and descriptors that
    exercise various API functionality.
    """
    TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.TEST_SVC_UUID, True)
        self.add_characteristic(TestCharacteristic(bus, 0, self))
        self.add_characteristic(TestEncryptCharacteristic(bus, 1, self))
        self.add_characteristic(TestSecureCharacteristic(bus, 2, self))


class TestCharacteristic(Characteristic):
    """
    Dummy test characteristic. Allows writing arbitrary bytes to its value,
    and contains "extended properties", as well as a test descriptor.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
            self, bus, index,
            self.TEST_CHRC_UUID,
            ['read', 'write', 'writable-auxiliaries'],
            service)
        self.value = []
        self.add_descriptor(TestDescriptor(bus, 0, self))
        self.add_descriptor(
            CharacteristicUserDescriptionDescriptor(bus, 1, self))

    def ReadValue(self, options):
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestCharacteristic Write: ' + repr(value))
        self.value = value


class TestDescriptor(Descriptor):
    """
    Dummy test descriptor. Returns a static value.
    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
            self, bus, index,
            self.TEST_DESC_UUID,
            ['read', 'write'],
            characteristic)

    def ReadValue(self, options):
        return [
            dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
        ]


class CharacteristicUserDescriptionDescriptor(Descriptor):
    """
    Writable CUD descriptor.

    Writes are accepted only when the owning characteristic's flags
    include 'writable-auxiliaries'.
    """
    CUD_UUID = '2901'

    def __init__(self, bus, index, characteristic):
        self.writable = 'writable-auxiliaries' in characteristic.flags
        self.value = array.array('B', b'This is a characteristic for testing')
        self.value = self.value.tolist()
        Descriptor.__init__(
            self, bus, index,
            self.CUD_UUID,
            ['read', 'write'],
            characteristic)

    def ReadValue(self, options):
        return self.value

    def WriteValue(self, value, options):
        if not self.writable:
            raise NotPermittedException()
        self.value = value


class TestEncryptCharacteristic(Characteristic):
    """
    Dummy test characteristic requiring encryption.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef3'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
            self, bus, index,
            self.TEST_CHRC_UUID,
            ['encrypt-read', 'encrypt-write'],
            service)
        self.value = []
        self.add_descriptor(TestEncryptDescriptor(bus, 2, self))
        self.add_descriptor(
            CharacteristicUserDescriptionDescriptor(bus, 3, self))

    def ReadValue(self, options):
        print('TestEncryptCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestEncryptCharacteristic Write: ' + repr(value))
        self.value = value


class TestEncryptDescriptor(Descriptor):
    """
    Dummy test descriptor requiring encryption. Returns a static value.
    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef4'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
            self, bus, index,
            self.TEST_DESC_UUID,
            ['encrypt-read', 'encrypt-write'],
            characteristic)

    def ReadValue(self, options):
        return [
            dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
        ]


class TestSecureCharacteristic(Characteristic):
    """
    Dummy test characteristic requiring secure connection.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef5'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
            self, bus, index,
            self.TEST_CHRC_UUID,
            ['secure-read', 'secure-write'],
            service)
        self.value = []
        self.add_descriptor(TestSecureDescriptor(bus, 2, self))
        self.add_descriptor(
            CharacteristicUserDescriptionDescriptor(bus, 3, self))

    def ReadValue(self, options):
        print('TestSecureCharacteristic Read: ' + repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestSecureCharacteristic Write: ' + repr(value))
        self.value = value


class TestSecureDescriptor(Descriptor):
    """
    Dummy test descriptor requiring secure connection. Returns a static
    value.
    """
    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef6'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
            self, bus, index,
            self.TEST_DESC_UUID,
            ['secure-read', 'secure-write'],
            characteristic)

    def ReadValue(self, options):
        return [
            dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
        ]


def register_app_cb():
    """Reply handler confirming that RegisterApplication succeeded."""
    print('GATT application registered')


def register_app_error_cb(error):
    """Error handler for RegisterApplication: report and stop the loop."""
    print('Failed to register application: ' + str(error))
    mainloop.quit()


def find_adapter(bus):
    """Return the first object path exposing GattManager1, or None."""
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for o, props in objects.items():
        if GATT_MANAGER_IFACE in props:
            return o

    return None


def main():
    """Register the example GATT application and run the main loop."""
    global mainloop

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    adapter = find_adapter(bus)
    if not adapter:
        print('GattManager1 interface not found')
        return

    service_manager = dbus.Interface(
        bus.get_object(BLUEZ_SERVICE_NAME, adapter),
        GATT_MANAGER_IFACE)

    app = Application(bus)

    mainloop = GObject.MainLoop()

    print('Registering GATT application...')

    # Registration is asynchronous; the outcome arrives through the two
    # handlers once the loop is running.
    service_manager.RegisterApplication(app.get_path(), {},
                                        reply_handler=register_app_cb,
                                        error_handler=register_app_error_cb)

    mainloop.run()


if __name__ == '__main__':
    main()
此处仅以心跳服务作为详细解析对象。
BUS基础知识
可见client通过控制dbus,获取对端server控制dbus生成的service。