Created
December 27, 2022 17:58
-
-
Save suhailpatel/90b3d366634e0f5a4f97e8ee13166392 to your computer and use it in GitHub Desktop.
A CLI to interact with the O'Reilly Live Events site
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import cmd, json, sys, traceback | |
from collections import defaultdict | |
from dataclasses import dataclass | |
from typing import List, Dict | |
import requests | |
# This is part of Suhail's talk on the Infrastructure and Ops Superstream | |
# track for O'Reilly | |
class OReillyShell(cmd.Cmd): | |
prompt = "\nO'Reilly Shell 🌍 > " | |
def __init__(self): | |
super(OReillyShell, self).__init__() | |
self.postcmd(None, None) | |
def postcmd(self, stop, line): | |
if self.lastcmd == "exit": | |
return True | |
def do_events(self, args): | |
for event in fetch_events(): | |
contributors = ' & '.join(event.contributors) | |
topics = ', '.join(event.topics) | |
print(f"{event.title} by {contributors} (topics: {topics})") | |
def do_contributors(self, args): | |
for contributor, events in sorted(fetch_events_by_contributor(), key=lambda x: x[0]): | |
print(f"{contributor}: ({len(events)} events)") | |
for event in events: | |
contributors = ' & '.join(event.contributors) | |
topics = ', '.join(event.topics) | |
print(f" - {event.title} by {contributors} (topics: {topics})") | |
def do_topics(self, args): | |
for topic, events in sorted(fetch_events_by_topic(), key=lambda x: x[0]): | |
print(f"{topic}: ({len(events)} events)") | |
for event in events: | |
contributors = ' & '.join(event.contributors) | |
topics = ', '.join(event.topics) | |
print(f" - {event.title} by {contributors} (topics: {topics})") | |
def do_exit(self, _): | |
raise KeyboardInterrupt() | |
@dataclass | |
class LiveEvent: | |
title: str | |
contributors: List[str] | |
topics: List[str] | |
def fetch_events() -> List[LiveEvent]: | |
query = """query LiveEvents { | |
liveEvents(limit: 10000, onlyPrivate: false, excludeClosed: true) { | |
count | |
previousOffset | |
nextOffset | |
results { | |
productIdentifier | |
seriesIdentifier | |
ourn | |
title | |
slug | |
coverImageUrl | |
openGraphImageUrl | |
registrationCloses | |
eventType | |
contributors { | |
fullName | |
} | |
sessions { | |
ourn | |
startTime | |
endTime | |
} | |
topics { | |
slug | |
name | |
} | |
registrationInfo { | |
openSeats | |
userRegistrationStatus | |
cancelled | |
registrationIsOpen | |
waitlistStarted | |
} | |
} | |
} | |
} | |
""" | |
headers = {"Content-Type": "application/json"} | |
data = json.dumps({"query": query}) | |
req = requests.post("https://www.oreilly.com/api/v1/attend/live-events/graphql", headers=headers, data=data) | |
output = [] | |
for result in req.json()["data"]["liveEvents"]["results"]: | |
output.append(LiveEvent( | |
title=result["title"], | |
contributors=[contributor["fullName"] for contributor in result["contributors"]], | |
topics=[topic["name"] for topic in result["topics"]], | |
)) | |
return sorted(output, key=lambda x: x.title) | |
def fetch_events_by_contributor() -> Dict[str, List[LiveEvent]]: | |
output = defaultdict(list) | |
for event in fetch_events(): | |
for contributor in event.contributors: | |
output[contributor].append(event) | |
return output.items() | |
def fetch_events_by_topic() -> Dict[str, List[LiveEvent]]: | |
output = defaultdict(list) | |
for event in fetch_events(): | |
for topic in event.topics: | |
output[topic].append(event) | |
return output.items() | |
if __name__ == '__main__': | |
print(f"Welcome to the O'Reilly Infrastructure and Ops Superstream: Platform Engineering") | |
shell = OReillyShell() | |
while True: | |
try: | |
shell.cmdloop() | |
except KeyboardInterrupt: | |
sys.exit() | |
except: | |
traceback.print_exc() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment