Created
July 19, 2024 16:12
-
-
Save markoheijnen/c7aca7c6b95cbf38566fc2f40e7a01a9 to your computer and use it in GitHub Desktop.
Sync Patch Feed to Title Editor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
asn1crypto |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import time | |
from asn1crypto import cms, pem, x509 | |
import requests | |
from requests.auth import HTTPBasicAuth | |
class PatchFeedSync:
    """Copy patch definitions from the Jamf patch feed into a Title Editor.

    Typical use: construct with the Title Editor URL and credentials, register
    one or more patch feed title ids with ``add_item``, then call ``run``.
    """

    # Immutable defaults may live on the class: they are only ever rebound on
    # the instance, never mutated in place.
    token = None
    expiration: int = -1
    software_id_dict = None

    # Mutable state is created per instance in __init__ so that two
    # PatchFeedSync objects never share the same list/dict.
    items: list
    software_cache: dict

    # Base URL the patch definitions are downloaded from.
    patch_feed_url = 'https://jamf-patch.jamfcloud.com/v1/patch/'

    # Seconds to wait on a single HTTP request; without a timeout `requests`
    # can block forever on an unresponsive server.
    request_timeout = 30

    def __init__(self, title_editor_url, title_editor_username, title_editor_password):
        """Store the Title Editor base URL and the basic-auth credentials."""
        self.title_editor_url = title_editor_url
        self.title_editor_username = title_editor_username
        self.title_editor_password = title_editor_password
        self.items = []
        # Not used by any method of this class; kept so existing code that
        # reads the attribute keeps working.
        self.software_cache = {}

    def add_item(self, item_id, function=None):
        """Register a patch feed title to sync.

        Args:
            item_id: Title id appended to the patch feed URL.
            function: Optional callable that receives the decoded patch
                definition (a dict) and returns the dict to upload.
        """
        self.items.append({
            'id': item_id,
            'function': function,
        })

    def run(self):
        """Sync every registered item to the Title Editor.

        For each item the patch definition is downloaded, passed through the
        item's callable when one was given, any existing software title with
        the same ``id`` is deleted, and the definition is posted with
        ``enabled`` set to True.
        """
        software_ids = self.get_software_ids()
        try:
            for item in self.items:
                data = self.get_patch_data(item['id'])
                # add_item always stores the 'function' key, so test the
                # value: None means "upload the definition unchanged".
                transform = item.get('function')
                if transform is not None:
                    data = transform(data)
                if data['id'] in software_ids:
                    self.make_api_call(f'softwaretitles/{software_ids[data["id"]]}', 'DELETE')
                data['enabled'] = True
                self.make_api_call('softwaretitles', 'POST', data)
        finally:
            # Titles were deleted and re-created, so the cached id mapping may
            # no longer match the server; force a refetch on the next use.
            self.software_id_dict = None

    def get_patch_data(self, title_id):
        """Download one title from the patch feed and return its JSON payload.

        Raises:
            requests.HTTPError: If the patch feed answers with an error status.
        """
        response = requests.get(
            f"{self.patch_feed_url}{title_id}",
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return self.verify_pkcs7_signature(response.content)

    def verify_pkcs7_signature(self, response_content):
        """Extract and decode the JSON document wrapped in a PKCS#7 envelope.

        NOTE(review): despite the name, no signature or certificate check is
        performed here; the encapsulated content is simply unwrapped. Do not
        treat the returned data as authenticated.

        Args:
            response_content: PEM- or DER-encoded PKCS#7 bytes.

        Returns:
            The JSON-decoded encapsulated content.

        Raises:
            ValueError: If the envelope carries no encapsulated content.
        """
        # If the content is PEM encoded, convert it to DER
        if pem.detect(response_content):
            _, _, response_content = pem.unarmor(response_content)
        # Load the PKCS7 data and take the SignedData structure out of it
        pkcs7_data = cms.ContentInfo.load(response_content)
        signed_data = pkcs7_data['content']
        # Extract the encapsulated payload
        content = signed_data['encap_content_info']['content'].native
        if content is None:
            raise ValueError('PKCS#7 envelope has no encapsulated content')
        # Decode and return the JSON data
        return json.loads(content.decode())

    def token_time_remaining(self):
        """Return ``self.expiration`` minus the current Unix time in seconds.

        Negative while no token has been obtained (expiration defaults to -1).
        """
        return self.expiration - int(time.time())

    def refresh_token(self):
        """Make sure a usable bearer token is stored on the instance.

        More than 60 seconds left: nothing to do. Between 11 and 60 seconds:
        the token is extended through ``auth/keepalive``. 10 seconds or less
        (including "no token yet"): a new token is requested with basic auth.

        Raises:
            Exception: If the server rejects the request or reports a
                non-200 ``status`` in the response body.
        """
        time_remaining = self.token_time_remaining()
        if time_remaining > 60:
            return

        if time_remaining <= 10:
            response = requests.post(
                f"{self.title_editor_url}/v2/auth/tokens",
                headers={'Accept': 'application/json'},
                auth=HTTPBasicAuth(self.title_editor_username, self.title_editor_password),
                timeout=self.request_timeout,
            )
            # Fail with the same error as a reported bad status instead of a
            # JSON decode error / KeyError on an unexpected error body.
            if not response.ok:
                raise Exception('Invalid token')
            token_info = json.loads(response.text)
        else:
            token_info = self._make_api_call('auth/keepalive', 'POST')

        if 'status' in token_info and token_info['status'] != 200:
            raise Exception('Invalid token')

        self.token = token_info["token"]
        # NOTE(review): token_time_remaining subtracts Unix seconds from this
        # value, so "expires" is assumed to be an epoch timestamp -- confirm.
        self.expiration = token_info["expires"]

    def make_api_call(self, endpoint, method="GET", data=None):
        """Call a Title Editor v2 endpoint after ensuring the token is fresh."""
        self.refresh_token()
        return self._make_api_call(endpoint, method, data)

    def _make_api_call(self, endpoint, method="GET", data=None):
        """Send one authenticated request to ``<title_editor_url>/v2/<endpoint>``.

        Args:
            endpoint: Path below ``/v2/``.
            method: HTTP method name.
            data: Optional object sent as the JSON request body.

        Returns:
            The JSON-decoded response body, or None for DELETE requests.

        Raises:
            Exception: If the response status is not a success status.
        """
        response = requests.request(
            method,
            f"{self.title_editor_url}/v2/{endpoint}",
            headers={
                'Accept': 'application/json',
                'Authorization': f"Bearer {self.token}",
            },
            json=data,
            timeout=self.request_timeout,
        )
        if not response.ok:
            raise Exception(
                f"request failed: {method} {endpoint} -> HTTP {response.status_code}"
            )
        # The body of a DELETE response is not parsed.
        if method == 'DELETE':
            return None
        return json.loads(response.text)

    def get_software_ids(self):
        """Return a dict mapping each title's ``id`` to its ``softwareTitleId``.

        The mapping is fetched from ``softwaretitles`` on first use and cached
        on the instance until it is reset.
        """
        if self.software_id_dict is None:
            response = self.make_api_call('softwaretitles')
            self.software_id_dict = {
                entry["id"]: entry["softwareTitleId"] for entry in response
            }
        return self.software_id_dict
# Run the sync only when this file is executed as a script, so importing the
# module does not trigger network calls.
if __name__ == "__main__":
    # The three 'changeme' values are placeholders for the Title Editor URL,
    # username and password.
    sync = PatchFeedSync('changeme', 'changeme', 'changeme')
    # Sync patch feed title '220', overriding its name before upload.
    sync.add_item('220', lambda item: {**item, 'name': 'New Name'})
    sync.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment