Use extractDocs.py to parse and index the StackOverflow posts.xml file into an existing index.

Usage: extractDocs.py [options] file

Options:
  -h, --help            show this help message and exit
  -s SERVER, --server=SERVER
                        ElasticSearch server
  -i INDEX, --index=INDEX
                        Index name to use
  -b BULKSIZE, --bulk-size=BULKSIZE
                        Number of docs to submit in each bulk request
  -m MAXDOCS, --max-docs=MAXDOCS
                        Max number of docs to index
  -v, --verbose         Enable verbose output
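For example, to index the first 100,000 posts into a cluster running on localhost (the dump path here is illustrative):

    python extractDocs.py -s localhost:9200 -i stackoverflow -b 10000 -m 100000 -v posts.xml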
Use the setup script to execute the following steps:
- create the index with the parent mapping
- feed documents using bulk indexing via extractDocs.py
- optimize the index
- execute the query

Run with: ./setup.sh /path/to/posts.xml
extractDocs.py:
import re
import sys
import json
import httplib
from datetime import datetime
from HTMLParser import HTMLParser
from optparse import OptionParser

# pre-compiled regular expressions
rowRe = re.compile(r'^\s*<row')                  # detects a row
attrRe = re.compile(r'(\w+)="(.*?)"')            # extracts all attributes and values
cleanupRe = re.compile(r'<[^<]+?>|[\r\n]+|\s+')  # strips out html and extra whitespace
tagsRe = re.compile(r'<(.*?)>')                  # splits tags into a list
intRe = re.compile(r'^\d+$')                     # determines if a field is an integer

def http(method, path, data):
    """ Execute a generic HTTP request against ElasticSearch """
    conn = httplib.HTTPConnection(HOST)
    conn.request(method, path, data)
    return conn.getresponse()

def bulk(data):
    """ Submit a bulk ElasticSearch request """
    resp = http('POST', '/%s/_bulk' % INDEX, data)
    # load the response and report any failed items
    status = json.loads(resp.read())
    for stat in status['items']:
        if not stat['index']['ok']:
            print json.dumps(stat)
    return resp

def main(fileName):
    """ Parse StackExchange data into ElasticSearch bulk format """
    # html parser used to unescape the body and title
    html = HTMLParser()
    with open(fileName) as f:
        docs = []
        for i, line in enumerate(f):
            # skip the line if it is not a row
            if rowRe.match(line) is None:
                continue
            # build the document to be indexed
            doc = {}
            for field, val in attrRe.findall(line):
                # strip whitespace and skip the field if the value is empty
                val = val.strip()
                if not val:
                    continue
                # clean up title and body by stripping html and whitespace
                if field in ['Body', 'Title']:
                    val = cleanupRe.sub(' ', html.unescape(unicode(val, 'utf-8', 'ignore')))
                # make sure dates are in the correct format
                elif field in ['CreationDate', 'LastActivityDate', 'LastEditDate']:
                    # e.g. 2008-07-31T21:42:52.667
                    val = '%sZ' % val
                    # parse creation month, day, hour, and minute
                    if field == 'CreationDate':
                        dateObj = datetime.strptime(val, '%Y-%m-%dT%H:%M:%S.%fZ')
                        doc['CreationMonth'] = dateObj.strftime('%B')
                        doc['CreationDay'] = dateObj.strftime('%A')
                        doc['CreationHour'] = dateObj.strftime('%H')
                        doc['CreationMinute'] = dateObj.strftime('%M')
                # split tags into an array of tags
                elif field == 'Tags':
                    val = tagsRe.findall(val)
                # convert vals to integers if needed
                elif intRe.match(val) is not None:
                    val = int(val)
                doc[field] = val
            # create the bulk action; answers are routed to their parent question
            action = {'_id': '%s' % doc['Id'], '_type': 'question'}
            if doc['PostTypeId'] == 2:
                action['_type'] = 'answer'
                action['_parent'] = '%s' % doc['ParentId']
            # queue the bulk json request
            docs.append(json.dumps({'index': action}))
            docs.append(json.dumps(doc))
            # submit the bulk request
            if len(docs) == BULK_SIZE * 2:  # multiply by 2 to account for the action lines
                if VERBOSE:
                    print 'Submitting %s docs...' % BULK_SIZE
                bulk('%s\n' % '\n'.join(docs))  # trailing newline so the last item is processed
                docs = []
            # only index a subset of the posts; set max to -1 for all docs
            # (posts.xml has two header lines before the first row, so line
            # index i == MAX_DOCS + 1 means MAX_DOCS rows have been processed)
            if MAX_DOCS != -1 and i == MAX_DOCS + 1:
                if VERBOSE:
                    print 'Hit max document count of %s' % MAX_DOCS
                break
        # submit any remaining queued requests
        if len(docs) > 0:
            if VERBOSE:
                print 'Submitting remaining %s docs in queue...' % (len(docs) / 2)
            bulk('%s\n' % '\n'.join(docs))
    return 0

if __name__ == '__main__':
    usage = 'usage: %prog [options] file'
    parser = OptionParser(usage)
    parser.add_option('-s', '--server', dest='server', default='localhost:9200', help='ElasticSearch server')
    parser.add_option('-i', '--index', dest='index', default='stackoverflow', help='Index name to use')
    parser.add_option('-b', '--bulk-size', dest='bulkSize', type='int', default=100000, help='Number of docs to submit in each bulk request.')
    parser.add_option('-m', '--max-docs', dest='maxDocs', type='int', default=-1, help='Max number of docs to index')
    parser.add_option('-v', '--verbose', dest='verbose', action='store_true', default=False, help='Enable verbose output')
    options, args = parser.parse_args()
    if len(args) != 1:
        parser.error('The StackOverflow posts.xml file location must be specified')
    # globals
    HOST = options.server
    INDEX = options.index
    BULK_SIZE = options.bulkSize
    MAX_DOCS = options.maxDocs
    VERBOSE = options.verbose
    ret = main(args[0])
    sys.exit(ret)
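For reference, each post queued by extractDocs.py becomes two newline-delimited JSON lines in the bulk body: an action line carrying the document id and type (plus, for answers, the parent question id), followed by the document source. A question/answer pair would look roughly like this (ids and field values are illustrative):

    {"index": {"_id": "123", "_type": "question"}}
    {"Id": 123, "PostTypeId": 1, "Title": "Some question title", "Tags": ["java", "javascript"], "Score": 7}
    {"index": {"_id": "456", "_type": "answer", "_parent": "123"}}
    {"Id": 456, "PostTypeId": 2, "ParentId": 123, "Body": "Some answer body", "Score": 3}

The _parent entry in the action metadata is what lets the has_parent query and the parent-scoped facets in setup.sh relate answers back to their questions.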
setup.sh:
#!/bin/bash
HOST="localhost:9200"
INDEX="stackoverflow"
BATCH_SIZE=10000
MAX_DOCS=1500000

echo "Deleting Existing Index"
curl -XDELETE "http://${HOST}/${INDEX}"

echo -e "\n\nCreating index: ${INDEX}"
curl -XPUT "http://${HOST}/${INDEX}" -d '{
  "settings": {
    "number_of_replicas": 0,
    "refresh_interval": -1
  },
  "mappings": {
    "question": {
      "_all": {"enabled": false}
    },
    "answer": {
      "_all": {"enabled": false},
      "_parent": {
        "type": "question"
      }
    }
  }
}'

echo -e "\n\nFeeding Documents"
python extractDocs.py -s ${HOST} -i ${INDEX} -b ${BATCH_SIZE} -m ${MAX_DOCS} -v $1

echo -e "\n\nEnabling index refresh interval"
curl -XPUT "http://${HOST}/${INDEX}/_settings" -d '{
  "index": {
    "refresh_interval": "1s"
  }
}'

echo -e "\n\nOptimizing index"
curl -XPOST "http://${HOST}/${INDEX}/_optimize?max_num_segments=5"

echo -e "\n\nExecuting Search"
curl -XGET "http://${HOST}/${INDEX}/_search?pretty=true" -d '{
  "size": 0,
  "query": {
    "has_parent": {
      "parent_type": "question",
      "_scope": "parents",
      "query": {
        "query_string": {
          "query": "+java +javascript",
          "default_field": "Tags"
        }
      }
    }
  },
  "facets": {
    "month": {
      "terms": {
        "field": "CreationMonth"
      }
    },
    "day": {
      "terms": {
        "field": "CreationDay"
      }
    },
    "hour": {
      "terms": {
        "field": "CreationHour",
        "size": 10
      }
    },
    "minute": {
      "terms": {
        "field": "CreationMinute",
        "size": 10
      }
    },
    "score": {
      "statistical": {
        "field": "Score"
      },
      "scope": "parents"
    },
    "score_by_tags": {
      "terms_stats": {
        "key_field": "Tags",
        "value_field": "Score"
      },
      "scope": "parents"
    }
  }
}'

echo -e "\n\nDone."
exit 0