Last active
August 22, 2024 16:10
-
-
Save badstreff/f5a81e8986fb180b5257db1dd7143917 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python3 | |
""" | |
Custom inventory script for Ansible populated by the JSS | |
""" | |
from functools import lru_cache | |
from os.path import dirname, realpath, join | |
from urllib.parse import quote | |
import argparse | |
import json | |
import configparser | |
import requests | |
import os | |
import time | |
# import pprint | |
CACHE_PATH = os.path.expanduser('~/Library/Caches/ansible/jss.inventory') | |
REFRESH_THRESHOLD = 600 # time in seconds to wait before refreshing the cache | |
class JSS: | |
def __init__(self, url, username, password): | |
self.url = url | |
self.username = username | |
self.password = password | |
def get_all_computers(self): | |
req = requests.get(self.url + '/JSSResource/computers', | |
auth=(self.username, self.password), | |
headers={'Accept': 'application/json'}) | |
if req.status_code == 200: | |
return json.loads(req.content.decode('utf-8')) | |
return None | |
@lru_cache(maxsize=None) | |
def get_computer(self, id=None, name=None): | |
if id: | |
url = (self.url + | |
'/JSSResource/computers/id/' + | |
str(id)) | |
else: | |
url = (self.url + | |
'/JSSResource/computers/name/' + | |
quote(name)) | |
req = requests.get(url, | |
auth=(self.username, self.password), | |
headers={'Accept': 'application/json'}) | |
if req.status_code == 200: | |
return json.loads(req.content.decode('utf-8')) | |
return None | |
def get_all_computergroups(self): | |
req = requests.get(self.url + '/JSSResource/computergroups', | |
auth=(self.username, self.password), | |
headers={'Accept': 'application/json'}) | |
if req.status_code == 200: | |
return json.loads(req.content.decode('utf-8')) | |
return None | |
def get_computergroup(self, id=None, name=None): | |
if id: | |
url = (self.url + | |
'/JSSResource/computergroups/id/' + | |
str(id)) | |
else: | |
url = (self.url + | |
'/JSSResource/computergroups/name/' + | |
quote(name)) | |
req = requests.get(url, | |
auth=(self.username, self.password), | |
headers={'Accept': 'application/json'}) | |
if req.status_code == 200: | |
return json.loads(req.content.decode('utf-8')) | |
return None | |
# Empty inventory for testing. | |
def empty_inventory(): | |
return {'_meta': {'hostvars': {}}} | |
def main(args=None): | |
mypath = dirname(realpath(__file__)) | |
config = read_jss_config(join(dirname(mypath), | |
'private/jss.conf'), | |
'JSS') | |
refresh_cache = args.refresh_cache | |
cache_last_modified = os.stat(CACHE_PATH).st_mtime | |
jss = JSS(config['url'], | |
config['api'][0], | |
config['api'][1]) | |
if args.host: | |
print(json.dumps(empty_inventory())) | |
exit(0) | |
if cache_last_modified < time.time() - REFRESH_THRESHOLD: | |
refresh_cache = True | |
if refresh_cache: | |
all = jss.get_all_computers() | |
# print(jss.get_computer(name=all['computers'][0]['name'])) | |
computers = [jss.get_computer(name=x['name'])['computer']['general']['ip_address'] for x in all['computers']] | |
ret = {'all': computers, | |
'_meta': {'hostvars': {}}} | |
for group in jss.get_all_computergroups()['computer_groups']: | |
group = jss.get_computergroup(id=group['id']) | |
name = group['computer_group']['name'].replace(' ', '_') | |
ret[name] = [] | |
for computer in group['computer_group']['computers']: | |
ret[name].append(jss.get_computer(name=computer['name'])['computer']['general']['ip_address']) | |
with open(CACHE_PATH, 'w') as cache: | |
cache.write(json.dumps(ret)) | |
with open(CACHE_PATH, 'r') as cache: | |
ret = json.loads(cache.read()) | |
config = read_ansible_config(join(dirname(mypath), | |
'private/ansible.conf'), | |
'inventory') | |
for computer in ret['all']: | |
ret['_meta']['hostvars'][computer] = {'ansible_ssh_pass': config[0], | |
'ansible_become_pass': config[1]} | |
print(json.dumps(ret)) | |
def read_ansible_config(path, section): | |
config = configparser.ConfigParser() | |
config.read(path) | |
return (config.get(section, 'ansible_ssh_pass'), | |
config.get(section, 'ansible_become_pass')) | |
def read_jss_config(path, section): | |
'''Read the jss config and return a dictionary containing the | |
parsed settings | |
:path - Full path to the config file | |
:section - Section name that contains the JSS info | |
''' | |
config = configparser.ConfigParser() | |
config.read(path) | |
return {'url': config.get(section, 'URL'), | |
'api': (config.get(section, 'username'), | |
config.get(section, 'password')), | |
'repo': (config.get(section, 'repo_rw_username'), | |
config.get(section, 'repo_rw_password'), | |
config.get(section, 'repo_name'), | |
config.get(section, 'repo_mount_point'))} | |
if __name__ == '__main__': | |
if not os.path.exists(CACHE_PATH): | |
os.makedirs(os.path.dirname(CACHE_PATH), exist_ok=True) | |
open(CACHE_PATH, 'a').close() | |
os.utime(CACHE_PATH, | |
(os.stat(CACHE_PATH).st_atime, | |
os.stat(CACHE_PATH).st_mtime - REFRESH_THRESHOLD)) | |
PARSER = argparse.ArgumentParser() | |
PARSER.add_argument('--host', action='store') | |
PARSER.add_argument('--list', action='store_true') | |
PARSER.add_argument('--refresh-cache', action='store_true') | |
main(PARSER.parse_args()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment