Created
November 20, 2018 19:22
-
-
Save sharoonthomas/dc6ada7b88835384e9ea53af8efcb985 to your computer and use it in GitHub Desktop.
Utilize Fulfil API to download complete Activity Stream Data history
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import pandas as pd | |
from pandas.io.json import json_normalize | |
import numpy as np | |
import os | |
import json | |
from google.cloud import storage, bigquery | |
import datetime | |
import time | |
from multiprocessing.pool import ThreadPool | |
def chunks(l, n):
    """Yield successive n-sized chunks from l.

    Args:
        l: A sliceable sequence that supports ``len()`` (list, tuple, str, ...).
        n: Maximum number of items per chunk; must be a positive integer.

    Yields:
        Consecutive slices ``l[i:i + n]``. The last slice may be shorter than
        ``n``; nothing is yielded for an empty sequence.

    Raises:
        ValueError: If ``n`` is less than 1. Because this is a generator, the
            error surfaces when iteration starts, not when ``chunks`` is called.
    """
    # A zero step makes range() fail with an unrelated-looking message and a
    # negative step silently produces no chunks at all, so reject both here.
    if n < 1:
        raise ValueError(
            "chunk size n must be a positive integer, got {0!r}".format(n))
    for i in range(0, len(l), n):
        yield l[i:i + n]
def get_url(url):
    """Get response JSON from Fulfil, sending the API headers.

    Args:
        url: Fully-qualified Fulfil API URL to GET.

    Returns:
        The decoded JSON body on success. An empty dict when the request
        raises, times out (15 seconds), comes back with an HTTP error status,
        or the body cannot be decoded as JSON, so callers can treat a falsy
        result as "no record".
    """
    headers = {
        'Accept': 'application/json',
        'Authorization': 'Bearer bot-AUTHORIZATIONCODE',
        'Host': 'braceability.fulfil.io',
    }
    start_time = time.time()
    try:
        response = requests.get(url, headers=headers, timeout=15)
        # Without this check an HTTP error response that happens to carry a
        # JSON body would be returned as if it were a real record.
        response.raise_for_status()
        response_json = response.json()
    except Exception as ex:
        # Deliberately broad: this is a best-effort fetch, and a single bad
        # URL should not abort the rest of a batch.
        print(url, 'failed:', ex)
        return {}
    print(url, 'retrieved in', time.time() - start_time, 'seconds')
    return response_json
def multi_thread_fulfil_object_csvs_to_bucket(response_ids_df,
                                              endpoint='stock.shipment.in',
                                              filename_base='fulfil_supplier_shipments_all_',
                                              bucketfolder='fulfil/shipments/in/',
                                              column_subset_list=None,
                                              upload_csvs=False):
    """Download full JSON objects 1-by-1 from the Fulfil API using 8 threads.

    The object URLs are processed in chunks of 100. Each chunk is flattened
    into a DataFrame (nested keys joined with '_') and, when ``upload_csvs``
    is true, written to a temporary local CSV, passed to ``upload_csv`` and
    then removed from local disk.

    Args:
        response_ids_df: DataFrame with an ``id`` column holding the object
            IDs to download. An empty DataFrame results in no downloads.
        endpoint: Fulfil model name used to build each object URL.
        filename_base: Prefix of each CSV file name; a timestamp and '.csv'
            are appended.
        bucketfolder: Destination folder passed to ``upload_csv``.
        column_subset_list: Optional list of columns to keep, in this order.
            Columns absent from the downloaded data are added filled with ''.
            ``None`` or an empty list keeps every column.
        upload_csvs: When false, nothing is uploaded; the DataFrame of the
            first chunk that yields data is returned instead (preview mode).

    Returns:
        ``True`` once every chunk has been processed, or a DataFrame when
        ``upload_csvs`` is false and at least one chunk yielded data.
    """
    start_time = time.time()
    chunk_size = 100
    bucket_name = 'BUCKETNAME'

    # An empty frame may not even have an 'id' column; treat it as "no IDs".
    response_ids = [] if response_ids_df.empty else response_ids_df['id'].tolist()
    base_url = 'https://braceability.fulfil.io/api/v2/model/' + endpoint + '/'
    urls = [base_url + str(pid) for pid in response_ids]

    used_file_names = set()
    # One pool serves every chunk; the finally block shuts it down on all exits.
    pool = ThreadPool(8)
    try:
        for chunk_number, chunk in enumerate(chunks(urls, chunk_size), start=1):
            fetched = list(pool.imap_unordered(get_url, chunk))

            # get_url reports a failed download as an empty result; keeping
            # those would write blank rows into the CSV.
            response_records = [record for record in fetched if record]
            failed_count = len(fetched) - len(response_records)
            if failed_count:
                print(failed_count, endpoint,
                      "records could not be retrieved in chunk", chunk_number)
            if not response_records:
                continue

            response_records_df = json_normalize(response_records, sep='_')
            if column_subset_list:
                for column in column_subset_list:
                    if column not in response_records_df.columns:
                        response_records_df[column] = ''
                response_records_df = response_records_df[column_subset_list]

            if not upload_csvs:
                print("Not uploading ", len(response_records), endpoint, "response records")
                print(response_records_df.columns)
                return response_records_df

            now_string = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
            file_name = filename_base + now_string + '.csv'
            if file_name in used_file_names:
                # Two chunks finished within the same second; without a
                # suffix the later CSV would overwrite the earlier one.
                file_name = filename_base + now_string + '_' + str(chunk_number) + '.csv'
            used_file_names.add(file_name)

            response_records_df.to_csv(file_name, index=False)
            try:
                # upload_csv is not defined in this part of the file; it is
                # assumed to be provided elsewhere in the module.
                upload_csv(bucket_name, bucketfolder, file_name)
            finally:
                # Never leave the temporary CSV behind, even if the upload fails.
                os.remove(file_name)
    finally:
        pool.close()
        pool.join()

    print(time.time() - start_time, "seconds for entire process to complete")
    return True
def _fetch_fulfil_id_page(endpoint, page, per_page=10000, attempts=2,
                          retry_delay_seconds=15, timeout_seconds=60):
    """Return the decoded JSON of one page of object IDs, or None if every attempt fails."""
    url = ('https://braceability.fulfil.io/api/v2/model/' + endpoint
           + '?page={0}&per_page={1}'.format(page, per_page))
    headers = {
        'Accept': 'application/json',
        'Authorization': 'Bearer bot-AUTHORIZATIONCODE',
        'Host': 'braceability.fulfil.io',
    }
    for attempt in range(1, attempts + 1):
        try:
            # The timeout keeps a stalled connection from hanging the export.
            response = requests.get(url, headers=headers, timeout=timeout_seconds)
            # Treat HTTP errors like any other failure so an error payload is
            # never mistaken for a page of IDs.
            response.raise_for_status()
            return response.json()
        except Exception as ex:
            # Deliberately broad: the export is best-effort and falls back to
            # whatever pages were already collected.
            print(ex)
            if attempt < attempts:
                time.sleep(retry_delay_seconds)
    return None


def pull_fulfil_object_ids(endpoint='stock.shipment.in',
                           filename='fulfil_supplier_shipment_in_ids.csv',
                           bucketfolder='fulfil/shipments/in/ids/'):
    """Pull 10,000 Object IDs at a time from the Fulfil API.

    Pages are requested starting at page 1 until an empty page is returned.
    A failed page request is retried once after 15 seconds; if the retry also
    fails, paging stops and the IDs collected so far are kept. The complete
    list of collected IDs is then saved to the Google Cloud Storage
    ``bucketfolder`` as a CSV via ``upload_csv_from_df``.

    Args:
        endpoint: Fulfil model name whose object IDs are listed.
        filename: Name of the CSV file passed to ``upload_csv_from_df``.
        bucketfolder: Destination folder passed to ``upload_csv_from_df``.

    Returns:
        A DataFrame built from every record collected across all pages.
    """
    per_page = 10000
    responses = []
    page = 1
    while True:
        page_records = _fetch_fulfil_id_page(endpoint, page, per_page)
        if page_records is None:
            print("Stopping at page", page, "of", endpoint, "after repeated failures")
            break
        if not isinstance(page_records, list):
            # Extending the list with a dict would append its keys as if they
            # were records, so stop instead of corrupting the ID list.
            print("Unexpected response for page", page, "of", endpoint, ":", page_records)
            break
        if not page_records:
            # An empty page means the previous page was the last one.
            break
        responses.extend(page_records)
        if page % 4 == 0:
            print(len(responses), endpoint, "records retrieved")
        page += 1

    response_ids_df = pd.DataFrame(responses)
    bucket_name = 'BUCKETNAME'
    # upload_csv_from_df is not defined in this part of the file; it is
    # assumed to be provided elsewhere in the module.
    upload_csv_from_df(bucket_name, bucketfolder, filename, response_ids_df)
    print(len(responses), "records uploaded to Google Cloud Bucket Folder", bucketfolder)
    return response_ids_df
#### START ACTIVITY STREAM CODE ####

# Columns kept, in this order, in the activity-stream output. Any column that
# is missing from the downloaded data is added as an empty-string column.
# NOTE(review): 'create_date' is selected through its flattened '_iso_string'
# column while 'write_date' is not -- confirm 'write_date' really is a flat
# column, otherwise it will always be written as ''.
column_list = [
    'actor_display_string',
    'actor_email',
    'actor_id',
    'application',
    'create_date_iso_string',
    'create_uid',
    'id',
    'object__display_string',
    'object__id',
    'object__model',
    'rec_blurb_title',
    'rec_name',
    'target_display_string',
    'target_id',
    'target_model',
    'title',
    'type',
    'web_user',
    'write_date',
    'write_uid',
]

# Step 1: collect every activity ID and store the ID list as a CSV.
# NOTE(review): the folder below says 'journal_entries' although the endpoint
# and file name are for the activity stream -- possibly a copy-paste leftover;
# confirm the intended destination before changing it.
response_ids_df = pull_fulfil_object_ids(
    endpoint='ir.activity',
    filename='fulfil_activity_stream_ids.csv',
    bucketfolder='fulfil/journal_entries/ids/',
)

# Step 2: download each activity record and upload the results as CSVs.
# NOTE(review): with upload_csvs=True the call returns True rather than a
# DataFrame, so the name 'activity_stream_df' is misleading.
activity_stream_df = multi_thread_fulfil_object_csvs_to_bucket(
    response_ids_df,
    endpoint='ir.activity',
    filename_base='fulfil_activity_stream_all_',
    bucketfolder='fulfil/activity_stream/',
    column_subset_list=column_list,
    upload_csvs=True,
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment