Use extractDocs.py to parse and index the StackOverflow posts.xml file into an existing index.
Usage: extractDocs.py [options] file

Options:
  -h, --help            show this help message and exit
  -s SERVER, --server=SERVER
                        ElasticSearch Server
  -i INDEX, --index=INDEX
                        Index name to use
  -b BULKSIZE, --bulk-size=BULKSIZE
                        Number of docs to submit in each bulk request.
  -m MAXDOCS, --max-docs=MAXDOCS
                        Max number of docs to index
  -v, --verbose         Enable verbose output
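For example, to index the first 100,000 posts in batches of 10,000 (the path and numbers here are illustrative):

  python extractDocs.py -s localhost:9200 -i stackoverflow -b 10000 -m 100000 -v /path/to/posts.xml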
Use the setup script to execute the following steps:
- create index with parent mapping
- feed documents using bulk indexing via extractDocs.py
- optimize the index
- execute the query
Run with ./setup.sh /path/to/posts.xml
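Internally, each post becomes two lines in the bulk request body: an action line followed by the document source. For an answer (PostTypeId 2) the action line also carries the parent question's Id, which routes the child to the same shard as its parent. A made-up example pair:

  {"index": {"_id": "12", "_type": "answer", "_parent": "7"}}
  {"Id": 12, "PostTypeId": 2, "ParentId": 7, "Score": 3, "Body": "Use a dict instead."}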
extractDocs.py:

import re
import sys
import json
import httplib
from datetime import datetime
from HTMLParser import HTMLParser
from optparse import OptionParser

# pre-compiled regular expressions
rowRe = re.compile('^\s*<row')                  # detects a row
attrRe = re.compile('(\w+)="(.*?)"')            # extracts all attributes and values
cleanupRe = re.compile('<[^<]+?>|[\r\n]+|\s+')  # strips out html and extra whitespace
tagsRe = re.compile('&lt;(.*?)&gt;')            # splits tags into a list
intRe = re.compile('^\d+$')                     # determines if a field is an integer


def http(method, path, data):
    """ Execute a generic HTTP request against ElasticSearch """
    conn = httplib.HTTPConnection(HOST)
    conn.request(method, path, data)
    return conn.getresponse()


def bulk(data):
    """ Submit a bulk ElasticSearch request """
    resp = http('POST', '/%s/_bulk' % INDEX, data)
    # load the response and print any per-item errors
    status = json.loads(resp.read())
    for stat in status['items']:
        if not stat['index']['ok']:
            print json.dumps(stat)
    return resp


def main(fileName):
    """ Parse StackExchange data into ElasticSearch bulk format """
    # html parser used to unescape the body and title
    html = HTMLParser()
    with open(fileName) as f:
        docs = []
        for i, line in enumerate(f):
            # skip line if not a row
            if rowRe.match(line) is None:
                continue
            # build the document to be indexed
            doc = {}
            for field, val in attrRe.findall(line):
                # strip whitespace and skip field if empty value
                val = val.strip()
                if not val:
                    continue
                # clean up title and body by stripping html and extra whitespace
                if field in ['Body', 'Title']:
                    val = cleanupRe.sub(' ', html.unescape(unicode(val, 'utf-8', 'ignore')))
                # make sure dates are in the correct format
                elif field in ['CreationDate', 'LastActivityDate', 'LastEditDate']:
                    # e.g. 2008-07-31T21:42:52.667
                    val = '%sZ' % val
                    # parse creation month, day, hour, and minute
                    if field == 'CreationDate':
                        dateObj = datetime.strptime(val, '%Y-%m-%dT%H:%M:%S.%fZ')
                        doc['CreationMonth'] = dateObj.strftime('%B')
                        doc['CreationDay'] = dateObj.strftime('%A')
                        doc['CreationHour'] = dateObj.strftime('%H')
                        doc['CreationMinute'] = dateObj.strftime('%M')
                # split tags into an array of tags
                elif field == 'Tags':
                    val = tagsRe.findall(val)
                # convert vals to integers if needed
                elif intRe.match(val) is not None:
                    val = int(val)
                doc[field] = val
            # create the bulk action; answers are children of their parent question
            action = {'_id': '%s' % doc['Id'], '_type': 'question'}
            if doc['PostTypeId'] == 2:
                action['_type'] = 'answer'
                action['_parent'] = '%s' % doc['ParentId']
            # queue the bulk json request
            docs.append(json.dumps({'index': action}))
            docs.append(json.dumps(doc))
            # submit the bulk request once the queue is full
            if len(docs) == BULK_SIZE * 2:  # multiply by 2 to account for the action line
                if VERBOSE:
                    print 'Submitting %s docs...' % BULK_SIZE
                bulk('%s\n' % '\n'.join(docs))  # trailing newline so the last item is processed
                docs = []
            # only index a subset of the posts; set max to -1 for all docs
            if MAX_DOCS != -1 and i == MAX_DOCS + 1:
                if VERBOSE:
                    print 'Hit max document count of %s' % MAX_DOCS
                break
        # submit any remaining queued requests
        if len(docs) > 0:
            if VERBOSE:
                print 'Submitting remaining %s docs in queue...' % (len(docs) / 2)
            bulk('%s\n' % '\n'.join(docs))
    return 0


if __name__ == '__main__':
    usage = 'usage: %prog [options] file'
    parser = OptionParser(usage)
    parser.add_option('-s', '--server', dest='server', default='localhost:9200', help='ElasticSearch Server')
    parser.add_option('-i', '--index', dest='index', default='stackoverflow', help='Index name to use')
    parser.add_option('-b', '--bulk-size', dest='bulkSize', type='int', default=100000, help='Number of docs to submit in each bulk request.')
    parser.add_option('-m', '--max-docs', dest='maxDocs', type='int', default=-1, help='Max number of docs to index')
    parser.add_option('-v', '--verbose', dest='verbose', action='store_true', default=False, help='Enable verbose output')
    options, args = parser.parse_args()
    if len(args) != 1:
        parser.error('The StackOverflow posts.xml file location must be specified')

    # globals used by http() and main()
    HOST = options.server
    INDEX = options.index
    BULK_SIZE = options.bulkSize
    MAX_DOCS = options.maxDocs
    VERBOSE = options.verbose

    ret = main(args[0])
    sys.exit(ret)
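A quick way to sanity-check the parsing regexes is to run them against a hand-written row; this snippet and its sample data are illustrative, not part of the original gist:

import re

attrRe = re.compile('(\w+)="(.*?)"')
tagsRe = re.compile('&lt;(.*?)&gt;')

sample = '<row Id="42" PostTypeId="1" Score="7" Tags="&lt;java&gt;&lt;javascript&gt;" />'
print dict(attrRe.findall(sample))                      # all attribute/value pairs, as strings
print tagsRe.findall('&lt;java&gt;&lt;javascript&gt;')  # ['java', 'javascript']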
setup.sh:

#!/bin/bash
HOST="localhost:9200"
INDEX="stackoverflow"
BATCH_SIZE=10000
MAX_DOCS=1500000
echo "Deleting Existing Index"
curl -XDELETE "http://${HOST}/${INDEX}"
echo -e "\n\nCreating index: ${INDEX}"
curl -XPUT "http://${HOST}/${INDEX}" -d '{
  "settings": {
    "number_of_replicas": 0,
    "refresh_interval": -1
  },
  "mappings": {
    "question": {
      "_all": {"enabled": false}
    },
    "answer": {
      "_all": {"enabled": false},
      "_parent": {
        "type": "question"
      }
    }
  }
}'
echo -e "\n\nFeeding Documents"
python extractDocs.py -s ${HOST} -i ${INDEX} -b ${BATCH_SIZE} -m ${MAX_DOCS} -v $1
echo -e "\n\nEnabling index refresh interval"
curl -XPUT "http://${HOST}/${INDEX}/_settings" -d '{
  "index": {
    "refresh_interval": "1s"
  }
}'
echo -e "\n\nOptimizing index"
curl -XPOST "http://${HOST}/${INDEX}/_optimize?max_num_segments=5"
echo -e "\n\nExecuting Search"
curl -XGET "http://${HOST}/${INDEX}/_search?pretty=true" -d '{
  "size": 0,
  "query": {
    "has_parent": {
      "parent_type": "question",
      "_scope": "parents",
      "query": {
        "query_string": {
          "query": "+java +javascript",
          "default_field": "Tags"
        }
      }
    }
  },
  "facets": {
    "month": {
      "terms": {
        "field": "CreationMonth"
      }
    },
    "day": {
      "terms": {
        "field": "CreationDay"
      }
    },
    "hour": {
      "terms": {
        "field": "CreationHour",
        "size": 10
      }
    },
    "minute": {
      "terms": {
        "field": "CreationMinute",
        "size": 10
      }
    },
    "score": {
      "statistical": {
        "field": "Score"
      },
      "scope": "parents"
    },
    "score_by_tags": {
      "terms_stats": {
        "key_field": "Tags",
        "value_field": "Score"
      },
      "scope": "parents"
    }
  }
}'
echo -e "\n\nDone."
exit 0
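Assuming the feed succeeded, parent/child routing can be spot-checked by fetching a single answer through its parent question's id (the ids below are illustrative):

  curl -XGET "http://localhost:9200/stackoverflow/answer/12?routing=7&pretty=true"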