Skip to content

Instantly share code, notes, and snippets.

@honzakral
Last active May 21, 2023 21:21
Show Gist options
  • Save honzakral/35451c6322a78d4de8f5 to your computer and use it in GitHub Desktop.
Save honzakral/35451c6322a78d4de8f5 to your computer and use it in GitHub Desktop.

Stackoverflow to Elasticsearch

This script will load any stackoverflow site from the XML dump (retrievable at https://archive.org/details/stackexchange via torrent) into Elasticsearch.

To use just call:

python load_stack.py PATH

Where PATH is the directory where you store the XML files (Posts.xml, Users.xml and Comments.xml). The name of the target directory will be used as the index name in Elasticsearch. The index will be deleted and recreated during the process. To use this script you must have the official Python Elasticsearch client installed.

import os
import sys
import re
from xml.etree import cElementTree
from itertools import chain
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
# PostTypeId values from the Stack Exchange dump that this script indexes;
# rows of any other type (tag wiki, moderator election, ...) are skipped.
POST_TYPES = {
    1: 'question',
    2: 'answer',
}

# Mapping fragment reused for every embedded user object: the display name
# is analyzed for full-text search and also kept verbatim in a "raw"
# sub-field for exact matching and aggregations.
user_mapping = {
    "properties": {
        "display_name": {
            "type": "string",
            "fields": {
                "raw": {"type": "string", "analyzer": "keyword"},
            },
        },
    },
}

# Default index name; overridden from the command line by the name of the
# directory holding the XML dump.
INDEX_NAME = 'stack'

INDEX_SETTINGS = {
    "settings": {
        "analysis": {
            "analyzer": {
                # analyzer for post bodies: strip HTML markup before the
                # standard tokenizer, stopword removal and stemming run
                "html_strip": {
                    "tokenizer": "standard",
                    "filter": ["standard", "lowercase", "stop", "snowball"],
                    "char_filter": ["html_strip"],
                },
            },
        },
    },
    "mappings": {
        "question": {
            "properties": {
                "body": {"type": "string", "analyzer": "html_strip"},
                "owner": user_mapping,
                "last_editor": user_mapping,
                "tags": {"type": "string", "analyzer": "keyword"},
                # comments are nested so each comment is matched as a unit
                "comments": {
                    "type": "nested",
                    "properties": {
                        "owner": user_mapping,
                    },
                },
            },
        },
        # answers are stored as children of their question document
        "answer": {
            "_parent": {
                "type": "question",
            },
            "properties": {
                "owner": user_mapping,
                "comments": {
                    "type": "nested",
                    "properties": {
                        "owner": user_mapping,
                    },
                },
            },
        },
        "user": user_mapping,
    },
}

# CamelCase -> snake_case conversion, applied to XML attribute names
under_re = re.compile(r'([a-z])([A-Z])')
# pulls tag names out of the "<a><b>" format of the Tags attribute
tag_re = re.compile(r'<([^>]+)>')

# module-level state shared between the parse_* generators:
users = {}        # user id -> trimmed user record, filled by parse_users
questions = set()
answers = {}      # answer id -> parent question id, filled by parse_posts
def _parse(xml_file):
with open(xml_file) as input:
root = cElementTree.iterparse(input)
for event, e in root:
if event != 'end' or e.tag != 'row':
continue
yield _row_to_dict(e)
def _row_to_dict(row):
return dict(
(
under_re.sub(r'\1_\2', k).lower(),
int(v) if v.isdigit() else v
) for (k, v) in row.items()
)
def parse_users(users_file='Users.xml'):
    """
    Generate one Elasticsearch bulk action per <row> in *users_file*,
    while populating the module-level ``users`` cache with a trimmed
    record (id, display_name and, when present, location) that the post
    and comment parsers embed into their documents.

    Example row::

        <row Id="2" Reputation="101" CreationDate="2011-01-03T20:14:55.783"
             DisplayName="Geoff Dalgas" WebsiteUrl="http://stackoverflow.com"
             Location="Corvallis, OR" AboutMe="&lt;p&gt;Developer on ...."
             Views="6" UpVotes="6" DownVotes="0" Age="36" />
    """
    for user in _parse(users_file):
        uid = user['id']
        cached = {'id': uid, 'display_name': user['display_name']}
        if 'location' in user:
            cached['location'] = user['location']
        users[uid] = cached
        yield {
            '_type': 'user',
            '_id': uid,
            '_source': user,
        }
def parse_comments(comments_file='Comments.xml'):
    """
    Generate partial-update bulk actions that append each comment in
    *comments_file* onto its parent post's ``comments`` list.

    The author is resolved through the global ``users`` cache when a
    UserId attribute is present, otherwise from UserDisplayName.
    Comments on answers are routed through the global ``answers`` map so
    the update reaches the child document of the parent/child mapping.

    Example row::

        <row Id="9" PostId="9" Score="3" Text="Point.... "
             CreationDate="2011-01-03T21:16:09.603" UserId="60" />
    """
    for comment in _parse(comments_file):
        # attach the author, preferring the cached full user record
        if 'user_id' in comment:
            comment['owner'] = users[comment.pop('user_id')]
        elif 'user_display_name' in comment:
            comment['owner'] = {'display_name': comment.pop('user_display_name')}

        post_id = comment['post_id']
        action = {
            '_op_type': 'update',
            '_id': post_id,
            'script': 'ctx._source.comments += comment',
            'params': {'comment': comment},
        }
        if post_id in answers:
            # the post is an answer: route the update via its question
            action['_type'] = 'answer'
            action['_routing'] = action['_parent'] = answers[post_id]
        else:
            action['_type'] = 'question'
        yield action
def parse_posts(posts_file='Posts.xml'):
    """
    Generate index actions for every question and answer in *posts_file*.

    Rows whose PostTypeId is not 1 (question) or 2 (answer) are skipped.
    User-id references are replaced by the cached user records, ``score``
    is renamed to ``rating``, the Tags string is split into a list, and
    an empty ``comments`` list is prepared for parse_comments() to append
    to. Answers are registered in the global ``answers`` map and routed
    to their parent question.

    Example rows::

        Q: <row Id="5" PostTypeId="1" AcceptedAnswerId="73" Score="39"
                Body="&lt;p&gt;....." OwnerUserId="24"
                Tags="&lt;nosql&gt;&lt;rdbms&gt;" ... />
        A: <row Id="12" PostTypeId="2" ParentId="3" Score="15"
                Body="&lt;p&gt;In ..." OwnerUserId="14" ... />
    """
    for post in _parse(posts_file):
        post_type = POST_TYPES.get(post.pop('post_type_id', None))
        if post_type is None:
            # unknown post type (tag wiki, moderator post, ...), ignore
            continue

        # swap "<field>_user_id" references for the cached user objects
        for field in ('owner_user_id', 'last_editor_user_id'):
            if field in post:
                post[field[:-len('_user_id')]] = users[post.pop(field)]

        if 'score' in post:
            post['rating'] = post.pop('score')
        if 'tags' in post:
            post['tags'] = tag_re.findall(post['tags'])
        post['comments'] = []

        action = {'_type': post_type, '_id': post['id']}
        if post_type == 'answer':
            # remember the parent so comments can be routed later
            parent_id = post.pop('parent_id')
            answers[post['id']] = parent_id
            action['_parent'] = parent_id
        action['_source'] = post
        yield action
def parse_all(index_name=INDEX_NAME):
    """
    Recreate *index_name* on a local Elasticsearch node and bulk-load the
    dump, printing one progress character per document ('.' on success,
    'X' on failure).

    Order matters: users first (posts embed them), then posts (comments
    are appended onto them), then comments.
    """
    es = Elasticsearch()
    # start from a clean slate; 404 means the index didn't exist yet
    es.indices.delete(index=index_name, ignore=404)
    es.indices.create(index=index_name, body=INDEX_SETTINGS)

    actions = chain(parse_users(), parse_posts(), parse_comments())
    for ok, data in streaming_bulk(es, actions, index=index_name):
        sys.stdout.write('.' if ok else 'X')
        sys.stdout.flush()
if __name__ == '__main__':
    # When a path is given, chdir into it so the parsers find the XML
    # files, and name the index after that directory; otherwise 'stack'.
    args = sys.argv[1:]
    if args:
        os.chdir(args[0])
        index_name = os.path.basename(os.path.abspath(os.curdir))
    else:
        index_name = 'stack'
    parse_all(index_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment