Last active
February 8, 2016 17:49
-
-
Save arubdesu/eee54869af345366a13d to your computer and use it in GitHub Desktop.
All the things
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
"""Collect inventory via osquery, etc""" | |
import csv | |
import datetime | |
import os | |
import platform | |
import plistlib | |
import subprocess | |
import sys | |
CWD = os.path.dirname(__file__) | |
sys.path.append(CWD + '/osquery') | |
import osquery | |
#pylint: disable=no-name-in-module | |
from SystemConfiguration import SCNetworkInterfaceCopyAll, \ | |
SCNetworkInterfaceGetLocalizedDisplayName, SCNetworkInterfaceGetBSDName, \ | |
SCNetworkInterfaceGetHardwareAddressString, SCDynamicStoreCreate, \ | |
SCDynamicStoreCopyValue | |
def get_adinfo(net_config): | |
'''Returns the ad directory info''' | |
ad_info = SCDynamicStoreCopyValue(net_config, "com.apple.opendirectoryd.ActiveDirectory") | |
if ad_info: | |
return ad_info | |
else: | |
return {} | |
def get_adadmins(): | |
'''Returns list of ad-connector-configured admin groups''' | |
try: | |
nested_admins = \ | |
plistlib.readPlistFromString(subprocess.check_output( \ | |
['/usr/sbin/dsconfigad', '-show', '-xml'])) | |
if nested_admins: | |
return nested_admins['Administrative']['Allowed admin groups'] | |
except Exception as _err: | |
print _err | |
return {} | |
def get_printers(): | |
'''Returns name, ppd, and uri of printers''' | |
full_prints = \ | |
plistlib.readPlistFromString(subprocess.check_output( \ | |
['/usr/sbin/system_profiler', 'SPPrintersDataType', '-xml'])) | |
all_printdicts = full_prints[0]['_items'] | |
just_prints = {} | |
for printer in all_printdicts: | |
just_prints[printer.get('_name', None)] = [printer.get('ppd', None), | |
printer.get('uri', None)] | |
return just_prints | |
def get_dnsinfo(net_config): | |
'''Returns the currently considered primary network interface''' | |
dns_info = SCDynamicStoreCopyValue(net_config, "State:/Network/Global/DNS") | |
return dict(dns_info) | |
def get_primaryinterface(net_config): | |
'''Returns the currently considered primary network interface''' | |
states = SCDynamicStoreCopyValue(net_config, "State:/Network/Global/IPv4") | |
if states: | |
return states['PrimaryInterface'] | |
else: | |
return {} | |
def lookup_guids(net_config, guid): | |
'''Returns net iface for IPv4 guid''' | |
netname = SCDynamicStoreCopyValue(net_config, "Setup:/Network/Service/%s" % | |
guid) | |
if netname: | |
return netname['UserDefinedName'] | |
else: | |
return {} | |
def get_is_dhcp(net_config, guid): | |
'''Returns net iface for IPv4 guid''' | |
config = SCDynamicStoreCopyValue(net_config, | |
"Setup:/Network/Service/%s/IPv4" % | |
guid) | |
if config: | |
if ['ConfigMethod'] == 'DHCP': | |
return True | |
else: | |
return False | |
def get_ip_address(net_config, iface): | |
'''Returns IP for provided interface''' | |
try: | |
addresses = SCDynamicStoreCopyValue(net_config, | |
"State:/Network/Interface/%s/IPv4" % | |
iface) | |
return addresses['Addresses'][0] | |
except TypeError: | |
return None | |
def get_networkinterfacelist(net_config): | |
'''Returns a list of all network interface names/MACs''' | |
all_media = {} | |
names_forguids = [] | |
guids = {} | |
network_interfaces = SCNetworkInterfaceCopyAll() | |
for interface in network_interfaces: | |
bsdname = SCNetworkInterfaceGetBSDName(interface) | |
disp_name = SCNetworkInterfaceGetLocalizedDisplayName(interface) | |
mac_addy = SCNetworkInterfaceGetHardwareAddressString(interface) | |
if not disp_name == 'Bluetooth PAN': | |
all_media[disp_name] = [get_ip_address(net_config, bsdname), | |
mac_addy, bsdname] | |
names_forguids.append(disp_name) | |
guid_list = SCDynamicStoreCopyValue(net_config, "Setup:/Network/Global/IPv4") | |
for guid in guid_list['ServiceOrder']: | |
lookd_up = lookup_guids(net_config, guid) | |
guids[lookd_up] = guid | |
for iface_key in names_forguids: | |
associated_guid = guids.get(iface_key, None) | |
check_dhcp = get_is_dhcp(net_config, associated_guid) | |
if check_dhcp: | |
all_media[iface_key].append('DHCP') | |
return all_media.items() | |
def parse_groups(group, result_dict): | |
'''takes group to parse and current output, returns filtered list''' | |
parsed = [] | |
stripd_usrs = result_dict.get('users') | |
group_usrs = result_dict.get(group) | |
for user_dict in stripd_usrs: | |
new_dict = {} | |
this_uid = user_dict.get('uid') | |
for grp_uid in group_usrs: | |
if this_uid == grp_uid.get('uid'): | |
new_dict[user_dict.get('username')] = this_uid | |
parsed.append(new_dict) | |
return parsed | |
def parse_users(result_dict): | |
'''takes group to parse and current output, returns filtered list''' | |
parsed = [] | |
stripd_usrs = result_dict.get('users') | |
for user_dict in stripd_usrs: | |
del user_dict['uid_signed'] | |
del user_dict['gid_signed'] | |
del user_dict['shell'] | |
parsed.append(user_dict) | |
return parsed | |
def run_osquery(sql_dict): | |
"""take sql command you'd like output from osquery for, returns...""" | |
result_dict = {} | |
instance = osquery.SpawnInstance(path='%s/osqueryd' % CWD) | |
instance.open() | |
for name, quer in sql_dict.items(): | |
results = instance.client.query(quer) | |
result_dict[name] = results.response | |
return result_dict | |
def pokemon(): | |
"""returns inventory items""" | |
query_dict = {} | |
result_dict = {} | |
inventory = [('hostname, cpu_serial, hardware_model, physical_memory, \ | |
cpu_brand', 'system_info'), | |
('major, minor, patch, build', 'os_version'), | |
('global_state', 'alf'), | |
('name, path, bundle_identifier, bundle_name, \ | |
bundle_short_version, bundle_version, copyright, \ | |
bundle_executable', 'apps') | |
] | |
for query in inventory: | |
sql_string = ''.join(['select ', query[0], ' from ', query[1]]) | |
query_dict[query[1]] = sql_string | |
complex_qs = [("select uid from user_groups where gid like '80' and uid \ | |
not like '0'", 'admins'), | |
("select uid from user_groups where gid like '398' and uid \ | |
not like '0'", 'ARD'), | |
("select uid from user_groups where gid like '404' and uid \ | |
not like '0'", 'ssh'), | |
("select * from users where shell not like '/usr/%' and uid \ | |
not like '0' and uid not like '248'", 'users'), | |
("select * from disk_encryption where encrypted like '1'", | |
'disk_encryption'), | |
("select device, path, blocks, blocks_size, blocks_free, \ | |
blocks_available from mounts where device not like 'map%' \ | |
and device not like 'dev%'", 'mounts'), | |
("select name, label from block_devices where type not like \ | |
'Virtual Interface' and label like '%Media'", 'block_devices') | |
] | |
for quer in complex_qs: | |
query_dict[quer[1]] = quer[0] | |
result_dict = run_osquery(query_dict) | |
result_dict['admins'] = parse_groups('admins', result_dict) | |
result_dict['ARD'] = parse_groups('ARD', result_dict) | |
result_dict['ssh'] = parse_groups('ssh', result_dict) | |
result_dict['users'] = parse_users(result_dict) | |
return result_dict | |
def main(): | |
'''gimme some main''' | |
if int(platform.mac_ver()[0].split('.')[1]) < 9: | |
print 'OS too old for support' | |
sys.exit(1) | |
net_config = SCDynamicStoreCreate(None, "net", None, None) | |
now = str(datetime.datetime.now())[:-7] | |
extend = {} | |
extend['ad_info'] = dict(get_adinfo(net_config)) | |
extend['ad_groups'] = get_adadmins() | |
extend['primary_interface'] = get_primaryinterface(net_config) | |
extend['printers'] = get_printers() | |
extend['network_interfaces'] = get_networkinterfacelist(net_config) | |
extend['DateCollected'] = now | |
result_dict = pokemon() | |
result_dict.update(extend) | |
pars_serial = result_dict.get('system_info')[0] | |
serial = pars_serial.get('cpu_serial') | |
filename_now = (now.replace(':', '-')).replace(' ', '_') | |
print result_dict | |
our_filepath = os.path.join('/tmp/', serial + '_' + filename_now + '.csv') | |
print our_filepath | |
with open(our_filepath, 'w') as target: | |
writer = csv.writer(target) | |
writer.writerows(zip(result_dict.keys(), result_dict.values())) | |
if __name__ == '__main__': | |
main() |
Did I do the thing? https://gist.github.com/arubdesu/eee54869af345366a13d#file-adhoc-osq-py-L149-L157 I really can't tell, I use such primitive data structure handling.
Yes! This looks good!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
If you keep the
instance
within a longer-lived scope you'll get a bit more performance improvement!