-
-
Save AviDuda/89031ae65cf1253e49e7 to your computer and use it in GitHub Desktop.
Blogger's backup file to WordPress' WXR converter for Disqus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
# Blogger's backup file to WordPress' WXR converter for Disqus.
#
# Original script: https://gist.github.com/fajran/5659455
#
# Only tested with posts and comments, and NOT with pages.
# May not be efficient for huge blogs since the script keeps
# all content in memory during conversion.
#
# Released as public domain.
#
# Required library: dateutil - https://labix.org/python-dateutil
#
# Usage: python blogger-to-disqus.py blogger.xml > output.xml
import sys | |
from datetime import datetime | |
from xml.dom.minidom import parse, parseString | |
from xml.dom import Node | |
import cgi | |
from HTMLParser import HTMLParser | |
import dateutil.parser | |
# Path of the Blogger export to convert, taken from the first command-line
# argument. Exit with the usage line instead of an IndexError traceback when
# the argument is missing.
if len(sys.argv) < 2:
    sys.exit('Usage: python blogger-to-disqus.py blogger.xml > output.xml')
inp = sys.argv[1]
def d(*msg):
    """Write a diagnostic line to stderr.

    The arguments are converted with ``str`` and joined with single spaces,
    followed by a newline. stdout is left untouched because it carries the
    generated XML document.
    """
    # sys.stderr.write works on both Python 2 and 3, unlike the
    # ``print >> sys.stderr`` statement form.
    sys.stderr.write(' '.join(map(str, msg)) + '\n')
class Blog(object):
    """In-memory model of a Blogger export: blog metadata plus its posts."""

    class Author(object):
        """Author of the blog, of a post or of a comment."""
        name = None
        email = None
        uri = None

    class Entry(object):
        """Fields shared by posts and comments."""
        entry_id = None      # text of the entry's <id> element
        url = None           # href of the rel="alternate" link
        permalink = None     # href of the rel="self" link
        title = None
        title_type = None    # "type" attribute of <title>
        content = None
        content_type = None  # "type" attribute of <content>
        published = None     # parsed <published> timestamp
        updated = None       # parsed <updated> timestamp
        author = None        # Blog.Author instance, when the entry has one
        related = None       # comments only: last path segment of the rel="related" link

    class Post(Entry):
        """A blog post together with its labels and attached comments."""
        draft = False

        def __init__(self):
            # Per-instance lists so posts never share labels or comments.
            self.labels = []
            self.comments = []

    class Comment(Entry):
        """A comment; ``post_entry_id`` links it to the post it belongs to."""
        # Default so code reading this attribute does not fail for a comment
        # whose parent post was never recorded.
        post_entry_id = None

    def __init__(self):
        # These used to be class-level attributes (or were not declared at
        # all); a class-level list / Author object is shared by every Blog
        # instance, so create them per instance instead.
        self.blog_id = None
        self.title = None
        self.updated = None
        self.author = Blog.Author()
        self.posts = []
class BlogParser(object):
    """Parse a Blogger Atom export file into a ``Blog`` object.

    Only entries whose kind is "post" or "comment" are read; entries of any
    other kind are ignored. The whole document is loaded into memory.
    """

    # Category scheme whose term names the kind of an entry.
    KIND_SCHEME = 'http://schemas.google.com/g/2005#kind'
    KIND_POST = 'http://schemas.google.com/blogger/2008/kind#post'
    KIND_COMMENT = 'http://schemas.google.com/blogger/2008/kind#comment'
    # Category scheme used for post labels.
    LABEL_SCHEME = 'http://www.blogger.com/atom/ns#'
    # Namespaces of the <control> and <in-reply-to> extension elements.
    APP_NS = 'http://purl.org/atom/app#'
    THREAD_NS = 'http://purl.org/syndication/thread/1.0'

    def __init__(self, atom_file):
        """Remember the path of the export file; nothing is read yet."""
        self.atom_file = atom_file

    def parse(self):
        """Read the export file and return the populated ``Blog``.

        If the document has no top-level ``feed`` element an empty ``Blog``
        is returned.
        """
        self.blog = Blog()
        # Binary mode lets the XML parser honour the encoding declared in the
        # document, and the ``with`` block closes the handle (it used to leak).
        with open(self.atom_file, 'rb') as source:
            dom = parse(source)

        feed = None
        for child in dom.childNodes:
            if child.nodeName == 'feed':
                feed = child
                break

        if feed is not None:
            self.parse_metadata(feed)
            self.parse_entries(feed)

        return self.blog

    def get_text(self, el):
        """Return the concatenated character data below *el*.

        Text and CDATA nodes contribute their value, elements are walked
        recursively, and every other node type contributes nothing.
        """
        # minidom reports CDATA sections with their own node type; without
        # this check their content would be silently dropped.
        if el.nodeType in (Node.TEXT_NODE, Node.CDATA_SECTION_NODE):
            return el.nodeValue
        if el.nodeType == Node.ELEMENT_NODE:
            return ''.join(self.get_text(child) for child in el.childNodes)
        return ''

    def parse_date(self, txt):
        """Parse a timestamp string with ``dateutil``."""
        return dateutil.parser.parse(txt)

    def parse_metadata(self, feed):
        """Copy id, updated, title and author of the feed onto ``self.blog``.

        Scanning stops at the first ``entry`` element.
        """
        for child in feed.childNodes:
            # Drop a namespace prefix, if any, from the node name.
            name = child.nodeName.split(':')[-1]
            if name == 'entry':
                break
            if name == 'id':
                self.blog.blog_id = self.get_text(child)
            elif name == 'updated':
                self.blog.updated = self.parse_date(self.get_text(child))
            elif name == 'title':
                self.blog.title = self.get_text(child)
            elif name == 'author':
                self.blog.author = self.parse_author(child)

    def parse_author(self, author):
        """Build a ``Blog.Author`` from an ``author`` element."""
        data = Blog.Author()
        for child in author.childNodes:
            name = child.nodeName.split(':')[-1]
            if name == 'name':
                data.name = self.get_text(child)
                if len(data.name) < 3:
                    data.name = 'Anonymous'
                else:
                    # automatically generated email has a max of 75 characters
                    # including @wordpress.disqus.net, so use a sane value
                    data.name = data.name[:42]
            elif name == 'uri':
                data.uri = self.get_text(child)
            elif name == 'email':
                data.email = self.get_text(child)
        return data

    def parse_entries(self, feed):
        """Parse every ``entry`` child of *feed* and attach comments to posts.

        The resulting posts are stored on ``self.blog.posts``; the totals are
        reported through ``d``.
        """
        self.posts = []
        self.comments = []
        self.post_ids = {}
        self.comment_ids = {}

        for child in feed.childNodes:
            if child.nodeName != 'entry':
                continue
            self.parse_entry(child)

        self.assign_comments()
        self.blog.posts = self.posts

        d('total posts:', len(self.blog.posts))
        d('total comments:', len(self.comments))

    def assign_comments(self):
        """Append each comment to the post named by its ``post_entry_id``.

        Comments without a ``post_entry_id``, or whose post is unknown, are
        skipped.
        """
        for comment in self.comments:
            # getattr: a comment entry lacking <thr:in-reply-to> may never
            # have had the attribute set at all.
            post = self.post_ids.get(getattr(comment, 'post_entry_id', None))
            if post is not None:
                post.comments.append(comment)

    def parse_category(self, category):
        """Return the ``(scheme, term)`` attributes of a ``category`` element.

        A missing attribute is returned as an empty string instead of raising
        ``KeyError``.
        """
        return category.getAttribute('scheme'), category.getAttribute('term')

    def get_kind(self, entry):
        """Return the kind term of *entry*, or ``None`` if it declares none."""
        for child in entry.childNodes:
            if child.nodeName == 'category':
                scheme, term = self.parse_category(child)
                if scheme == self.KIND_SCHEME:
                    return term
        return None

    def parse_entry(self, entry):
        """Parse one entry and record it as a post or comment by its kind."""
        kind = self.get_kind(entry)
        if kind == self.KIND_POST:
            post = self.parse_post(entry)
            self.posts.append(post)
            self.post_ids[post.entry_id] = post
        elif kind == self.KIND_COMMENT:
            comment = self.parse_comment(entry)
            self.comments.append(comment)
            self.comment_ids[comment.entry_id] = comment

    def get_draft(self, control):
        """Return True when *control* has a ``draft`` child with text ``yes``."""
        for child in control.childNodes:
            name = child.nodeName.split(':')[-1]
            if name == 'draft':
                return self.get_text(child) == 'yes'
        return False

    def parse_entry_common(self, entry, target):
        """Fill *target* with the fields shared by posts and comments."""
        for child in entry.childNodes:
            name = child.nodeName.split(':')[-1]
            if name == 'id':
                target.entry_id = self.get_text(child)
            elif name == 'published':
                target.published = self.parse_date(self.get_text(child))
            elif name == 'updated':
                target.updated = self.parse_date(self.get_text(child))
            elif name == 'title':
                target.title = self.get_text(child)
                # "type" is optional in Atom and defaults to "text".
                target.title_type = child.getAttribute('type') or 'text'
            elif name == 'content':
                # Disqus requires 3 characters
                target.content = self.get_text(child).ljust(3, '.')
                target.content_type = child.getAttribute('type') or 'text'
            elif name == 'author':
                target.author = self.parse_author(child)
            elif name == 'link':
                # A link without "rel" is an "alternate" link per RFC 4287.
                rel = child.getAttribute('rel') or 'alternate'
                href = child.getAttribute('href') or None
                if rel == 'self':
                    target.permalink = href
                elif rel == 'alternate':
                    target.url = href

    def parse_post(self, entry):
        """Build a ``Blog.Post`` (common fields, labels, draft flag)."""
        post = Blog.Post()
        self.parse_entry_common(entry, post)
        for child in entry.childNodes:
            name = child.nodeName.split(':')[-1]
            if name == 'category':
                scheme, term = self.parse_category(child)
                if scheme == self.LABEL_SCHEME:
                    post.labels.append(term)
            elif name == 'control' and child.namespaceURI == self.APP_NS:
                post.draft = self.get_draft(child)
        return post

    def parse_comment(self, entry):
        """Build a ``Blog.Comment`` (common fields, parent post, related id)."""
        comment = Blog.Comment()
        # Make sure the attribute exists even without an <in-reply-to> child.
        comment.post_entry_id = None
        self.parse_entry_common(entry, comment)
        for child in entry.childNodes:
            name = child.nodeName.split(':')[-1]
            if name == 'in-reply-to' and child.namespaceURI == self.THREAD_NS:
                comment.post_entry_id = child.getAttribute('ref') or None
            if name == 'link' and child.getAttribute('rel') == 'related':
                related = child.getAttribute('href')
                # Keep only the last path segment of the related link.
                comment.related = related[related.rfind('/') + 1:]
        return comment
class WXRWriter(object):
    """Serialise a ``Blog`` as a WXR (WordPress export) document for Disqus.

    Every post with non-blank content becomes an ``<item>`` holding its
    comments as ``<wp:comment>`` elements.
    """

    # Value written to <wp:comment_status> of every post.
    comment_status = 'open'
    # The original author notes that Blogger fills in a placeholder e-mail
    # address for commenters, so the address is left out unless enabled.
    include_author_email = False

    def __init__(self, blog):
        """Remember the blog to serialise."""
        self.blog = blog

    def write(self):
        """Return the complete document as UTF-8 encoded bytes."""
        self.post_id = 0
        self.comment_id = 0
        self.post_comment_ids = {}

        lines = self.get_header() + self.get_entries() + self.get_footer()
        doc = '\n'.join(line.strip() for line in lines)
        # Encode once at the boundary; works on Python 2 and 3 alike.
        return doc.encode('utf-8')

    def get_header(self):
        """Return the XML declaration and opening ``rss``/``channel`` tags."""
        return [
            '<?xml version="1.0" encoding="UTF-8" ?>',
            '<rss version="2.0"',
            ' xmlns:content="http://purl.org/rss/1.0/modules/content/"',
            ' xmlns:dsq="http://www.disqus.com/"',
            ' xmlns:dc="http://purl.org/dc/elements/1.1/"',
            ' xmlns:wp="http://wordpress.org/export/1.0/">',
            '<channel>',
        ]

    def get_footer(self):
        """Return the closing ``channel``/``rss`` tags."""
        return ['</channel>', '</rss>']

    def get_entries(self):
        """Return the output lines of every post of the blog, in order."""
        res = []
        for post in self.blog.posts:
            res += self.get_post(post)
        return res

    @staticmethod
    def _to_utc(ts):
        """Return *ts* as a naive UTC datetime (naive input is unchanged)."""
        offset = ts.utcoffset()
        if offset is None:
            return ts
        return (ts - offset).replace(tzinfo=None)

    @staticmethod
    def _cdata(text):
        """Wrap *text* in CDATA, splitting ``]]>`` so it cannot end it early."""
        return '<![CDATA[%s]]>' % text.replace(']]>', ']]]]><![CDATA[>')

    def get_date(self, ts):
        """Format *ts* as an RFC 822 style date in UTC."""
        # The "+0000" suffix is fixed, so the value must really be UTC.
        return self._to_utc(ts).strftime("%a, %d %b %Y %H:%M:%S +0000")

    def get_date_wp(self, ts):
        """Format *ts* as ``YYYY-MM-DD HH:MM:SS`` in UTC.

        The result is written to ``*_date_gmt`` elements, so a timestamp that
        carries a UTC offset is converted first instead of emitting its local
        wall-clock time.
        """
        return self._to_utc(ts).strftime("%Y-%m-%d %H:%M:%S")

    def escape(self, s):
        """Escape ``&``, ``<``, ``>`` and turn non-ASCII into char references."""
        # Local import: same three replacements as the old cgi.escape, which
        # no longer exists in current Python releases.
        from xml.sax.saxutils import escape as xml_escape
        return xml_escape(s).encode('ascii', 'xmlcharrefreplace').decode('ascii')

    def unescape(self, s):
        """Replace HTML character references in *s* by their characters."""
        try:
            from html import unescape as html_unescape  # Python 3.4+
        except ImportError:
            html_unescape = HTMLParser().unescape  # Python 2
        return html_unescape(s)

    def get_comment_id(self, comment):
        """Reserve the next output id for *comment* and record the mapping.

        The key is the part of the Blogger entry id after its last ``-``; the
        value is the id the comment will get in the output.
        """
        self.comment_id += 1
        comment_id_blogger = comment.entry_id[comment.entry_id.rfind('-') + 1:]
        self.post_comment_ids[comment_id_blogger] = self.comment_id

    def get_comment(self, comment):
        """Return the output lines of one ``<wp:comment>`` element."""
        status = 1
        self.comment_id += 1

        # An entry without <author> leaves comment.author as None.
        author = comment.author
        name = (author.name if author is not None else None) or 'Anonymous'
        email = author.email if author is not None else None
        uri = author.uri if author is not None else None

        res = [
            ' <wp:comment>',
            ' <wp:comment_id>%s</wp:comment_id>' % self.comment_id,
            ' <wp:comment_author>%s</wp:comment_author>' % self._cdata(name),
        ]
        if self.include_author_email and email:
            res.append(' <wp:comment_author_email>%s</wp:comment_author_email>' % self.escape(email))
        if uri:
            res.append(' <wp:comment_author_url>%s</wp:comment_author_url>' % self._cdata(uri))
        res.append(' <wp:comment_author_IP>%s</wp:comment_author_IP>' % '')
        res.append(' <wp:comment_date_gmt>%s</wp:comment_date_gmt>' % self.get_date_wp(comment.published))
        # Comment text is user supplied: guard the CDATA section against "]]>".
        content = self.unescape(comment.content or '')
        res.append(' <wp:comment_content>%s</wp:comment_content>' % self._cdata(content))
        res.append(' <wp:comment_approved>%s</wp:comment_approved>' % status)

        if comment.related:
            if comment.related in self.post_comment_ids:
                res.append(' <wp:comment_parent>%s</wp:comment_parent>' % self.post_comment_ids[comment.related])
            else:
                d('could not find related comment %s for comment entry %s (comment_id %s)' % (comment.related, comment.entry_id, self.comment_id))

        res.append(' </wp:comment>')
        return res

    def get_post(self, post):
        """Return the output lines of one ``<item>``, or ``[]`` for a post
        whose content is missing or blank."""
        if not (post.content or '').strip():
            return []

        # Counted for every emitted post; the value itself is not written out.
        self.post_id += 1

        res = [
            '<item>',
            # escape() leaves no ">" behind, so these CDATA sections are safe.
            ' <title><![CDATA[%s]]></title>' % self.escape(post.title or ''),
            # Escaped so "&" in a query string keeps the XML well-formed; a
            # post without an alternate link yields an empty element.
            ' <link>%s</link>' % self.escape(post.url or ''),
            ' <content:encoded><![CDATA[%s]]></content:encoded>' % self.escape(post.content),
            ' <wp:post_date_gmt>%s</wp:post_date_gmt>' % self.get_date_wp(post.published),
            ' <wp:comment_status>%s</wp:comment_status>' % self.comment_status,
        ]

        # First pass: hand out ids so a reply can reference a parent that
        # appears later in the list. Then rewind the counter, because
        # get_comment() increments it again while writing.
        self.post_comment_ids = {}
        first_comment_id = self.comment_id
        for comment in post.comments:
            self.get_comment_id(comment)
        self.comment_id = first_comment_id

        for comment in post.comments:
            res += self.get_comment(comment)

        res.append('</item>')
        return res
if __name__ == '__main__':
    # Convert the export named on the command line and emit the WXR document
    # on stdout; diagnostics go to stderr via d().
    p = BlogParser(inp)
    blog = p.parse()
    writer = WXRWriter(blog)
    wxr = writer.write()
    # write() returns encoded bytes: on Python 3 they have to go to the binary
    # layer of stdout, on Python 2 sys.stdout accepts them directly.
    out = getattr(sys.stdout, 'buffer', sys.stdout)
    out.write(wxr)
    out.write(b'\n')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment