Skip to content

Instantly share code, notes, and snippets.

@sakethramanujam
Last active September 12, 2019 10:40
Show Gist options
  • Save sakethramanujam/ba5cfc201b1770cbe9681cb66fa49b3f to your computer and use it in GitHub Desktop.
Save sakethramanujam/ba5cfc201b1770cbe9681cb66fa49b3f to your computer and use it in GitHub Desktop.
Adding Launch Notifications to Collection
import asyncio
import json
import urllib.request
from pymongo import MongoClient
def connect(database_name: str, collection_name: str) -> object:
try:
conn = MongoClient()
db = conn.snapi
collection = db.launchnotifications
return collection
except Exception as e:
print(e)
def get_articles(api_url: str):
response = urllib.request.urlopen(api_url)
response = response.read().decode()
response = json.loads(response)
articles = response['results']
return articles
def _generate_doc(article: dict) -> dict:
article = dict(article)
newArticle = {}
newArticle['id'] = article['id']
newArticle['name'] = article['name']
newArticle['url'] = article['url']
newArticle['status'] = article['status']
newArticle['img_url'] = article['img_url'] or article['image_url']
newArticle['net'] = article['net']
newArticle['launch_service_provider'] = article['rocket']['configuration']['launch_service_provider']
# newArticle['description'] = article['mission']['description']
# newArticle['agency'] = article['rocket']['spacecraft_stage']['spacecraft']['configuration']['name']
newArticle['pad'] = article['pad']['name']
newArticle['pad_url'] = article['pad']['map_url']
return newArticle
async def update_db(collection: object, article: dict):
id = article['id']
collection = MongoClient().snapi.launchnotifications
try:
obj_id = collection.find_one_and_update(filter={'id': id},
update={"$set": article},
upsert=True)
except Exception:
print(f"Failed to add article {id}")
async def add_articles_to_db():
collection = connect('snapi', 'launchnotifications')
api_articles = get_articles('https://spacelaunchnow.me/api/3.3.0/launch/upcoming?limit=5')
articles = [_generate_doc(article) for article in api_articles]
for article in articles:
await update_db(collection, article)
async def main():
print("main!")
# add_articles_to_db()
await asyncio.gather(add_articles_to_db())
if __name__ == "__main__":
# main()
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment