|
import os |
|
import sys |
|
import re |
|
from xml.etree import cElementTree |
|
from itertools import chain |
|
|
|
from elasticsearch import Elasticsearch |
|
from elasticsearch.helpers import streaming_bulk |
|
|
|
# Numeric PostTypeId values from Posts.xml and the document type each one is
# indexed under.
POST_TYPES = {1: 'question', 2: 'answer'}
|
|
|
# Mapping fragment reused wherever a user document is embedded: the
# ``display_name`` string field plus a ``raw`` sub-field that uses the
# ``keyword`` analyzer.
user_mapping = dict(
    properties=dict(
        display_name=dict(
            type="string",
            fields=dict(
                raw=dict(type="string", analyzer="keyword"),
            ),
        ),
    ),
)
|
|
|
# Default name of the index that parse_all() (re)creates.
INDEX_NAME = 'stack'

# Custom analyzer registered under the name "html_strip": standard tokenizer,
# the listed token filters and the html_strip character filter.
_html_strip_analyzer = {
    "tokenizer": "standard",
    "filter": ["standard", "lowercase", "stop", "snowball"],
    "char_filter": ["html_strip"],
}

# Questions: analyzed body, embedded owner/editor, tags and nested comments.
_question_mapping = {
    "properties": {
        "body": {"type": "string", "analyzer": "html_strip"},
        "owner": user_mapping,
        "last_editor": user_mapping,
        "tags": {"type": "string", "analyzer": "keyword"},
        "comments": {
            "type": "nested",
            "properties": {"owner": user_mapping},
        },
    },
}

# Answers: declared as children of questions, with owner and nested comments.
_answer_mapping = {
    "_parent": {"type": "question"},
    "properties": {
        "owner": user_mapping,
        "comments": {
            "type": "nested",
            "properties": {"owner": user_mapping},
        },
    },
}

# Request body passed to the index-creation call: analysis settings plus the
# mappings of the three document types.
INDEX_SETTINGS = {
    "settings": {
        "analysis": {
            "analyzer": {"html_strip": _html_strip_analyzer},
        },
    },
    "mappings": {
        "question": _question_mapping,
        "answer": _answer_mapping,
        "user": user_mapping,
    },
}
|
|
|
# Regex for the CamelCase -> underscore conversion: captures a lowercase
# letter directly followed by an uppercase one.
under_re = re.compile(r'([a-z])([A-Z])')

# Regex that pulls every tag name out of a "<tag1><tag2>" string.
tag_re = re.compile(r'<([^>]+)>')

# In-memory caches shared by the parse_* generators.
users = dict()     # user id -> cached subset of the user document
questions = set()  # presumably question ids; not populated by the loaders here
answers = dict()   # answer id -> id of the parent question
|
|
|
def _parse(xml_file):
    """
    Lazily yield every ``<row>`` element of ``xml_file`` as a dict.

    The document is parsed incrementally; each row is converted with
    ``_row_to_dict`` and then detached from the tree, so rows that were
    already handed out do not pile up in memory while a large dump is read.
    """
    # Binary mode lets the XML parser apply the encoding declared in the
    # document rather than the locale's default text encoding.
    with open(xml_file, 'rb') as source:
        events = cElementTree.iterparse(source, events=('start', 'end'))
        root = None
        for event, elem in events:
            if root is None:
                # The first event reported is the start of the root element.
                root = elem
            if event != 'end' or elem.tag != 'row':
                continue
            row = _row_to_dict(elem)
            # Drop the children collected so far; without this the whole
            # document would stay referenced from the root element.
            root.clear()
            yield row
|
|
|
def _row_to_dict(row):
    """
    Convert the attributes of an XML ``<row>`` element into a dict.

    Attribute names are turned from CamelCase into lowercase with
    underscores (``PostTypeId`` -> ``post_type_id``). Values consisting
    solely of digits become ``int``; every other value is kept as a string.
    """
    result = {}
    for name, value in row.items():
        if value.isdigit():
            try:
                value = int(value)
            except ValueError:
                # isdigit() is also true for characters such as superscripts
                # or circled digits that int() cannot parse; keep the text.
                pass
        result[under_re.sub(r'\1_\2', name).lower()] = value
    return result
|
|
|
def parse_users(users_file='Users.xml'):
    """
    Yield one bulk action (``_type``/``_id``/``_source``) per user row.

    While iterating, each user is also stored in the module-level ``users``
    dict, reduced to its id, display name and (when present) location, so
    that other loaders can look users up by id.

    Users.xml:

    <row
        Id="2"
        Reputation="101"
        CreationDate="2011-01-03T20:14:55.783"
        DisplayName="Geoff Dalgas"
        LastAccessDate="2012-12-19T00:28:45.110"
        WebsiteUrl="http://stackoverflow.com"
        Location="Corvallis, OR"
        AboutMe="<p>Developer on ...."
        Views="6"
        UpVotes="6"
        DownVotes="0"
        EmailHash="b437f461b3fd27387c5d8ab47a293d35"
        Age="36"
    />
    """
    for record in _parse(users_file):
        user_id = record['id']

        # Only a small subset of the row is kept in the lookup cache.
        cached = {'id': user_id, 'display_name': record['display_name']}
        if 'location' in record:
            cached['location'] = record['location']
        users[user_id] = cached

        # The complete row is what gets indexed.
        yield {'_type': 'user', '_id': user_id, '_source': record}
|
|
|
def parse_comments(comments_file='Comments.xml'):
    """
    Yield one bulk "update" action per comment row.

    Each action runs a script that appends the comment to the ``comments``
    list of the post it belongs to. Posts whose id is in the module-level
    ``answers`` dict are addressed as answers (with parent and routing set to
    the question id); all other posts are addressed as questions.

    Comments.xml:

    <row
        Id="9"
        PostId="9"
        Score="3"
        Text="Point.... "
        CreationDate="2011-01-03T21:16:09.603"
        UserId="60"
    />
    """
    for row in _parse(comments_file):
        # Replace the author reference with an embedded owner document.
        if 'user_id' in row:
            row['owner'] = users[row.pop('user_id')]
        elif 'user_display_name' in row:
            row['owner'] = {'display_name': row.pop('user_display_name')}

        post_id = row['post_id']
        action = {
            '_op_type': 'update',
            '_id': post_id,
            'script': 'ctx._source.comments += comment',
            'params': {'comment': row},
        }

        if post_id in answers:
            question_id = answers[post_id]
            action['_type'] = 'answer'
            action['_parent'] = question_id
            action['_routing'] = question_id
        else:
            action['_type'] = 'question'

        yield action
|
|
|
|
|
def parse_posts(posts_file='Posts.xml'):
    """
    Yield one bulk action per question or answer row in ``posts_file``.

    Rows whose post type is not listed in ``POST_TYPES`` are skipped. Owner
    and last-editor ids are replaced by the cached user documents, ``score``
    is renamed to ``rating``, the tag string is split into a list and an
    empty ``comments`` list is added. For answers, the parent question id is
    moved into the action's ``_parent`` and remembered in the module-level
    ``answers`` dict.

    Posts.xml:

    Q: <row
        Id="5"
        PostTypeId="1"
        AcceptedAnswerId="73"
        CreationDate="2011-01-03T20:52:52.880"
        Score="39"
        ViewCount="5638"
        Body="<p>....."
        OwnerUserId="24"
        LastEditorUserId="97"
        LastEditDate="2011-01-06T11:34:27.610"
        LastActivityDate="2012-01-27T19:12:50.900"
        Title="What are the differences between NoSQL and a traditional RDBMS?"
        Tags="<nosql><rdbms><database-recommendation>"
        AnswerCount="5"
        CommentCount="0"
        FavoriteCount="22"
    />

    A: <row
        Id="12"
        PostTypeId="2"
        ParentId="3"
        CreationDate="2011-01-03T21:01:19.160"
        Score="15"
        Body="<p>In ..."
        OwnerUserId="14"
        LastActivityDate="2011-01-03T21:01:19.160"
        CommentCount="3"
    />
    """
    # (attribute holding a user id, field receiving the user document)
    user_refs = (
        ('owner_user_id', 'owner'),
        ('last_editor_user_id', 'last_editor'),
    )

    for row in _parse(posts_file):
        doc_type = POST_TYPES.get(row.pop('post_type_id', None))
        if doc_type is None:
            # missing or unknown post type, ignore
            continue

        for ref_key, field in user_refs:
            if ref_key in row:
                row[field] = users[row.pop(ref_key)]

        if 'score' in row:
            row['rating'] = row.pop('score')

        if 'tags' in row:
            row['tags'] = tag_re.findall(row['tags'])

        row['comments'] = []

        action = {'_type': doc_type, '_id': row['id']}

        if doc_type == 'answer':
            question_id = row.pop('parent_id')
            action['_parent'] = question_id
            answers[row['id']] = question_id

        action['_source'] = row

        yield action
|
|
|
def parse_all(index_name=INDEX_NAME):
    """
    Recreate the index ``index_name`` and load users, posts and comments.

    Any existing index of that name is deleted (a missing index is not an
    error) and created again with ``INDEX_SETTINGS``. The three loaders are
    then streamed, in that order, through the bulk helper. One character is
    written to stdout per action: ``.`` on success, ``X`` on failure.
    """
    es = Elasticsearch()
    es.indices.delete(index=index_name, ignore=404)
    es.indices.create(index=index_name, body=INDEX_SETTINGS)

    actions = chain(parse_users(), parse_posts(), parse_comments())
    # raise_on_error=False: have failed items reported through the loop
    # instead of raised as an exception, which would make the 'X' branch
    # unreachable and abort the import on the first rejected action.
    results = streaming_bulk(es, actions, index=index_name, raise_on_error=False)
    for success, _ in results:
        sys.stdout.write('.' if success else 'X')
        sys.stdout.flush()
|
|
|
if __name__ == '__main__':
    # Optional first argument: directory holding the XML dump. The script
    # switches into it and names the index after that directory.
    cli_args = sys.argv[1:]
    if cli_args:
        os.chdir(cli_args[0])
        index_name = os.path.basename(os.path.abspath(os.curdir))
    else:
        index_name = 'stack'
    parse_all(index_name)