Script to ingest Reddit comments data into Elasticsearch for better searching. The edited field is ignored when ingesting.
import os
import re
import gc
import traceback
import tqdm
import mmap
import bz2
import json
import itertools
import multiprocessing as mp
import time
from elasticsearch import Elasticsearch, TransportError
from pathlib import Path
import logging
import logging.config
#=====================================================
# logging setup
#=====================================================
logging_config = dict(
    version = 1,
    formatters = {
        'f': {'format':
              '%(asctime)s %(name)-12s %(processName)s %(levelname)-8s %(message)s'}
        },
    handlers = {
        'h': {'class': 'logging.StreamHandler',
              'formatter': 'f',
              'level': logging.DEBUG}
        },
    root = {
        'handlers': ['h'],
        'level': logging.DEBUG,
        },
)
logging.config.dictConfig(logging_config)

logger = logging.getLogger('reddit_comments_to_es')
logger.propagate = False  # log to the file handler below, not the root console handler
fh = logging.FileHandler('reddit_comments_to_es.log')
fh.setLevel(logging.DEBUG)
# reuse the console format; without an explicit formatter the file
# would get bare, timestamp-less messages
fh.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(processName)s %(levelname)-8s %(message)s'))
logger.addHandler(fh)
#=====================================================
# helper functions
#=====================================================
def getNumberOfLines(file_path):
    # counts newline-terminated lines via mmap; note that for .bz2
    # archives this counts newline bytes in the *compressed* stream,
    # so the result is only an approximation used for the tqdm total
    with open(file_path, "r+b") as fp:
        buf = mmap.mmap(fp.fileno(), 0)
        lines = 0
        while buf.readline():
            lines += 1
        buf.close()
    gc.collect()
    return lines
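
# an exact but slower alternative for the .bz2 inputs (a minimal
# sketch, not called below): decompresses each archive once just to
# count the JSON lines, doubling the read cost per file
def getNumberOfBz2Lines(file_path):
    with bz2.BZ2File(file_path, "r") as f:
        return sum(1 for _ in f)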
def process_line(in_queue, es, index_name, doc_type):
    json_data = None
    retry = 0
    retry_error = ""
    json_fix_re = re.compile(r'\'failed\sto\sparse\s\[(\w+)\]\'')
    while True:
        try:
            # fetch a new line on the first pass, or give up on the
            # current one after three failed retries
            if retry == 0 or retry > 3:
                line_no, line = in_queue.get()
                retry = 0
            # exit signal
            if line is None:
                logger.debug("completed ... ")
                return
            # parse the raw JSON comment
            json_data = json.loads(line)
            if 'edited' in json_data:
                del json_data['edited']
            # on retry, replace the field Elasticsearch failed to parse
            # with its default value and try indexing again
            json_fix_field = json_fix_re.search(retry_error)
            if retry > 0 and json_fix_field:
                field = json_fix_field.group(1)
                if field in json_data:
                    logger.debug('TransportError: issue with {}: updated {} to {}'.format(field, json_data[field], default_mappings[field]))
                    json_data[field] = default_mappings[field]
            es.index(index=index_name, doc_type=doc_type, body=json_data)
            retry = 0
        except TransportError as te:
            retry_error = str(te)
            logger.debug('TransportError: {}'.format(retry_error))
            logger.debug('json_data: {}: {}'.format(line_no, line))
            retry += 1
            logger.debug('retrying ... {}'.format(str(retry)))
        except Exception as e:
            logger.debug('Exception in process: {}'.format(str(e)))
            logger.debug('parent process:{} process id:{}'.format(os.getppid(), os.getpid()))
            logger.debug('index_name: {} doc_type: {}'.format(index_name, doc_type))
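
# illustration of the field-fix regex used in process_line above
# (hypothetical error text; the exact wording comes from Elasticsearch
# and may differ by version):
#   >>> err = "TransportError(400, 'mapper_parsing_exception', 'failed to parse [gilded]')"
#   >>> json_fix_re.search(err).group(1)
#   'gilded'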
#=====================================================
# file names
#=====================================================
reddit_raw_file_path = "/media/bicepjai/data/reddit/reddit_data/raw_data"

# =================================================
# elastic search instance needs to be running
# =================================================
"""
docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -v /home/bicepjai/Projects/docker_files/elastic-search/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /media/bicepjai/data/reddit/reddit_data/elasticsearch/data:/var/lib/elasticsearch -v /media/bicepjai/data/reddit/reddit_data/elasticsearch/logs:/var/log/elasticsearch docker.elastic.co/elasticsearch/elasticsearch-oss:6.1.2
"""
index_mappings = {
    "settings": {
        "index.mapping.ignore_malformed": True
    },
    "mappings": {
        # in Elasticsearch 6.x field mappings live under the doc type
        # and a "properties" key; "null_value" is only supported on
        # keyword/numeric/boolean/date fields, so it is omitted from
        # the text fields below
        "json-comments": {
            "dynamic": False,
            "properties": {
                "subreddit": { "type": "text" },
                "body": { "type": "text" },
                "stickied": { "type": "boolean", "null_value": False },
                "gilded": { "type": "integer", "null_value": 0 },
                "score": { "type": "integer", "null_value": 0 },
                "author": { "type": "text" },
                "link_id": { "type": "text" },
                "retrieved_on": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" },
                "subreddit_id": { "type": "text" },
                "controversiality": { "type": "integer", "null_value": 0 },
                "author_flair_css_class": { "type": "text" },
                "author_flair_text": { "type": "text" },
                "created_utc": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" },
                "distinguished": { "type": "text" },
                "parent_id": { "type": "text" },
                "id": { "type": "text" },
                # "edited": { "type": "boolean", "null_value": False }
            }
        }
    }
}
# fallback values used to patch fields that Elasticsearch fails to parse
default_mappings = {
    "subreddit": "null",
    "body": "null",
    "stickied": False,
    "gilded": 0,
    "score": 0,
    "author": "null",
    "link_id": "null",
    "retrieved_on": 0,
    "subreddit_id": "null",
    "controversiality": 0,
    "author_flair_css_class": "null",
    "author_flair_text": "null",
    "created_utc": 0,
    "distinguished": "null",
    "parent_id": "null",
    "id": "null",
    # "edited": False
}
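
# example of the "better searching" this index enables (hypothetical
# query, not executed here; assumes the 2017 comments were ingested):
#   es.search(index="reddit-comments-2017",
#             body={"query": {"match": {"body": "machine learning"}}})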
try:
    es = Elasticsearch()
    logger.debug('Elasticsearch Info: %s', str(es.info()))

    # ====================================================================
    # order all the files that need to be read and update elastic search
    # ====================================================================
    all_files = []
    for filename in Path(reddit_raw_file_path).glob('*/*.bz2'):
        all_files += [str(filename)]
    all_files = sorted(all_files, reverse=True)
    logger.debug('total number of files: %s', len(all_files))

    # ====================================================================
    # iterate through files
    # ====================================================================
    gc.collect()

    # testing
    # all_files = ['/home/bicepjai/Projects/ml-compete/kaggle/05_toxic/dataset/external/reddit_comments/sample.json']

    # go through all the non-processed or partially processed files
    for current_file in all_files:
        logger.debug('writing from file ....')
        logger.debug('current_file: %s', current_file)
        total_lines = getNumberOfLines(current_file)
        logger.debug('total_lines in current_file: %s', total_lines)

        # directory layout is <raw_data>/<year>/<dump>.bz2, so the parent
        # directory name supplies the index suffix
        year_str = current_file.split('/')[-2]
        # testing
        # year_str = "2017"
        index_name = "reddit-comments-" + year_str
        doc_type = "json-comments"
        logger.debug('Elastic Search index_name: %s', index_name)
        logger.debug('Elastic Search doc_type: %s', doc_type)

        # testing
        # es.indices.delete(index=index_name, ignore=[400, 404])

        # ignore=400 makes creation idempotent: "index already exists"
        # errors are swallowed when resuming a partially processed year
        es.indices.create(index=index_name, ignore=400, body=index_mappings)
        # create multiple worker processes and a shared queue
        # to make the loading faster
        num_workers = 11
        manager = mp.Manager()
        work_q = manager.Queue(num_workers)

        # start worker processes; each worker gets its own Elasticsearch
        # client (clients are inherited on fork rather than shared)
        workers = []
        for i in range(num_workers):
            es1 = Elasticsearch()
            p = mp.Process(target=process_line, args=(work_q, es1, index_name, doc_type))
            p.start()
            workers.append(p)
        logger.debug('created workers ...')

        # produce data for all the workers; one trailing None per
        # worker acts as the exit signal
        with bz2.BZ2File(current_file, 'r') as f:
            # testing
            # with open(current_file, 'r') as f:
            iters = itertools.chain(f, (None,) * num_workers)
            for num_and_line in tqdm.tqdm(enumerate(iters), total=total_lines):
                work_q.put(num_and_line)

        logger.debug('waiting for workers to finish')
        for p in workers:
            p.join()
        gc.collect()
except (KeyboardInterrupt, Exception) as e:
    logger.debug('Exception/KeyboardInterrupt occurred ....')
    logger.debug('current_file: %s', current_file)
    logger.debug('Exception: %s', str(e))
    logger.debug('traceback.format_exc(): \n%s', traceback.format_exc())
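
# a quick post-ingest sanity check (a sketch; index names follow the
# "reddit-comments-<year>" pattern created above):
#   Elasticsearch().count(index="reddit-comments-2017")
#   # -> {'count': <number of ingested comments>, ...}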