Last active
May 2, 2022 11:10
-
-
Save RicterZ/686e8e34e9e1c8f31da9d17baef5fe94 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dbus | |
import os | |
import glob | |
import copy | |
import json | |
import sys | |
from xml.dom import minidom | |
# --- Record templates -------------------------------------------------------
# The code in this file never stores these dicts directly; it stores
# copy.deepcopy() clones of them, so the nested containers are not shared.
SIGNAL_DETAIL = {'Arguments': ''}
PROPERTY_DETAIL = {'Privileged': False, 'ReadOnly': True, 'Type': ''}
METHOD_DETAIL = {'Arguments': '', 'Privileged': False}
INTERFACE_DETAIL = {'Methods': {}, 'Properties': {}, 'Signals': {}}
PATH_DETAIL = {'Interfaces': {}, 'Ref': None}
DBUS_DETAIL = {'Paths': {}, 'Policies': {}}

# --- Global state filled in while scanning ----------------------------------
# bus name -> clone of DBUS_DETAIL
DBUS_DICT = {}
# polkit action id -> text of its <allow_any> element (or 'unknown')
POLKIT_ACTIONS = {}

# --- Directories that are scanned -------------------------------------------
SYSTEMD_PATH = [
    '/usr/lib/systemd/system',
    '/etc/systemd/system',
]
DBUS_PATH = [
    '/usr/share/dbus-1/system-services',
    '/usr/local/share/dbus-1/system-services',
    '/lib/dbus-1/system-services',
]
POLKIT_PATH = [
    '/usr/share/polkit-1/actions',
]

# --- Skip lists ---------------------------------------------------------------
# Object paths and bus names to leave out (both empty by default).
IGNORED_PATH = ()
IGNORED_DBUS = ()
# Interfaces that are left out of the per-path interface listing.
IGNORED_INTERFACES = (
    'org.freedesktop.DBus.Introspectable',
    'org.freedesktop.DBus.Peer',
    'org.freedesktop.DBus.Properties',
    'org.freedesktop.DBus.ObjectManager',
)
def logger(msg):
    """Print ``msg`` to stdout, but only when the DEBUG env var equals '1'."""
    debug_enabled = os.environ.get('DEBUG') == '1'
    if not debug_enabled:
        return
    print(msg)
def parse_dbus(paths, identifier='Name='):
    """Register every bus name declared in files under ``paths``.

    Each directory in ``paths`` that exists is globbed recursively. In every
    regular file found, each line that starts with ``identifier`` is split on
    its first '=' and the stripped right-hand side is taken as a bus name.
    Names that are not in IGNORED_DBUS and not yet in DBUS_DICT are added to
    DBUS_DICT with a fresh deep copy of DBUS_DETAIL.

    Files that cannot be opened or decoded are logged and skipped, so one bad
    file does not abort the whole scan.

    :param paths: iterable of directory paths to scan.
    :param identifier: line prefix that marks a bus-name declaration.
    """
    for directory in paths:
        if not os.path.exists(directory):
            continue

        logger('[*] Parsing files under directory \'{}\''.format(directory))
        for filename in glob.glob('{}/**/*'.format(directory), recursive=True):
            if not os.path.isfile(filename):
                continue

            # Use a separate name for the handle: the original reused the
            # loop variable for both the path and the open file.
            try:
                with open(filename, 'r') as handle:
                    data = handle.read()
            except (OSError, UnicodeDecodeError) as e:
                logger('[-] Error while reading \'{}\': {}'.format(filename, e))
                continue

            for line in data.splitlines():
                if not line.startswith(identifier):
                    continue

                _, name = line.split('=', 1)
                name = name.strip()
                logger('[+] Got dbus name \'{}\''.format(name))
                if name in IGNORED_DBUS:
                    continue

                if name not in DBUS_DICT:
                    DBUS_DICT[name] = copy.deepcopy(DBUS_DETAIL)
def get_interfaces(dom):
    """Collect the <interface> elements found under ``dom``.

    Interfaces whose name is listed in IGNORED_INTERFACES are skipped. For
    every other interface a deep copy of INTERFACE_DETAIL is filled with the
    results of get_methods(), get_properties() and get_signals().

    :param dom: a minidom node (anything offering getElementsByTagName).
    :return: dict mapping interface name -> interface detail dict.
    """
    found = {}
    for element in dom.getElementsByTagName('interface'):
        name = element.getAttribute('name')
        if name not in IGNORED_INTERFACES:
            logger('[+] Found interface \'{}\''.format(name))
            detail = copy.deepcopy(INTERFACE_DETAIL)
            # Keyword arguments are evaluated left to right, so the helpers
            # run in the order methods, properties, signals.
            detail.update(
                Methods=get_methods(element),
                Properties=get_properties(element),
                Signals=get_signals(element),
            )
            found[name] = detail
    return found
def get_methods(dom):
    """Collect the <method> elements found under ``dom``.

    For each method a deep copy of METHOD_DETAIL is filled in:

    * 'Arguments' is a ', '-joined list of 'in <type> <name>' entries, one per
      input argument. An <arg> without a ``direction`` attribute counts as an
      input, because the D-Bus introspection format defaults method arguments
      to "in" (the original code silently dropped such arguments).
    * 'Privileged' becomes True when any <annotation> below the method has
      'Privileged' in its name.

    :param dom: a minidom node (anything offering getElementsByTagName).
    :return: dict mapping method name -> method detail dict.
    """
    ret = {}
    for method in dom.getElementsByTagName('method'):
        method_obj = copy.deepcopy(METHOD_DETAIL)
        method_name = method.getAttribute('name')
        logger('[*] Found method \'{}\''.format(method_name))

        args_list = []
        for arg in method.getElementsByTagName('arg'):
            # getAttribute() returns '' for a missing attribute.
            if arg.getAttribute('direction') not in ('', 'in'):
                continue
            arg_name = arg.getAttribute('name')
            arg_type = arg.getAttribute('type')
            args_list.append('in {} {}'.format(arg_type, arg_name).strip())

        annotations = method.getElementsByTagName('annotation')
        if any('Privileged' in a.getAttribute('name') for a in annotations):
            method_obj['Privileged'] = True

        method_obj['Arguments'] = ', '.join(args_list)
        ret[method_name] = method_obj
    return ret
def get_properties(dom):
    """Collect the <property> elements found under ``dom``.

    Each property gets a deep copy of PROPERTY_DETAIL where 'Type' is the
    element's ``type`` attribute, 'ReadOnly' is True exactly when ``access``
    is 'read', and 'Privileged' is set to True when any <annotation> below
    the property has 'Privileged' in its name.

    :param dom: a minidom node (anything offering getElementsByTagName).
    :return: dict mapping property name -> property detail dict.
    """
    result = {}
    for element in dom.getElementsByTagName('property'):
        prop_name = element.getAttribute('name')
        prop_type = element.getAttribute('type')
        access = element.getAttribute('access')
        logger('[*] Found property \'{}\''.format(prop_name))

        detail = copy.deepcopy(PROPERTY_DETAIL)
        detail['ReadOnly'] = access == 'read'
        detail['Type'] = prop_type
        if any('Privileged' in note.getAttribute('name')
               for note in element.getElementsByTagName('annotation')):
            detail['Privileged'] = True

        result[prop_name] = detail
    return result
def get_signals(dom):
    """Collect the <signal> elements found under ``dom``.

    Each signal gets a deep copy of SIGNAL_DETAIL whose 'Arguments' entry is
    a ', '-joined list of '<type> <name>' strings, one per <arg> element.

    :param dom: a minidom node (anything offering getElementsByTagName).
    :return: dict mapping signal name -> signal detail dict.
    """
    collected = {}
    for element in dom.getElementsByTagName('signal'):
        signal_name = element.getAttribute('name')
        logger('[*] Found signal \'{}\''.format(signal_name))

        rendered = [
            '{} {}'.format(arg.getAttribute('type'), arg.getAttribute('name')).strip()
            for arg in element.getElementsByTagName('arg')
        ]

        detail = copy.deepcopy(SIGNAL_DETAIL)
        detail['Arguments'] = ', '.join(rendered)
        collected[signal_name] = detail
    return collected
def introspect_bus(name, path='/'):
    """Introspect object ``path`` on bus ``name`` and recurse into child nodes.

    The polkit actions matching ``name`` are stored under
    DBUS_DICT[name]['Policies']. The object's introspection XML is then
    fetched over the system bus and parsed. Nodes whose name is empty or
    equal to ``path`` supply the interfaces of this path; every other node is
    treated as a child path and introspected recursively unless it is listed
    in IGNORED_PATH.

    When interfaces were found, merge_path() is asked for an already recorded
    path with the same interface set; if there is one, this path is stored as
    a reference ('Ref') to it instead of repeating the interfaces.

    Failures to introspect or to parse the XML are logged and end the call
    without recording anything for ``path``.

    :param name: bus name; must already be a key of DBUS_DICT.
    :param path: object path to introspect.
    """
    logger('[*] Introspecting \'{}\' with path \'{}\''.format(name, path))
    DBUS_DICT[name]['Policies'] = find_match_polkit_actions(name)

    system_bus = dbus.SystemBus()
    try:
        proxy = system_bus.get_object(name, path)
        xml_text = proxy.Introspect(
            dbus_interface='org.freedesktop.DBus.Introspectable')
    except Exception as e:
        logger('[-] Error while introspecting \'{}\': {}'.format(name, e))
        return

    entry = copy.deepcopy(PATH_DETAIL)
    try:
        logger(xml_text)
        dom = minidom.parseString(xml_text)
    except Exception as e:
        logger('[-] Error while formatting xml: {}'.format(e))
        logger(xml_text)
        return

    for node in dom.getElementsByTagName('node'):
        node_name = node.getAttribute('name')
        if node_name in ('', path):
            # This node describes the object being introspected.
            entry['Interfaces'] = get_interfaces(node)
            continue

        child_path = '{}/{}'.format(path.rstrip('/'), node_name)
        if child_path in IGNORED_PATH:
            continue

        logger('[+] Found path \'{}\''.format(child_path))
        if child_path != path:
            introspect_bus(name, child_path)

    # NOTE(review): the original indentation was lost in this copy of the
    # file. This assumes the final store belongs inside the
    # "has interfaces" branch, i.e. paths without interfaces are not
    # recorded - confirm against the upstream script.
    interfaces = entry['Interfaces']
    if not interfaces:
        return

    same_as = merge_path(name, path, interfaces)
    if same_as:
        logger('[*] Path \'{}\' now is refer to \'{}\''.format(path, same_as))
        entry['Interfaces'] = {}
        entry['Ref'] = same_as

    DBUS_DICT[name]['Paths'][path] = entry
def merge_path(dbus_name, path_name, interfaces):
    """Find an already recorded path that exposes the same interface names.

    Only paths of ``dbus_name`` that start with the parent of ``path_name``
    (everything before its last '/') and that are not themselves references
    (their 'Ref' is None) are considered.

    :param dbus_name: bus name whose recorded paths are searched.
    :param path_name: object path looking for an equivalent.
    :param interfaces: dict of interface name -> detail for ``path_name``.
    :return: the first matching recorded path, or None when there is none or
        when ``dbus_name`` has no recorded paths at all.
    """
    parent = path_name.rsplit('/', 1)[0]
    wanted = set(interfaces)

    # Chain .get() so an unknown bus name yields "no match"; the original
    # indexed ['Paths'] on the empty default and raised KeyError.
    known_paths = DBUS_DICT.get(dbus_name, {}).get('Paths', {})
    for path, path_detail in known_paths.items():
        # Plain string-prefix test, kept as in the original: it does not
        # require a '/' boundary after ``parent``.
        if path_detail['Ref'] is not None or not path.startswith(parent):
            continue
        if wanted == set(path_detail['Interfaces']):
            return path
    return None
def get_all_running_dbus():
    """Add the names currently listed on the system bus to DBUS_DICT.

    Calls ListNames on org.freedesktop.DBus. Names starting with ':' are
    skipped, as are names already present in DBUS_DICT; every remaining name
    is registered with a fresh deep copy of DBUS_DETAIL.
    """
    system_bus = dbus.SystemBus()
    daemon = system_bus.get_object('org.freedesktop.DBus','/org/freedesktop/DBus')
    daemon_iface = dbus.Interface(daemon, dbus_interface='org.freedesktop.DBus')
    list_names = daemon_iface.get_dbus_method('ListNames')

    for bus_name in list_names():
        if bus_name.startswith(':') or bus_name in DBUS_DICT:
            continue
        logger('[+] Found new dbus \'{}\''.format(bus_name))
        DBUS_DICT[bus_name] = copy.deepcopy(DBUS_DETAIL)
def print_dbus(name_filter=''):
    """Print the collected contents of DBUS_DICT to stdout.

    Every printed bus is preceded by a blank line and wrapped in
    '<name> {' ... '}'. Inside, the line markers are:

    * '%'  a matching polkit action and its recorded value
    * '*'  an object path (or 'path -> other' when it is a reference)
    * '-'  an interface
    * '+'  a method   ('+ privileged' when flagged privileged)
    * '@'  a signal
    * '$'  a property ('$ privileged' when flagged privileged), shown with
           'readonly' or 'readwrite' and its type

    :param name_filter: when non-empty, only the bus with exactly this name
        is printed. Skipped buses produce no output at all; the original
        printed a stray blank line for each of them.
    """
    for dbus_name, dbus_detail in DBUS_DICT.items():
        if name_filter and name_filter != dbus_name:
            continue

        print()
        print(dbus_name + ' {')
        for k, v in dbus_detail['Policies'].items():
            print(' % {}: {}'.format(k, v))

        for path, path_detail in dbus_detail['Paths'].items():
            if path_detail['Ref']:
                print(' * {} -> {}'.format(path, path_detail['Ref']))
                continue

            print(' * ' + path)
            for iface, iface_detail in path_detail['Interfaces'].items():
                print(' - ' + iface)

                for method, method_detail in iface_detail['Methods'].items():
                    sign = '+ privileged' if method_detail['Privileged'] else '+'
                    print(' {} {}({})'.format(sign, method, method_detail['Arguments']))

                for signal, signal_detail in iface_detail['Signals'].items():
                    print(' @ {}({})'.format(signal, signal_detail['Arguments']))

                for p, p_detail in iface_detail['Properties'].items():
                    sign = '$ privileged' if p_detail['Privileged'] else '$'
                    readable = 'readonly' if p_detail['ReadOnly'] else 'readwrite'
                    print(' {} {}({}) {}'.format(sign, readable, p_detail['Type'], p))

        print('}')
def load_polkit(paths):
    """Load polkit action definitions from ``*.policy`` files into POLKIT_ACTIONS.

    For every directory in ``paths`` that exists, each ``*.policy`` file
    directly inside it is parsed as XML. Each <action> element is recorded as
    POLKIT_ACTIONS[<id attribute>] = value of the first child node of its
    first <allow_any> element, or 'unknown' when the action has no
    <allow_any> element or that element is empty.

    Files that cannot be read, decoded or parsed are logged and skipped, so
    one malformed policy file does not abort the whole run.

    :param paths: iterable of directory paths to scan.
    """
    # Local import: ExpatError is what minidom raises for malformed XML.
    from xml.parsers.expat import ExpatError

    for directory in paths:
        if not os.path.exists(directory):
            continue

        # The pattern has no '**', so glob's recursive flag would change
        # nothing and is not passed.
        for filename in glob.glob('{}/*.policy'.format(directory)):
            try:
                with open(filename, 'r') as handle:
                    dom = minidom.parseString(handle.read())
            except (OSError, UnicodeDecodeError, ExpatError) as e:
                logger('[-] Error while loading \'{}\': {}'.format(filename, e))
                continue

            for action in dom.getElementsByTagName('action'):
                action_name = action.getAttribute('id')
                allow = 'unknown'
                allow_any = action.getElementsByTagName('allow_any')
                # An empty <allow_any/> has no child node to read from.
                if allow_any and allow_any[0].firstChild is not None:
                    allow = allow_any[0].firstChild.nodeValue
                POLKIT_ACTIONS[action_name] = allow
def find_match_polkit_actions(name):
    """Return the polkit actions whose id prefix matches bus name ``name``.

    An action matches when the lower-cased ``name`` starts with the action's
    prefix: the action id lower-cased, with its last dot-separated component
    cut off and every '-' removed.

    :param name: bus name to match against POLKIT_ACTIONS.
    :return: dict mapping matching action id -> its recorded value.
    """
    lowered = name.lower()
    return {
        action_id: allow
        for action_id, allow in POLKIT_ACTIONS.items()
        if lowered.startswith(
            action_id.lower().rsplit('.', 1)[0].replace('-', ''))
    }
if __name__ == '__main__':
    # Usage: script [BUS_NAME]
    # With a bus name only that bus is introspected and printed; without
    # one, every bus name found in the service/unit directories or listed on
    # the system bus is scanned.
    load_polkit(POLKIT_PATH)

    name_filter = sys.argv[1] if len(sys.argv) >= 2 else ''
    if name_filter:
        DBUS_DICT[name_filter] = copy.deepcopy(DBUS_DETAIL)
        introspect_bus(name_filter)
    else:
        parse_dbus(DBUS_PATH)
        parse_dbus(SYSTEMD_PATH, 'BusName=')
        get_all_running_dbus()
        for bus_name in DBUS_DICT.keys():
            introspect_bus(bus_name)

    print_dbus(name_filter)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment