Last active
December 3, 2018 12:40
-
-
Save vijayanandrp/5bf32b34a71da4a0e19f880a7ae1ce69 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
import requests

# Fail early, with a readable message, when the MongoDB driver is missing.
try:
    from pymongo import MongoClient
except ImportError:
    raise ImportError('PyMongo is not installed')

# Any failure raised while importing BeautifulSoup is reported as an
# ImportError that carries the text of the underlying error.
try:
    from bs4 import BeautifulSoup
except Exception as import_failure:
    raise ImportError('Bs4 is not imported correctly. - {}'.format(import_failure))
class MongoDB(object):
    """Small convenience wrapper around a PyMongo client.

    Holds one client connection plus an optional "current" database and
    collection, and exposes helpers to insert documents into that collection.
    """

    def __init__(self, host='localhost', port=27017, database_name=None,
                 collection_name=None, drop_n_create=False):
        """Open a client connection and optionally select a database/collection.

        Args:
            host: MongoDB server host name.
            port: MongoDB server port.
            database_name: Name of the database to select; ``None`` selects none.
            collection_name: Name of the collection to select inside the
                selected database; ignored when no database is selected.
            drop_n_create: When true and ``database_name`` is given, drop that
                database before selecting it.

        Any error raised by ``MongoClient`` propagates unchanged so callers can
        see the original exception type.
        """
        self._connection = MongoClient(host=host, port=port, maxPoolSize=200)
        self._database = None
        self._collection = None

        if drop_n_create:
            self.drop_db(database_name)

        if database_name:
            self._database = self._connection[database_name]
            # A collection can only be resolved through a selected database.
            if collection_name:
                self._collection = self._database[collection_name]

    @staticmethod
    def check_state(obj):
        """Return True when ``obj`` has been set (is not ``None``).

        An identity comparison is used instead of truthiness because PyMongo
        database and collection objects do not support ``bool()``.
        """
        return obj is not None

    def drop_db(self, database_name):
        """Drop ``database_name`` from the connected server.

        Does nothing when ``database_name`` is empty or ``None``.
        """
        if database_name:
            self._connection.drop_database(database_name)

    def check_db(self):
        """Raise ValueError when no database has been selected."""
        if not self.check_state(self._database):
            raise ValueError('Database is empty/not created')

    def check_collection(self):
        """Raise ValueError when no collection has been selected."""
        if not self.check_state(self._collection):
            raise ValueError('Collection is empty/not created')

    def get_overall_details(self):
        """Return a dict mapping each database name to its collection names."""
        client = self._connection
        # list_*_names() replace database_names()/collection_names(), which
        # were removed in PyMongo 4.
        return dict((db, list(client[db].list_collection_names()))
                    for db in client.list_database_names())

    def insert(self, post):
        """Insert a single document and return its inserted id.

        Raises:
            ValueError: If no collection has been selected.
        """
        self.check_collection()
        return self._collection.insert_one(post).inserted_id

    def insert_many(self, posts):
        """Insert several documents and return their inserted ids.

        Raises:
            ValueError: If no collection has been selected.
        """
        self.check_collection()
        result = self._collection.insert_many(posts)
        return result.inserted_ids
if __name__ == '__main__':
    # Download a CSV of `year,data` rows and store each row as a document in
    # the `climate_data` collection of the `Climate_DB` database.
    url = 'http://climatedataapi.worldbank.org/climateweb/rest/v1/country/cru/tas/year/CAN.csv'
    # Without a timeout, requests can block forever on an unresponsive server.
    response = requests.get(url, timeout=30)
    if response.status_code != 200:
        print('Failed to get data:', response.status_code)
    else:
        data = response.text
        print('First 100 characters of data are')
        print(data[:100])

        print('[*] Parsing response text')
        data_list = list()
        # splitlines() + strip() tolerate CRLF line endings and blank lines,
        # which would otherwise reach int() and crash the parse.
        for line in data.splitlines():
            line = line.strip()
            if not line or 'year,data' in line:
                # Skip blank lines and the CSV header row.
                continue
            fields = line.split(',')
            data_list.append({'year': int(fields[0]), 'data': float(fields[1])})
        print(data_list)

        print('[*] Pushing data to MongoDB ')
        mongo_db = MongoDB(database_name='Climate_DB', collection_name='climate_data')
        for record in data_list:
            print('[!] Inserting - ', record)
            mongo_db.insert(record)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment