Skip to content

Instantly share code, notes, and snippets.

@arubdesu
Last active February 8, 2016 17:49
Show Gist options
  • Save arubdesu/eee54869af345366a13d to your computer and use it in GitHub Desktop.
Save arubdesu/eee54869af345366a13d to your computer and use it in GitHub Desktop.
All the things
#!/usr/bin/python
"""Collect inventory via osquery, etc"""
import csv
import datetime
import os
import platform
import plistlib
import subprocess
import sys
# Resolve the script's directory to an absolute path.  When the script is
# started by a relative name, os.path.dirname(__file__) is an empty string
# and the paths built from CWD would point at the filesystem root.
CWD = os.path.dirname(os.path.abspath(__file__))
# Make the osquery package that sits next to this script importable.
sys.path.append(os.path.join(CWD, 'osquery'))
import osquery
#pylint: disable=no-name-in-module
from SystemConfiguration import SCNetworkInterfaceCopyAll, \
SCNetworkInterfaceGetLocalizedDisplayName, SCNetworkInterfaceGetBSDName, \
SCNetworkInterfaceGetHardwareAddressString, SCDynamicStoreCreate, \
SCDynamicStoreCopyValue
def get_adinfo(net_config):
    '''Fetch the Active Directory entry from the dynamic store.

    Looks up the "com.apple.opendirectoryd.ActiveDirectory" key through
    net_config and hands back whatever is stored there.  An empty dict is
    returned when the lookup yields nothing.
    '''
    return SCDynamicStoreCopyValue(
        net_config, "com.apple.opendirectoryd.ActiveDirectory") or {}
def get_adadmins():
    '''Return the admin groups configured in the AD connector.

    Runs `dsconfigad -show -xml`, parses the plist it prints and returns the
    'Allowed admin groups' entry of its 'Administrative' section.  Any
    failure along the way is printed, and an empty dict is returned instead.
    '''
    try:
        ad_settings = plistlib.readPlistFromString(subprocess.check_output(
            ['/usr/sbin/dsconfigad', '-show', '-xml']))
        if ad_settings:
            return ad_settings['Administrative']['Allowed admin groups']
    except Exception as _err:
        # Best effort: report the problem and fall through to the default.
        print(_err)
    return {}
def get_printers():
    '''Map each printer name to its [ppd, uri] pair.

    The data comes from `system_profiler SPPrintersDataType -xml`; the
    printers are the '_items' of the first element of the parsed plist.
    Fields a printer entry does not carry are reported as None.
    '''
    report = plistlib.readPlistFromString(subprocess.check_output(
        ['/usr/sbin/system_profiler', 'SPPrintersDataType', '-xml']))
    return dict(
        (entry.get('_name'), [entry.get('ppd'), entry.get('uri')])
        for entry in report[0]['_items'])
def get_dnsinfo(net_config):
    '''Returns the global DNS state as a plain dict

    Reads the "State:/Network/Global/DNS" key through net_config.  An empty
    dict is returned when the key has no value, which is how the other
    dynamic-store lookups in this file report a missing entry; calling
    dict() on the missing value would raise TypeError instead.
    '''
    dns_info = SCDynamicStoreCopyValue(net_config, "State:/Network/Global/DNS")
    if not dns_info:
        return {}
    return dict(dns_info)
def get_primaryinterface(net_config):
    '''Report the 'PrimaryInterface' of the global IPv4 state.

    The value is read from "State:/Network/Global/IPv4" through net_config.
    An empty dict is returned when that key has no value.
    '''
    ipv4_state = SCDynamicStoreCopyValue(net_config,
                                         "State:/Network/Global/IPv4")
    if not ipv4_state:
        return {}
    return ipv4_state['PrimaryInterface']
def lookup_guids(net_config, guid):
    '''Report the 'UserDefinedName' of the network service with this guid.

    The service is read from "Setup:/Network/Service/<guid>" through
    net_config.  An empty dict is returned when no such entry exists.
    '''
    service = SCDynamicStoreCopyValue(net_config,
                                      "Setup:/Network/Service/%s" % guid)
    return service['UserDefinedName'] if service else {}
def get_is_dhcp(net_config, guid):
    '''Returns True when the service's IPv4 'ConfigMethod' is 'DHCP'

    The configuration is read from "Setup:/Network/Service/<guid>/IPv4"
    through net_config.  False is returned when the service has no IPv4
    configuration or uses any other method.
    '''
    config = SCDynamicStoreCopyValue(net_config,
                                     "Setup:/Network/Service/%s/IPv4" %
                                     guid)
    if not config:
        return False
    # Read the value out of the configuration.  Comparing the bare list
    # ['ConfigMethod'] with 'DHCP' can never be equal, so DHCP was never
    # detected.
    return dict(config).get('ConfigMethod') == 'DHCP'
def get_ip_address(net_config, iface):
    '''Returns the first IPv4 address of the provided interface, or None

    The addresses are read from "State:/Network/Interface/<iface>/IPv4"
    through net_config.
    '''
    try:
        addresses = SCDynamicStoreCopyValue(net_config,
                                            "State:/Network/Interface/%s/IPv4" %
                                            iface)
        return addresses['Addresses'][0]
    except (TypeError, KeyError, IndexError):
        # TypeError: the lookup returned nothing for this interface.
        # KeyError / IndexError: the entry exists but lists no address.
        # Either way the interface simply has no IPv4 address to report.
        return None
def get_networkinterfacelist(net_config):
    '''Returns (display name, details) pairs for the network interfaces

    Each details list holds [IP address, MAC address, BSD name], with 'DHCP'
    appended when get_is_dhcp reports True for the network service carrying
    the same name.  The 'Bluetooth PAN' interface is left out.
    '''
    all_media = {}
    for interface in SCNetworkInterfaceCopyAll() or []:
        disp_name = SCNetworkInterfaceGetLocalizedDisplayName(interface)
        if disp_name == 'Bluetooth PAN':
            continue
        bsdname = SCNetworkInterfaceGetBSDName(interface)
        all_media[disp_name] = [
            get_ip_address(net_config, bsdname),
            SCNetworkInterfaceGetHardwareAddressString(interface),
            bsdname,
        ]

    # Map each service's user-defined name to its GUID.
    guids = {}
    guid_list = SCDynamicStoreCopyValue(net_config,
                                        "Setup:/Network/Global/IPv4")
    # No global IPv4 setup entry means there are no services to match.
    service_order = guid_list['ServiceOrder'] if guid_list else []
    for guid in service_order:
        lookd_up = lookup_guids(net_config, guid)
        # lookup_guids returns {} for a service it cannot find.  A dict is
        # unhashable and would raise TypeError as a key, so skip it.
        if lookd_up:
            guids[lookd_up] = guid

    for iface_key, details in all_media.items():
        associated_guid = guids.get(iface_key)
        # Only query services that exist; an interface without a matching
        # service has no configuration to inspect.
        if associated_guid and get_is_dhcp(net_config, associated_guid):
            details.append('DHCP')
    return all_media.items()
def parse_groups(group, result_dict):
    '''Returns the users whose uid is listed for the given group

    Args:
        group: key in result_dict holding the group's rows; each row is a
            dict with a 'uid' entry.
        result_dict: query output whose 'users' entry is a list of dicts
            with 'uid' and 'username' entries.

    Returns:
        A list with one {username: uid} dict per user whose uid appears in
        the group's rows.  A missing 'users' or group entry yields [].
    '''
    users = result_dict.get('users') or []
    group_rows = result_dict.get(group) or []
    # Collect the member uids once so each user needs a single lookup
    # instead of a scan over every group row.
    member_uids = set(row.get('uid') for row in group_rows)
    return [{user.get('username'): user.get('uid')}
            for user in users
            if user.get('uid') in member_uids]
def parse_users(result_dict):
    '''Returns the user rows without their signed-id and shell columns

    Args:
        result_dict: query output whose 'users' entry is a list of dicts.

    Returns:
        The same row dicts, modified in place, with the 'uid_signed',
        'gid_signed' and 'shell' keys removed.  A missing 'users' entry
        yields [].
    '''
    parsed = []
    for user_dict in result_dict.get('users') or []:
        for unwanted in ('uid_signed', 'gid_signed', 'shell'):
            # pop with a default: a row that never had the column is fine,
            # whereas `del` would raise KeyError and abort the run.
            user_dict.pop(unwanted, None)
        parsed.append(user_dict)
    return parsed
def run_osquery(sql_dict):
    """Run every query in sql_dict through one spawned osquery instance.

    sql_dict maps a result name to the SQL text to run.  The returned dict
    maps each of those names to the `response` of its query result.
    """
    spawned = osquery.SpawnInstance(path='%s/osqueryd' % CWD)
    spawned.open()
    # One instance serves all queries rather than spawning one per query.
    return dict((label, spawned.client.query(sql).response)
                for label, sql in sql_dict.items())
def pokemon():
    """Gather the inventory tables from osquery.

    Builds one SQL statement per result name, runs them all through
    run_osquery, then post-processes the 'admins', 'ARD', 'ssh' and 'users'
    results before returning the name -> rows dict.
    """
    # (columns, table) pairs; each becomes "select <columns> from <table>"
    # and is stored under the table name.
    plain_selects = (
        ('hostname, cpu_serial, hardware_model, physical_memory, '
         'cpu_brand', 'system_info'),
        ('major, minor, patch, build', 'os_version'),
        ('global_state', 'alf'),
        ('name, path, bundle_identifier, bundle_name, '
         'bundle_short_version, bundle_version, copyright, '
         'bundle_executable', 'apps'),
    )
    statements = dict((table, 'select %s from %s' % (columns, table))
                      for columns, table in plain_selects)

    # (sql, result name) pairs for queries that need their own where clause.
    filtered_selects = (
        ("select uid from user_groups where gid like '80' and uid "
         "not like '0'", 'admins'),
        ("select uid from user_groups where gid like '398' and uid "
         "not like '0'", 'ARD'),
        ("select uid from user_groups where gid like '404' and uid "
         "not like '0'", 'ssh'),
        ("select * from users where shell not like '/usr/%' and uid "
         "not like '0' and uid not like '248'", 'users'),
        ("select * from disk_encryption where encrypted like '1'",
         'disk_encryption'),
        ("select device, path, blocks, blocks_size, blocks_free, "
         "blocks_available from mounts where device not like 'map%' "
         "and device not like 'dev%'", 'mounts'),
        ("select name, label from block_devices where type not like "
         "'Virtual Interface' and label like '%Media'", 'block_devices'),
    )
    statements.update((label, sql) for sql, label in filtered_selects)

    gathered = run_osquery(statements)
    # Resolve the group uid rows against the user rows, then trim the user
    # rows themselves.
    for group in ('admins', 'ARD', 'ssh'):
        gathered[group] = parse_groups(group, gathered)
    gathered['users'] = parse_users(gathered)
    return gathered
def main():
    '''Collects the inventory, prints it and writes it to a CSV in /tmp

    Exits with status 1 when the OS version is older than 10.9 or cannot be
    determined.  The output file is named <cpu_serial>_<timestamp>.csv and
    holds one row per inventory key.
    '''
    # Compare (major, minor) as a tuple.  Looking at the minor number alone
    # wrongly rejects releases whose major version is above 10 (for example
    # 11.6, where 6 < 9).
    version_parts = platform.mac_ver()[0].split('.')[:2]
    try:
        os_version = tuple(int(part) for part in version_parts)
    except ValueError:
        # No usable version string; treat it as unsupported.
        os_version = ()
    if os_version < (10, 9):
        print('OS too old for support')
        sys.exit(1)

    net_config = SCDynamicStoreCreate(None, "net", None, None)
    # Format explicitly.  Slicing str(datetime) to drop the microseconds
    # cuts into the time itself whenever the microsecond field is 0 and is
    # therefore omitted from the string.
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    extend = {}
    extend['ad_info'] = dict(get_adinfo(net_config))
    extend['ad_groups'] = get_adadmins()
    extend['primary_interface'] = get_primaryinterface(net_config)
    extend['printers'] = get_printers()
    extend['network_interfaces'] = get_networkinterfacelist(net_config)
    extend['DateCollected'] = now

    result_dict = pokemon()
    result_dict.update(extend)

    serial = result_dict.get('system_info')[0].get('cpu_serial')
    # Colons and spaces are swapped out to keep the file name simple.
    filename_now = now.replace(':', '-').replace(' ', '_')
    print(result_dict)
    our_filepath = os.path.join('/tmp/', serial + '_' + filename_now + '.csv')
    print(our_filepath)
    with open(our_filepath, 'w') as target:
        writer = csv.writer(target)
        # One (key, value) row per inventory entry.
        writer.writerows(result_dict.items())


if __name__ == '__main__':
    main()
@theopolis
Copy link

If you keep the instance within a longer-lived scope you'll get a bit more performance improvement!

@arubdesu
Copy link
Author

Did I do the thing? https://gist.github.com/arubdesu/eee54869af345366a13d#file-adhoc-osq-py-L149-L157 I really can't tell; I use such primitive data structure handling.

@theopolis
Copy link

Yes! This looks good!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment