Created
December 4, 2012 19:15
-
-
Save roman-yepishev/4207657 to your computer and use it in GitHub Desktop.
Disqus importer to WordPress
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
"""Quick and dirty hack to get disqus comments into WordPress DB""" | |
import sys | |
from datetime import datetime | |
from xml.etree import ElementTree as ET | |
import MySQLdb | |
# Address stored for comments whose Disqus author has no e-mail.
# NOTE(review): the value looks like a redaction placeholder - confirm
# the real address before running the import.
ANONYMOUS_EMAIL = '[email protected]'

# Connection settings for the WordPress MySQL database.
DATABASE = dict(
    host='lab.lappyfamily.net',
    user='rtg',
    name='rtginua6_wp1',
)

# Column values that replace the parsed ones on the administrator's
# own comments.
ADMIN_INFO = dict(
    comment_author='Roman Yepishev',
    comment_author_email='[email protected]',
    user_id=1,
)

# Author names used in Disqus that all denote the administrator.
ADMIN_ALIASES = {'rtg', 'Roman', 'rye'}
class DisqusImporter(object):
    """Import a Disqus XML export into a WordPress MySQL database.

    Call ``main(path)`` to run the whole import: it maps WordPress post
    URLs to post IDs, parses the export into comment trees, inserts them
    into ``wp_comments``, refreshes ``wp_posts.comment_count`` and commits.
    """

    def __init__(self):
        """Open the MySQL connection described by ``DATABASE``."""
        self.wpdb = MySQLdb.connect(host=DATABASE['host'],
                                    user=DATABASE['user'],
                                    db=DATABASE['name'],
                                    charset='utf8')
        # WordPress post guid (matched against Disqus thread links) -> id.
        self.wp_post_url_to_id = {}

    @staticmethod
    def _child_text(element, tag):
        """Return the text of child *tag* of *element*, or None if absent."""
        if element is None:
            return None
        child = element.find(tag)
        return None if child is None else child.text

    def make_wordpress_url_map(self):
        """Fill ``wp_post_url_to_id`` with guid -> id for every 'post' row."""
        cursor = self.wpdb.cursor()
        try:
            cursor.execute("""
                SELECT
                    guid, id
                FROM
                    wp_posts
                WHERE
                    post_type = 'post'
            """)
            for guid, post_id in cursor:
                self.wp_post_url_to_id[guid] = post_id
        finally:
            cursor.close()

    def parse_disqus_comments(self, path):
        """Parse a Disqus export into WordPress-like comment trees.

        ``path`` is passed straight to ``ElementTree.parse``.

        Returns a dict mapping Disqus post id -> comment dict, containing
        only the roots of each tree; replies are nested in each comment's
        ``'children'`` list. Comments whose thread link is not a key of
        ``wp_post_url_to_id`` are skipped with a message. A reply whose
        parent is not among the parsed comments is kept as a root so that
        it is not lost.
        """
        ns = '{http://disqus.com}'
        ns_di = '{http://disqus.com/disqus-internals}'
        id_attr = ns_di + 'id'

        root = ET.parse(path).getroot()

        # Each thread corresponds to a blog post; remember its link.
        thread_id_to_url = {}
        for thread in root.findall(ns + 'thread'):
            thread_id_to_url[thread.attrib[id_attr]] = self._child_text(
                thread, ns + 'link')

        comments = {}
        for post in root.findall(ns + 'post'):
            dsq_id = post.attrib[id_attr]
            thread_id = post.find(ns + 'thread').attrib[id_attr]

            # Without a mapping from the thread link to a WordPress
            # post ID there is nowhere to attach this comment.
            thread_url = thread_id_to_url.get(thread_id)
            if thread_url not in self.wp_post_url_to_id:
                print("Skipping comment for {}".format(thread_url))
                continue

            author = post.find(ns + 'author')
            email = self._child_text(author, ns + 'email')
            name = self._child_text(author, ns + 'name')

            parent = post.find(ns + 'parent')
            parent_id = None if parent is None else parent.attrib[id_attr]

            # MySQL issues a warning if we stuff data in
            # YYYY-mm-ddTHH:MM:SSZ form, so reformat the timestamp.
            created_at = post.find(ns + 'createdAt').text
            comment_date = datetime.strptime(
                created_at, '%Y-%m-%dT%H:%M:%SZ'
            ).strftime('%Y-%m-%d %H:%M:%S')

            post_data = {
                'comment_post_ID': self.wp_post_url_to_id[thread_url],
                'comment_content': self._child_text(post, ns + 'message'),
                'comment_date': comment_date,
                'comment_date_gmt': comment_date,
                'comment_author': name,
                'comment_author_IP': self._child_text(post, ns + 'ipAddress'),
                'comment_author_email': email if email else ANONYMOUS_EMAIL,
                'user_id': 0,
                'parent_id': parent_id,
                'children': [],
            }

            # Fixup for the administrator's own comments.
            if name in ADMIN_ALIASES:
                post_data.update(ADMIN_INFO)

            comments[dsq_id] = post_data

        # Hang every reply under its parent and collect the roots in a
        # new dict, so nothing is deleted from a dict during iteration.
        toplevel = {}
        for dsq_id, comment in comments.items():
            parent_id = comment['parent_id']
            if not parent_id:
                toplevel[dsq_id] = comment
                continue
            parent_comment = comments.get(parent_id)
            if parent_comment is None:
                print("Parent {} of comment {} not found, "
                      "importing it as top-level".format(parent_id, dsq_id))
                toplevel[dsq_id] = comment
            else:
                parent_comment['children'].append(comment)
        return toplevel

    def add_comment(self, comment, parent_id):
        """Insert ``comment`` and, recursively, all of its children.

        ``parent_id`` is the ``wp_comments`` id of the parent row (``main``
        passes 0 for root comments). It is stored on the dict as
        ``comment['comment_parent']`` before the insert.
        """
        comment['comment_parent'] = parent_id
        cursor = self.wpdb.cursor()
        try:
            cursor.execute("""
                INSERT INTO wp_comments (
                    comment_post_ID, comment_author,
                    comment_author_email, comment_author_IP,
                    comment_date, comment_date_gmt,
                    comment_content, comment_parent,
                    user_id)
                VALUES (
                    %(comment_post_ID)s, %(comment_author)s,
                    %(comment_author_email)s, %(comment_author_IP)s,
                    %(comment_date)s, %(comment_date_gmt)s,
                    %(comment_content)s, %(comment_parent)s,
                    %(user_id)s)
                """, comment)
            # The id of the row just inserted becomes the children's parent.
            new_id = cursor.lastrowid
        finally:
            cursor.close()

        for item in comment['children']:
            self.add_comment(item, new_id)

    def update_comment_count(self):
        """Synchronize the cached comment count with the actual comments.

        Sets ``wp_posts.comment_count`` to the number of ``wp_comments``
        rows with ``comment_approved = '1'``. The WHERE clause restricts
        the update to posts that have at least one such comment.
        """
        cursor = self.wpdb.cursor()
        try:
            cursor.execute("""
                UPDATE
                    wp_posts AS p
                LEFT JOIN (
                    SELECT
                        comment_post_ID,
                        count(comment_post_ID) as comment_count
                    FROM
                        wp_comments
                    WHERE
                        comment_approved = '1'
                    GROUP BY comment_post_ID
                ) as c
                ON
                    p.id = c.comment_post_ID
                SET
                    p.comment_count = c.comment_count
                WHERE
                    p.id = c.comment_post_ID
            """)
        finally:
            cursor.close()

    def main(self, path):
        """Entry point: import the Disqus export at ``path`` and commit."""
        self.make_wordpress_url_map()
        comments = self.parse_disqus_comments(path)
        for item in comments.values():
            self.add_comment(item, 0)
        self.update_comment_count()
        # MySQLdb leaves autocommit off, so on transactional tables the
        # import would be discarded without an explicit commit. Committing
        # once at the end also means a failed run leaves nothing
        # half-imported on such tables.
        self.wpdb.commit()
if __name__ == "__main__":
    # The first command-line argument is the path to the Disqus XML
    # export; report a usage error instead of a bare IndexError.
    if len(sys.argv) < 2:
        sys.exit("Usage: {} DISQUS_EXPORT_XML".format(sys.argv[0]))
    importer = DisqusImporter()
    importer.main(sys.argv[1])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment