Json to Mongo file importer
# pylint: disable=unsubscriptable-object
import sys, os, json, datetime, platform, traceback
import time, random

try:
    import mechanize
except ImportError:
    print("Please run `pip install mechanize` from a command line")
    exit()
try:
    import pymongo
except ImportError:
    print("Please run `pip install pymongo` from a command line")
    exit()
try:
    import scrape_posts
    import scrape_comments
except ImportError:
    print("Please run in the same directory as scrape_posts and scrape_comments")
    exit()
try:
    import dateutil.relativedelta
except ImportError:
    print("Please run `pip install python-dateutil` from a command line")
    exit()
# from bson import json_util
from os.path import basename

try:
    from exceptions import WindowsError
except ImportError:
    # WindowsError only exists on Windows; define a stand-in elsewhere.
    class WindowsError(OSError):
        pass

# Global variables
total_imported = 0
any_errors = False
errors_array = []
error_ids = []
skipped_files = 0
response = 0
def update_progress(total_imported, skipped_files, current_location):
    text = str(total_imported) + "/" + str(total_imported + skipped_files) + " imported. " + str(
        len(error_ids)) + " errors. Looking in folder: " + current_location
    sys.stdout.write('\r' + text)
    sys.stdout.flush()


def report_last_file(full_path, response):
    global total_imported
    print("\nLast file imported: " + str(total_imported) + " At: " + full_path + " with mongodb response: " + str(
        response))
    return
def import_files(run_location, import_type, folder_path, database, destination_coll, connection_url):
    """Walks folder_path and inserts every matching .json file into MongoDB as one document."""
    global total_imported, any_errors, error_ids, errors_array, skipped_files, response
    total_imported = 0
    any_errors = False
    errors_array = []
    error_ids = []
    skipped_files = 0
    client = pymongo.MongoClient("mongodb://" + connection_url + ":27017")
    db = client[database]
    collection_conn = db[destination_coll]
    for root, dirs, files in os.walk(folder_path):
        full_base = os.path.join(run_location, root)  # type: str
        if import_type in root:
            try:
                update_progress(total_imported, skipped_files, root)
                for name in files:
                    if name.endswith(".json") and name != "auth.json":
                        full_path = os.path.join(full_base, name)
                        # Date created is not reliable (it could be the date the file was
                        # copied into the folder), so use the modified date instead.
                        timestamp_of_json_collected = os.path.getmtime(full_path)
                        collected_on = datetime.datetime.fromtimestamp(timestamp_of_json_collected)
                        with open(full_path) as json_file:
                            json_data = json.load(json_file)
                        document = {}
                        document['id'] = os.path.splitext(name)[0]
                        document['collected_on'] = collected_on.strftime('%Y-%m-%d %H:%M:%S')
                        document['data'] = json_data
                        document['type'] = import_type
                        try:
                            response = collection_conn.insert_one(document)
                            total_imported += 1
                        except pymongo.errors.DuplicateKeyError:
                            skipped_files += 1
                        del json_data, json_file
            except KeyboardInterrupt:
                report_last_file(full_path, response)
                save_error_ids(len(error_ids), run_location, import_type)
                try:
                    sys.exit(0)
                except SystemExit:
                    os._exit(0)
            except ValueError:
                any_errors = True
                error_msg = "Broken File: " + full_path
                error_ids.append(basename(name))
                errors_array.append(error_msg)
            except WindowsError:
                raise
            except:
                report_last_file(full_path, response)
                save_error_ids(len(error_ids), run_location, import_type)
                raise
            update_progress(total_imported, skipped_files, root)
def login(username="", password=""):
    if not len(username) or not len(password):
        with open("auth.json") as auth_file:
            auth_data = json.load(auth_file)
        try:
            username = auth_data["username"]
        except KeyError:
            print "No username specified."
            return
        try:
            password = auth_data["password"]
        except KeyError:
            print "No password specified."
            return
    browser = mechanize.Browser()
    browser.set_handle_robots(False)
    browser.set_handle_refresh(False)
    browser.addheaders = [("User-agent",
                           "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36")]
    r = browser.open("https://gab.ai/auth/login")
    browser.select_form(nr=1)
    browser["username"] = username
    browser["password"] = password
    r = browser.submit()
    return browser
def start_scrape(jsons, import_type):
    browser = login()
    if browser is not None:
        if import_type == "users":
            process_users(browser, jsons)
        else:
            process_submissions(browser, jsons, import_type)
    else:
        print "Failed login."
def process_submissions(browser, jsons, import_type):
    """ Scrapes the specified posts or comments. """
    fail = 0
    for i in jsons:
        num = str(i)
        ones = num[-1]
        tens = num[-2:]
        hundreds = num[-3:]
        # Make the directory structure if necessary.
        error_dir = import_type + "_errors"
        if not os.path.exists(error_dir):
            os.makedirs(error_dir)
        if not os.path.exists(error_dir + "/" + ones):
            os.makedirs(error_dir + "/" + ones)
        if not os.path.exists(error_dir + "/" + ones + "/" + tens):
            os.makedirs(error_dir + "/" + ones + "/" + tens)
        if not os.path.exists(error_dir + "/" + ones + "/" + tens + "/" + hundreds):
            os.makedirs(error_dir + "/" + ones + "/" + tens + "/" + hundreds)
        # Read the post.
        try:
            if import_type == 'posts':
                r = browser.open("https://gab.ai/posts/" + str(i))
            elif import_type == 'comments':
                # r = browser.open("https://gab.ai/posts/" + str(i) + "/comments?sort=score")  # old version
                r = browser.open("https://gab.ai/posts/" + str(i) + "/comments/index?sort=score")
            else:
                print("Unexpected scrape type. Must be posts or comments")
                exit()
            data = r.read()
            with open(error_dir + "/" + ones + "/" + tens + "/" + hundreds + "/" + str(i) + ".json", "w") as f:
                f.write(data)
        except mechanize.HTTPError as error_data:
            if isinstance(error_data.code, int) and error_data.code == 429:
                print "ALERT TOO MANY REQUESTS SHUT DOWN"
                print i
                sys.exit(-1)
            elif isinstance(error_data.code, int) and error_data.code == 404:
                # Gab post deleted or ID not allocated.
                fail = fail + 1
            elif isinstance(error_data.code, int) and error_data.code == 400:
                # Invalid request -- possibly a private Gab post?
                fail = fail + 1
            else:
                print error_data.code
                print traceback.format_exc()
                print "ERROR: DID NOT WORK"
                print i
        except:
            print traceback.format_exc()
            print "ERROR: STILL DID NOT WORK"
            print i
def process_users(browser, user_names):
    """ Scrapes the specified users, including their follower and following lists. """
    global error_ids, any_errors
    j = 0
    k = 0
    total_imported = 0
    skipped_files = 0
    total_users = len(user_names)
    for i in user_names:
        high_intensity_user = 0
        # Skip the user if their file already exists.
        prefix = i[0:2].lower()
        if os.path.isfile("users/" + prefix + "/" + i + ".json"):
            print "Already have user " + i + ". Skipping."
            skipped_files += 1
            continue
        # Make the directory structure if necessary.
        if not os.path.exists("users"):
            os.makedirs("users")
        if not os.path.exists("users/" + prefix):
            os.makedirs("users/" + prefix)
        # Read the user.
        try:
            r = browser.open("https://gab.ai/users/" + str(i))
            user_data = json.loads(r.read())
            r = browser.open("https://gab.ai/users/" + str(i) + "/followers")
            follower_data = json.loads(r.read())
            if not follower_data["no-more"]:
                page = 1
                done = 0
                while not done and page < 1500:
                    min_back = page * 30
                    r = browser.open("https://gab.ai/users/" + str(i) + "/followers?before=" + str(min_back))
                    page = page + 1
                    follower_page = json.loads(r.read())
                    if follower_page["no-more"]:
                        done = 1
                    follower_data["data"].extend(follower_page["data"])
                    if page % 10 == 1:
                        time.sleep(3)
                        high_intensity_user = 1
                    else:
                        time.sleep(0.1)
            r = browser.open("https://gab.ai/users/" + str(i) + "/following")
            following_data = json.loads(r.read())
            if i == "aux":
                continue
            if not following_data["no-more"]:
                page = 1
                done = 0
                while not done and page < 1500:
                    min_back = page * 30
                    r = browser.open("https://gab.ai/users/" + str(i) + "/following?before=" + str(min_back))
                    page = page + 1
                    following_page = json.loads(r.read())
                    if following_page["no-more"]:
                        done = 1
                    following_data["data"].extend(following_page["data"])
                    if page % 10 == 1:
                        time.sleep(3)
                        high_intensity_user = 1
                    else:
                        time.sleep(0.1)
            data = {"user": user_data, "followers": follower_data, "following": following_data}
            with open("users/" + prefix + "/" + str(i) + ".json", "w") as f:
                json.dump(data, f)
            total_imported += 1
        # Error handling.
        except mechanize.HTTPError as error_code:
            if isinstance(error_code.code, int) and error_code.code == 429:
                print "TOO MANY REQUESTS. SHUT DOWN."
                print i
                sys.exit(-1)
            else:
                # 404 (deleted or unallocated) and 400 (possibly private) land here too.
                print error_code.code
                print traceback.format_exc()
                print "ERROR: DID NOT WORK"
                time.sleep(random.randint(1, 10))
                any_errors = True
                error_ids.append(i)
                print i
        except:
            print traceback.format_exc()
            print "ERROR: STILL DID NOT WORK"
            print i
        # Pause between jobs.
        pause_timer = random.randint(1, 100)
        if pause_timer >= 99:
            time.sleep(random.randint(1, 3))
        k = k + 1
        j = j + 1
        if k >= 15000:
            time.sleep(random.randint(1, 10))
            k = 0
        if high_intensity_user:
            print "Tough job, time to take a break."
            time.sleep(random.randint(20, 30))
        update_progress(total_imported, total_users, i)
    if any_errors:
        base_path = os.path.split(os.path.abspath(os.path.realpath(sys.argv[0])))[0]
        save_error_ids(len(error_ids), base_path, "users")
def redownload_jsons(error_ids, import_type):
    print("Redownloading " + import_type + " that had an error")
    jsons = []
    for json_id in error_ids:
        # Post/comment error ids are file names ending in ".json"; user error ids are bare names.
        if json_id.endswith(".json"):
            jsons.append(json_id[:-5])
        else:
            jsons.append(json_id)
    if import_type == "posts" or import_type == "post":
        start_scrape(jsons, "posts")
    elif import_type == "comments" or import_type == "comment":
        start_scrape(jsons, "comments")
    elif import_type == "users" or import_type == "user":
        start_scrape(jsons, "users")
    print "Reimporting now"
    error_dir = import_type + "_errors"
    base_path = os.path.split(os.path.abspath(os.path.realpath(sys.argv[0])))[0]
    import_files(base_path, import_type, error_dir, sys.argv[3], sys.argv[4], sys.argv[5])


def save_error_ids(error_len, base_path, import_type):
    global error_ids
    save_loc = os.path.join(base_path, sys.argv[1] + '_import_errors.txt')
    print("But there were " + str(error_len) + " errors \n A file has been created at " + save_loc +
          " with all the ids of the " + import_type + " that were not collected correctly")
    with open(save_loc, 'w') as f:
        f.write(str(error_ids))
    redownload_jsons(error_ids, import_type)
def find_os():
    return platform.system()


def rename_folder(input_folder):
    new_folder_name = input_folder + "_v2_import_complete"
    os.rename(input_folder, new_folder_name)
    print("Renamed: " + str(input_folder) + " to: " + new_folder_name)


def fix_import_type(import_type):
    if import_type == "posts" or import_type == "post":
        import_type = "posts"
    elif import_type == "comments" or import_type == "comment":
        import_type = "comments"
    elif import_type == "users" or import_type == "user":
        import_type = "users"
    else:
        print("Can only import posts, users, or comments")
        exit()
    print("This script will skip the import of any json files that are not " + import_type)
    return import_type
def main():
    arg_length = len(sys.argv)
    if arg_length < 6:
        print(
            "Please specify at least 5 parameters: \n\t\t v2_import.py <posts|comments|users> <folder path> <mongodb database> <mongodb collection name> <connection_url> <don't rename folder (optional)>")
        exit()
    start_time = datetime.datetime.fromtimestamp(time.time())
    print("Starting Import with the following parameters: " + str(sys.argv))
    base_path = os.path.split(os.path.abspath(os.path.realpath(sys.argv[0])))[0]
    import_type = fix_import_type(sys.argv[1])
    import_files(base_path, import_type, sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5])
    end_time = datetime.datetime.fromtimestamp(time.time())
    total_time = dateutil.relativedelta.relativedelta(end_time, start_time)
    print "\nImport completed in: %d days, %d hours, %d minutes and %d seconds" % (
        total_time.days, total_time.hours, total_time.minutes, total_time.seconds)
    if any_errors:
        save_error_ids(len(error_ids), base_path, import_type)
    if arg_length < 7:  # no optional "don't rename folder" argument was passed
        print("Renaming input folder to reflect completed import")
        rename_folder(sys.argv[2])


if __name__ == '__main__':
    if find_os() == "Windows":
        # 'mode con' only exists on Windows; it just resizes the console window.
        os.system('mode con: cols=150 lines=14')
    main()
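Note: import_files skips duplicates by catching pymongo.errors.DuplicateKeyError from insert_one, but the script never creates an index itself, so that exception only fires if the destination collection already has a unique index on the field used for deduplication. A minimal sketch, assuming a local MongoDB instance, deduplication on the id field the script writes, and the database/collection names from the example below:

import pymongo

# Assumed values -- substitute whatever database, collection, and host you pass to v2_import.py.
client = pymongo.MongoClient("mongodb://localhost:27017")
collection = client["research_db"]["comment_collection"]

# Without a unique index, insert_one() never raises DuplicateKeyError,
# so re-running the import would insert duplicate documents instead of skipping them.
collection.create_index("id", unique=True)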
Example:
python v2_import.py comment data\2018\comments research_db comment_collection localhost
Parameters explained:
<posts|comments|users> - the type of scraped json files to import (files of other types found in the folder are skipped)
<folder path> - the folder that is walked recursively for .json files
<mongodb database> - the MongoDB database to write to
<mongodb collection name> - the collection that receives one document per json file
<connection_url> - the MongoDB host; the script connects to mongodb://<connection_url>:27017
<don't rename folder> - optional; if supplied, the input folder is not renamed to *_v2_import_complete when the import finishes
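To spot-check a finished run, one option (again just a sketch, assuming the same host, database, and collection as the example above) is to count the imported documents and inspect one without its raw payload:

import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")
coll = client["research_db"]["comment_collection"]

# Every imported file becomes one document with id, collected_on, data, and type fields.
print(coll.count_documents({"type": "comments"}))
print(coll.find_one({}, {"data": 0}))  # metadata only; omit the raw json payload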