Skip to content

Instantly share code, notes, and snippets.

@markoheijnen
Created July 19, 2024 16:12
Show Gist options
  • Save markoheijnen/c7aca7c6b95cbf38566fc2f40e7a01a9 to your computer and use it in GitHub Desktop.
Save markoheijnen/c7aca7c6b95cbf38566fc2f40e7a01a9 to your computer and use it in GitHub Desktop.
Sync Patch Feed to Title Editor
import json
import time
from asn1crypto import cms, pem, x509
import requests
from requests.auth import HTTPBasicAuth
class PatchFeedSync:
    """Copy patch definitions from the Jamf patch feed into a Title Editor instance.

    Titles to sync are registered with :meth:`add_item`; :meth:`run` then
    downloads each definition, optionally transforms it, and re-creates it
    through the Title Editor ``/v2`` API.
    """

    # Bearer token for the Title Editor API; None until the first login.
    token = None
    # Expiry of ``token`` as returned by the API (compared against
    # ``time.time()``); -1 forces a login on first use.
    expiration: int = -1
    # Cached mapping built by get_software_ids(); None means "not loaded".
    software_id_dict = None

    def __init__(self, title_editor_url, title_editor_username, title_editor_password):
        """Store the Title Editor base URL and the credentials used to log in."""
        self.title_editor_url = title_editor_url
        self.title_editor_username = title_editor_username
        self.title_editor_password = title_editor_password
        # Per-instance containers: as class attributes these would be shared
        # (and mutated) across every PatchFeedSync instance.
        self.items = []
        self.software_cache = {}

    def add_item(self, item_id, function=None):
        """Register a patch feed title to sync.

        Args:
            item_id: Title ID appended to the patch feed URL.
            function: Optional callable that receives the downloaded
                definition (a dict) and returns the dict to upload.
        """
        self.items.append({
            'id': item_id,
            'function': function,
        })

    def run(self):
        """Sync every registered title into Title Editor.

        For each item the definition is downloaded, passed through the
        item's transform (if any), any existing title with the same ``id``
        is deleted, and the definition is posted with ``enabled`` set to True.
        """
        software_ids = self.get_software_ids()
        for item in self.items:
            data = self.get_patch_data(item['id'])

            # add_item() always stores the 'function' key, so test the value:
            # a plain key check would try to call None.
            transform = item.get('function')
            if transform is not None:
                data = transform(data)

            # The titles on the server are about to change, so the cached
            # id map can no longer be trusted by a later run().
            self.software_id_dict = None

            if data['id'] in software_ids:
                self.make_api_call(f'softwaretitles/{software_ids[data["id"]]}', 'DELETE')

            data['enabled'] = True
            self.make_api_call('softwaretitles', 'POST', data)

    def get_patch_data(self, title_id):
        """Download one title from the Jamf patch feed and return its JSON payload."""
        response = requests.get('https://jamf-patch.jamfcloud.com/v1/patch/' + title_id)
        return self.verify_pkcs7_signature(response.content)

    def verify_pkcs7_signature(self, response_content):
        """Extract the JSON document wrapped in a PKCS#7 / CMS envelope.

        NOTE: despite the name, no signature or certificate validation is
        performed here; the encapsulated content is only unpacked.

        Args:
            response_content: PEM- or DER-encoded CMS bytes.

        Returns:
            The decoded JSON payload.
        """
        # If the content is PEM encoded, convert it to DER
        if pem.detect(response_content):
            _, _, response_content = pem.unarmor(response_content)

        # Load the PKCS7 data and pull out the SignedData structure
        pkcs7_data = cms.ContentInfo.load(response_content)
        signed_data = pkcs7_data['content']

        # Extract the encapsulated payload and decode it as JSON
        content = signed_data['encap_content_info']['content'].native
        return json.loads(content.decode())

    def token_time_remaining(self):
        """Return ``expiration`` minus the current Unix time, in whole seconds."""
        return self.expiration - int(time.time())

    def refresh_token(self):
        """Make sure a usable bearer token is stored on the instance.

        With 10 seconds or less remaining, a new token is requested using
        basic auth; with 60 seconds or less, the current token is extended
        via the keepalive endpoint; otherwise nothing happens.

        Raises:
            Exception: If the response carries a ``status`` other than 200.
        """
        time_remaining = self.token_time_remaining()
        if time_remaining <= 10:
            response = requests.post(
                f"{self.title_editor_url}/v2/auth/tokens",
                headers={'Accept': 'application/json'},
                auth=HTTPBasicAuth(self.title_editor_username, self.title_editor_password),
            )
            token_info = json.loads(response.text)
        elif time_remaining <= 60:
            token_info = self._make_api_call('auth/keepalive', 'POST')
        else:
            return

        if 'status' in token_info and token_info['status'] != 200:
            raise Exception('Invalid token')

        self.token = token_info["token"]
        self.expiration = token_info["expires"]

    def make_api_call(self, endpoint, method="GET", data=None):
        """Call a Title Editor endpoint after refreshing the token if needed.

        Args:
            endpoint: Path relative to ``<title_editor_url>/v2/``.
            method: HTTP method name.
            data: Optional JSON-serialisable request body.

        Returns:
            The parsed JSON response, or None for DELETE requests.
        """
        self.refresh_token()
        return self._make_api_call(endpoint, method, data)

    def _make_api_call(self, endpoint, method="GET", data=None):
        """Send one authenticated request without touching the token state.

        Raises:
            Exception: If the response status is not OK.
        """
        response = requests.request(
            method,
            f"{self.title_editor_url}/v2/{endpoint}",
            headers={
                'Accept': 'application/json',
                'Authorization': f"Bearer {self.token}",
            },
            json=data,
        )
        if not response.ok:
            raise Exception("request failed")
        # The body of a DELETE response is not parsed.
        if method == 'DELETE':
            return
        return json.loads(response.text)

    def get_software_ids(self):
        """Return a dict mapping each title's ``id`` to its ``softwareTitleId``.

        The list is fetched from the ``softwaretitles`` endpoint on first use
        and cached in ``software_id_dict`` afterwards.
        """
        if self.software_id_dict is None:
            response = self.make_api_call('softwaretitles')
            self.software_id_dict = {
                entry["id"]: entry["softwareTitleId"] for entry in response
            }
        return self.software_id_dict
def _rename_title(item):
    """Return a copy of the downloaded definition with its name overridden."""
    return {**item, 'name': 'New Name'}


# The three 'changeme' placeholders are the Title Editor URL, username and
# password, in that order; replace them before running.
sync = PatchFeedSync('changeme', 'changeme', 'changeme')
sync.add_item('220', _rename_title)
sync.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment