Skip to content

Instantly share code, notes, and snippets.

@JasonLG1979
Created August 27, 2020 12:32
Show Gist options
  • Save JasonLG1979/3bcbd8300d144a4fa38a8e3c578c6dd3 to your computer and use it in GitHub Desktop.
Save JasonLG1979/3bcbd8300d144a4fa38a8e3c578c6dd3 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# Copyright (c) 2020 Jason Gray <[email protected]>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import systemd.daemon
from systemd import journal
from gi.repository import Gio, GLib
BLUEZ_IFACE = 'org.bluez'
BLUEZ_PATH = '/org/bluez'
MANAGER_IFACE = 'org.bluez.AgentManager1'
PROPS_IFACE = 'org.freedesktop.DBus.Properties'
A2DPAGENT_IFACE = 'org.bluez.Agent1'
A2DPAGENT_PATH = '/org/bluez/a2dp'
CAPS = 'NoInputNoOutput'
DBUS_TIMEOUT = -1
PINCODE = '000000'
PASSKEY = 000000
OWNERFLAGS = Gio.BusNameOwnerFlags.NONE
PROXY_FLAGS = Gio.DBusProxyFlags.DO_NOT_LOAD_PROPERTIES | Gio.DBusProxyFlags.DO_NOT_CONNECT_SIGNALS
A2DP_SERVICE_CLASS = '0000110d-0000-1000-8000-00805f9b34fb'
REJECTED = 'org.bluez.Error.Rejected'
PAIRING_ERROR = 'Pairing Error'
class Server:
def __init__(self, con, path):
method_outargs = {}
method_inargs = {}
for interface in Gio.DBusNodeInfo.new_for_xml(self.__doc__).interfaces:
for method in interface.methods:
method_outargs[method.name] = '({})'.format(''.join([arg.signature for arg in method.out_args]))
method_inargs[method.name] = tuple(arg.signature for arg in method.in_args)
con.register_object(object_path=path, interface_info=interface, method_call_closure=self._on_method_call)
self._method_inargs = method_inargs
self._method_outargs = method_outargs
def _on_method_call(self, connection, sender, object_path, interface_name, method_name, parameters, invocation):
args = list(parameters.unpack())
try:
result = getattr(self, method_name)(*args)
except ValueError as e:
invocation.return_dbus_error(interface_name, str(e))
else:
if result == REJECTED:
invocation.return_dbus_error(REJECTED, PAIRING_ERROR)
else:
result = (result,)
out_args = self._method_outargs[method_name]
if out_args != '()':
variant = GLib.Variant(out_args, result)
invocation.return_value(variant)
else:
invocation.return_value(None)
class A2DPAgent(Server):
"""
<!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspection 1.0//EN"
"http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd">
<node>
<interface name="org.bluez.Agent1">
<method name="Release" />
<method name="RequestPinCode">
<arg direction="in" type="o" />
<arg direction="out" type="s" />
</method>
<method name="DisplayPinCode">
<arg direction="in" type="o" />
<arg direction="in" type="s" />
</method>
<method name="RequestPasskey">
<arg direction="in" type="o" />
<arg direction="out" type="u" />
</method>
<method name="DisplayPasskey">
<arg direction="in" type="o" />
<arg direction="in" type="u" />
<arg direction="in" type="q" />
</method>
<method name="RequestConfirmation">
<arg direction="in" type="o" />
<arg direction="in" type="u" />
</method>
<method name="RequestAuthorization">
<arg direction="in" type="o" />
</method>
<method name="AuthorizeService">
<arg direction="in" type="o" />
<arg direction="in" type="s" />
</method>
<method name="Cancel" />
</interface>
</node>
"""
def __init__(self, mainloop):
# A very simple bluetooth A2DP receiver pairing agent that accepts all requests,
# designed to be ran as a systemd service on headless systems.
self._mainloop = mainloop
bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
self._trust_variant = GLib.Variant('(ssb)', ('org.bluez.Device1', 'Trusted', True))
self._bus_id = Gio.bus_own_name_on_connection(bus, A2DPAGENT_IFACE, OWNERFLAGS, None, None)
Server.__init__(self, bus, A2DPAGENT_PATH)
def _trust(self, device):
try:
props = Gio.DBusProxy.new_for_bus_sync(
Gio.BusType.SYSTEM,
PROXY_FLAGS,
None,
BLUEZ_IFACE,
device,
PROPS_IFACE,
None
)
props.call_sync(
'Set',
self._trust_variant,
Gio.DBusCallFlags.NONE,
DBUS_TIMEOUT,
None
)
journal.sendv('MESSAGE=Device {} trusted'.format(device))
return True
except GLib.Error as e:
journal.sendv('MESSAGE=Failed to trust device {}: {}'.format(device, e))
return False
def quit(self):
if self._bus_id:
try:
Gio.bus_unown_name(self._bus_id)
except:
pass
try:
self._mainloop.quit()
except:
pass
def Release(self):
journal.sendv('MESSAGE=Release')
self.quit()
def RequestPinCode(self, device):
journal.sendv('MESSAGE=RequestPinCode: device {}'.format(device))
if self._trust(device):
return PINCODE
return REJECTED
def DisplayPinCode(self, device, pincode):
journal.sendv('MESSAGE=DisplayPinCode: device {}, pincode {}'.format(device, pincode))
def RequestPasskey(self, device):
journal.sendv('MESSAGE=RequestPasskey: device {}'.format(device))
if self._trust(device):
return PASSKEY
return REJECTED
def DisplayPasskey(self, device, passkey, entered):
journal.sendv('MESSAGE=DisplayPasskey: device {}, passkey {}, entered {}'.format(device, passkey, entered))
def RequestConfirmation(self, device, passkey):
journal.sendv('MESSAGE=RequestConfirmation: device {}, passkey {}'.format(device, passkey))
if not self._trust(device):
return REJECTED
def RequestAuthorization(self, device):
journal.sendv('MESSAGE=RequestAuthorization: device {}'.format(device))
if not self._trust(device):
return REJECTED
def AuthorizeService(self, device, uuid):
journal.sendv('MESSAGE=AuthorizeService: device {}, uuid {}'.format(device, uuid))
if uuid == A2DP_SERVICE_CLASS:
if not self._trust(device):
return REJECTED
else:
journal.sendv('MESSAGE=Service Not Supported')
return REJECTED
def Cancel(self):
journal.sendv('MESSAGE=Cancel')
def Introspect(self):
journal.sendv('MESSAGE=Introspect')
return self.__doc__
if __name__ == '__main__':
try:
mainloop = GLib.MainLoop()
agent = A2DPAgent(mainloop)
manager = Gio.DBusProxy.new_for_bus_sync(
Gio.BusType.SYSTEM,
PROXY_FLAGS,
None,
BLUEZ_IFACE,
BLUEZ_PATH,
MANAGER_IFACE,
None
)
manager.call_sync(
'RegisterAgent',
GLib.Variant('(os)', (A2DPAGENT_PATH, CAPS)),
Gio.DBusCallFlags.NONE,
DBUS_TIMEOUT,
None
)
manager.call_sync(
'RequestDefaultAgent',
GLib.Variant('(o)', (A2DPAGENT_PATH,)),
Gio.DBusCallFlags.NONE,
DBUS_TIMEOUT,
None
)
journal.sendv('MESSAGE=A2DP Agent registered')
systemd.daemon.notify('READY=1')
mainloop.run()
except KeyboardInterrupt:
agent.quit()
except Exception as e:
agent.quit()
journal.sendv('MESSAGE={}'.format(str(e)))
systemd.daemon.notify('BUSERROR={}'.format(str(e.code)))
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment