Last active
June 4, 2019 17:06
-
-
Save troolee/06cea54f5ec8d35db7ce575cf801ecbf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pylint: disable=E0401,E1002 | |
import array
import functools
import pprint
import random

import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service

try:
    from gi.repository import GObject as gobject
except ImportError:
    import gobject
def log(msg):
    """Write a diagnostic message to standard output.

    Dicts and lists are pretty-printed with ``pprint``; any other value is
    printed as-is.

    Args:
        msg: The value to display.
    """
    if isinstance(msg, (dict, list)):
        pprint.pprint(msg)
    else:
        # The function-call form prints a single argument identically on
        # Python 2 and is the only form that parses on Python 3.
        print(msg)
# Builds a dbus.Array whose element signature is 's' (string).
DBusStringArray = functools.partial(dbus.Array, signature='s')


def DBusByteStringArray(x):
    """Build a D-Bus byte array (element signature ``y``) from raw data.

    Args:
        x: A byte string, a text string (encoded as UTF-8 first), or any
            other initializer accepted by ``array.array('B', ...)`` such as
            a list of integers in the range 0-255.

    Returns:
        dbus.Array: The data as an array of bytes.
    """
    # array.array('B', ...) rejects text strings, so encode them first.
    # type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3.
    if isinstance(x, type(u'')):
        x = x.encode('utf-8')
    return dbus.Array(array.array('B', x), signature='y')
class DBusObjectBase(dbus.service.Object):
    """Common base class for the D-Bus objects defined in this module.

    ``get_path`` reads ``self.path``, so subclasses must assign that
    attribute themselves.
    """

    def __init__(self, bus, path):
        """Initialise the underlying ``dbus.service.Object``.

        Args:
            bus: Passed through as the first constructor argument.
            path: Passed through as the second constructor argument.
        """
        super(DBusObjectBase, self).__init__(bus, path)

    def get_path(self):
        """Return the ``path`` attribute wrapped in ``dbus.ObjectPath``."""
        object_path = self.path
        return dbus.ObjectPath(object_path)
class Characteristic(DBusObjectBase):
    """Base class for a GATT characteristic exported over D-Bus.

    The ``org.bluez.GattCharacteristic1`` methods defined here are defaults
    that reject the request with ``org.bluez.Error.NotSupported``;
    subclasses override the operations they actually support.
    """

    def __init__(self, bus, index, uuid, flags, service):
        """Create the characteristic below ``service``.

        Args:
            bus: D-Bus connection handed to the base class.
            index: Value appended (via ``str``) to the service path to form
                this object's path.
            uuid: Value reported as the ``UUID`` property.
            flags: Value reported as the ``Flags`` property.
            service: Owning service; must provide ``path`` and
                ``get_path()``.
        """
        self.path = service.path + '/characteristic/' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        super(Characteristic, self).__init__(bus, self.path)

    def _reject(self, operation):
        """Log the unhandled ``operation`` and raise a NotSupported error.

        Raises:
            dbus.exceptions.DBusException: Always, named
                ``org.bluez.Error.NotSupported``.
        """
        log('??? Default %s called, returning error' % operation)
        raise dbus.exceptions.DBusException(
            '%s is not supported' % operation,
            name='org.bluez.Error.NotSupported')

    def get_properties(self):
        """Return the property map, keyed by D-Bus interface name."""
        return {
            'org.bluez.GattCharacteristic1': {
                'Service': self.service.get_path(),
                'UUID': self.uuid,
                'Flags': self.flags,
                'Descriptors': dbus.Array(
                    self.get_descriptor_paths(), signature='o'),
            }
        }

    def add_descriptor(self, descriptor):
        """Append ``descriptor`` to this characteristic's descriptor list."""
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        """Return the ``get_path()`` value of every attached descriptor."""
        return [d.get_path() for d in self.descriptors]

    @dbus.service.method('org.freedesktop.DBus.Properties',
                         in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the properties of ``interface``.

        Raises:
            KeyError: If ``interface`` is not a key of ``get_properties()``.
        """
        return self.get_properties()[interface]

    @dbus.service.method('org.bluez.GattCharacteristic1',
                         in_signature='a{sv}', out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always rejects the request."""
        self._reject('ReadValue')

    @dbus.service.method('org.bluez.GattCharacteristic1',
                         in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always rejects the request."""
        # Returning normally would report the write as successful.
        self._reject('WriteValue')

    @dbus.service.method('org.bluez.GattCharacteristic1')
    def StartNotify(self):
        """Default notification-start handler: always rejects the request."""
        self._reject('StartNotify')

    @dbus.service.method('org.bluez.GattCharacteristic1')
    def StopNotify(self):
        """Default notification-stop handler: always rejects the request."""
        self._reject('StopNotify')

    @dbus.service.signal('org.freedesktop.DBus.Properties',
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        """Declared as a ``PropertiesChanged`` signal; the body only logs.

        Args:
            interface: Interface name whose properties changed.
            changed: Mapping of changed property names to new values.
            invalidated: List of invalidated property names.
        """
        log([interface, changed, invalidated])
        log('PropertiesChanged')
class Descriptor(DBusObjectBase):
    """Base class for a GATT descriptor exported over D-Bus.

    The ``org.bluez.GattDescriptor1`` methods defined here are defaults that
    reject the request with ``org.bluez.Error.NotSupported``; subclasses
    override the operations they actually support.
    """

    def __init__(self, bus, index, uuid, flags, characteristic):
        """Create the descriptor below ``characteristic``.

        Args:
            bus: D-Bus connection handed to the base class.
            index: Value appended (via ``str``) to the characteristic path
                to form this object's path.
            uuid: Value reported as the ``UUID`` property.
            flags: Value reported as the ``Flags`` property.
            characteristic: Owning characteristic; must provide ``path`` and
                ``get_path()``.
        """
        self.path = characteristic.path + '/descriptor/' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.characteristic = characteristic
        super(Descriptor, self).__init__(bus, self.path)

    def _reject(self, operation):
        """Log the unhandled ``operation`` and raise a NotSupported error.

        Raises:
            dbus.exceptions.DBusException: Always, named
                ``org.bluez.Error.NotSupported``.
        """
        log('??? Default %s called, returning error' % operation)
        raise dbus.exceptions.DBusException(
            '%s is not supported' % operation,
            name='org.bluez.Error.NotSupported')

    def get_properties(self):
        """Return the property map, keyed by D-Bus interface name."""
        return {
            'org.bluez.GattDescriptor1': {
                'Characteristic': self.characteristic.get_path(),
                'UUID': self.uuid,
                'Flags': self.flags,
            }
        }

    @dbus.service.method('org.freedesktop.DBus.Properties',
                         in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the properties of ``interface``.

        Raises:
            KeyError: If ``interface`` is not a key of ``get_properties()``.
        """
        return self.get_properties()[interface]

    # ReadValue/WriteValue belong to the descriptor interface, not to
    # org.freedesktop.DBus.Properties, so they are exported there.
    @dbus.service.method('org.bluez.GattDescriptor1',
                         in_signature='a{sv}', out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always rejects the request."""
        self._reject('ReadValue')

    @dbus.service.method('org.bluez.GattDescriptor1',
                         in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always rejects the request."""
        self._reject('WriteValue')
class ZeyeAdvertisement(DBusObjectBase):
    """LE advertisement object exported at a fixed object path.

    Only the ``Type`` property is published. The optional ``ServiceUUIDs``,
    ``ManufacturerData``, ``ServiceData`` and ``IncludeTxPower`` properties
    are currently not set.
    """

    def __init__(self, bus):
        """Store ``bus`` and initialise the base class at the fixed path."""
        self.path = '/ws/zeye/bluez/advertisement/0'
        self.bus = bus
        super(ZeyeAdvertisement, self).__init__(bus, self.path)

    def get_properties(self):
        """Return the property map, keyed by D-Bus interface name."""
        advertisement_props = {
            'Type': 'peripheral',
        }
        return {'org.bluez.LEAdvertisement1': advertisement_props}

    @dbus.service.method('org.freedesktop.DBus.Properties',
                         in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        """Log the request and return the properties of ``interface``.

        Returns ``None`` when ``interface`` is not a known key.
        """
        log('Getting advertisement parameters (%s)...' % interface)
        all_props = self.get_properties()
        return all_props.get(interface)

    @dbus.service.method('org.bluez.LEAdvertisement1',
                         in_signature='', out_signature='')
    def Release(self):
        """Log that this advertisement object has been released."""
        message = '%s: Released!' % self.path
        log(message)
class ZeyeApplication(DBusObjectBase):
    """Root application object that owns the list of services."""

    def __init__(self, bus):
        """Initialise the base class at the fixed path and build services."""
        self.path = '/ws/zeye/bluez/application/0'
        super(ZeyeApplication, self).__init__(bus, self.path)
        self.services = [ZeyeConfigService(bus)]

    @dbus.service.method('org.freedesktop.DBus.ObjectManager',
                         out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return a map of object path to properties.

        Covers every service, each service's characteristics and each
        characteristic's descriptors.
        """
        log('ZeyeApplication.GetManagedObjects')
        managed = {}
        for svc in self.services:
            # Flatten the tree in the order: service, then each
            # characteristic followed by its descriptors.
            exported = [svc]
            for chrc in svc.characteristics:
                exported.append(chrc)
                exported.extend(chrc.descriptors)
            for obj in exported:
                managed[obj.get_path()] = obj.get_properties()
        return managed
class ZeyeConfigService(DBusObjectBase):
    """Primary GATT service containing a single ``TestCharacteristic``."""

    def __init__(self, bus):
        """Initialise the base class at the fixed path and add the characteristic."""
        self.path = '/ws/zeye/bluez/service/0'
        self.uuid = '2e560001-f981-11e6-bc64-92361f002671'
        super(ZeyeConfigService, self).__init__(bus, self.path)
        self.bus = bus
        self.primary = True
        self.characteristics = [TestCharacteristic(bus, 0, self)]

    def get_properties(self):
        """Return the property map, keyed by D-Bus interface name."""
        characteristic_paths = []
        for chrc in self.characteristics:
            characteristic_paths.append(chrc.get_path())
        service_props = {
            'UUID': self.uuid,
            'Primary': self.primary,
            'Characteristics': dbus.Array(characteristic_paths, signature='o'),
        }
        return {'org.bluez.GattService1': service_props}

    @dbus.service.method('org.freedesktop.DBus.Properties',
                         in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        """Log the call and return the properties of ``interface``.

        Raises:
            KeyError: If ``interface`` is not a key of ``get_properties()``.
        """
        log('ZeyeConfigService.GetAll')
        all_props = self.get_properties()
        return all_props[interface]
class TestCharacteristic(Characteristic):
    """Demo characteristic whose value changes to a random word on a timer.

    Supports reads and notifications. While notifications are enabled, every
    value change is announced through ``PropertiesChanged``.
    """

    # Pool of words the value is drawn from.
    variants = [
        'brennan',
        'aristoteles',
        'vedic',
        'zoanthropy',
        'mercurous',
        'solferino',
        'newshen',
        'zebrawood',
        'frumentaceous',
        'malmdy',
    ]

    def __init__(self, bus, index, service):
        """Create the characteristic and schedule the periodic value update.

        Args:
            bus: D-Bus connection handed to the base class.
            index: Index used by the base class to build the object path.
            service: Owning service object.
        """
        Characteristic.__init__(
            self,
            bus,
            index,
            '2e560002-f981-11e6-bc64-92361f002671',
            ['read', 'notify'],  # not enabled: 'write', 'writable-auxiliaries'
            service)
        self.value = 'Hello'
        self.notifying = False
        # Interval argument is 500; update_value returns True each time.
        gobject.timeout_add(500, self.update_value)

    def update_value(self):
        """Pick a new random value, notify if enabled, and return ``True``."""
        # random.choice selects uniformly; the previous
        # ``randint(0, 1000) % len(...)`` form was slightly biased.
        self.value = random.choice(self.variants)
        self.notify_value()
        return True

    def ReadValue(self, options):
        """Log and return the current value as a D-Bus byte array."""
        log('TestCharacteristic Read: ' + repr(self.value))
        return DBusByteStringArray(self.value)

    def StartNotify(self):
        """Enable notifications and send the current value once."""
        if self.notifying:
            return
        log('Start notifications')
        self.notifying = True
        self.notify_value()

    def StopNotify(self):
        """Disable notifications."""
        if not self.notifying:
            return
        log('Stop notifications')
        self.notifying = False

    def notify_value(self):
        """Announce the current value via ``PropertiesChanged`` if notifying."""
        if not self.notifying:
            return
        log('Notify Value Change: ' + repr(self.value))
        self.PropertiesChanged(
            'org.bluez.GattCharacteristic1',
            {'Value': DBusByteStringArray(self.value)},
            [])
def find_adapter(bus, supported_interfaces):
    """Find an object managed by ``org.bluez`` that offers a wanted interface.

    Args:
        bus: D-Bus connection used to reach the ``org.bluez`` root object.
        supported_interfaces: Iterable of interface names; an object matches
            as soon as it exposes any one of them.

    Returns:
        The object path of the first match in iteration order, or ``None``
        if nothing matches.
    """
    bluez_root = bus.get_object('org.bluez', '/')
    object_manager = dbus.Interface(
        bluez_root, 'org.freedesktop.DBus.ObjectManager')
    managed = object_manager.GetManagedObjects()
    for object_path, interfaces in managed.items():
        if any(name in interfaces for name in supported_interfaces):
            return object_path
    return None
def register_ad_cb():
    """Log that the advertisement was registered."""
    message = 'Advertisement has been successfully registered.'
    log(message)
def register_ad_error_cb(error):
    """Log that registering the advertisement failed.

    Args:
        error: The failure value; interpolated into the message with ``%s``.
    """
    message = 'Failed to register advertisement: %s.' % error
    log(message)
def register_app_cb():
    """Log that the GATT application was registered."""
    message = 'GATT application registered.'
    log(message)
def register_app_error_cb(error):
    """Log that registering the GATT application failed.

    Args:
        error: The failure value; interpolated into the message with ``%s``.
    """
    message = 'Failed to register application: %s.' % error
    log(message)
def main():
    """Register the GATT application and advertisement, then run the loop.

    Finds an adapter, powers it on, registers ``ZeyeApplication`` and
    ``ZeyeAdvertisement`` with it and runs the GLib main loop. When the
    loop ends (for example on Ctrl-C) both registrations are removed.
    Returns early, after logging, if no adapter is found.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    log('Getting BT adapter...')
    adapter = find_adapter(
        bus, ['org.bluez.LEAdvertisingManager1', 'org.bluez.GattManager1'])
    if adapter is None:
        # find_adapter returns None when nothing matches; stop here
        # instead of passing None on as an object path.
        log('No suitable BT adapter found.')
        return

    bluez = bus.get_object('org.bluez', adapter)
    adapter_props = dbus.Interface(bluez, 'org.freedesktop.DBus.Properties')

    log('-' * 80)
    log('Adapter : %s' % adapter_props.Get('org.bluez.Adapter1', 'Name'))
    log('Address : %s' % adapter_props.Get('org.bluez.Adapter1', 'Address'))
    log('-' * 80)

    log('Setting power on...')
    adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))

    mainloop = gobject.MainLoop()

    log('Registering application...')
    app = ZeyeApplication(bus)
    service_manager = dbus.Interface(bluez, 'org.bluez.GattManager1')
    service_manager.RegisterApplication(
        app.get_path(),
        {},
        reply_handler=register_app_cb,
        error_handler=register_app_error_cb)

    log('Registering advertisement...')
    advertisement = ZeyeAdvertisement(bus)
    ad_manager = dbus.Interface(bluez, 'org.bluez.LEAdvertisingManager1')
    ad_manager.RegisterAdvertisement(
        advertisement.get_path(),
        {},
        reply_handler=register_ad_cb,
        error_handler=register_ad_error_cb)

    log('Starting main loop...')
    try:
        mainloop.run()
    except KeyboardInterrupt:
        log('\nShutting down...')

    # The main loop is no longer running, so asynchronous reply handlers
    # would never be dispatched. Make blocking calls instead.
    log('Unregistering application...')
    try:
        service_manager.UnregisterApplication(app.get_path())
    except dbus.exceptions.DBusException as e:
        log('Failed to unregister application: %s.' % e)
    else:
        log('Application has been successfully unregistered.')

    log('Unregistering advertisement...')
    try:
        ad_manager.UnregisterAdvertisement(advertisement.get_path())
    except dbus.exceptions.DBusException as e:
        log('Failed to unregister advertisement: %s.' % e)
    else:
        log('Advertisement has been successfully unregistered.')

    log('Bye.')
# Start the peripheral only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Pavel, thank you for the example. I'm new to BLE and python and your example helped me to send a byte array through the notification. There is very little documentation for that but after studying your code, I figured it out.
Thanks Again.
Mark