Skip to content

Instantly share code, notes, and snippets.

@playpauseandstop
Created April 25, 2013 14:14
Show Gist options
  • Save playpauseandstop/5460016 to your computer and use it in GitHub Desktop.
Save playpauseandstop/5460016 to your computer and use it in GitHub Desktop.
Import posts from Posterous backup to Blogger account.
#!/usr/bin/env python
#
# Import posts from Posterous backup to Blogger account.
#
# Requirements
# ============
#
# * `Python <http://www.python.org/>`_ 2.6 or 2.7
# * `GData <http://pypi.python.org/pypi/gdata>`_ 2.0.17 or higher
#
# Installation
# ============
#
# Download the script, place it on your $PATH and make it executable, like::
#
# $ chmod +x ~/bin/posterous-to-blogger.py
#
# License
# =======
#
# Script licensed under the terms of `BSD License
# <http://opensource.org/licenses/BSD-3-Clause>`_.
#
# Usage
# =====
#
# ::
#
# $ posterous-to-blogger.py
#
from __future__ import print_function
import datetime
import glob
import operator
import os
import readline
import sys
import zipfile
from functools import partial
from getpass import getpass
from xml.etree import ElementTree
import atom
from gdata.blogger import BlogPostEntry
from gdata.blogger.service import BloggerService
from gdata.service import BadAuthentication, Query as GDataQuery
# Closing tags appended after an item body so that head + item + footer form
# one complete RSS document (default footer of build_item_element).
FOOTER_XML = '</channel>\n</rss>\n'

# Namespace prefix -> namespace URI for the prefixed tag names understood by
# match_element.
XMLNS_MAP = dict(
    content='http://purl.org/rss/1.0/modules/content/',
    wp='http://wordpress.org/export/1.0/',
)
def build_item_element(head, body, footer=None):
    """
    Glue the shared head, one item body and the footer into a single XML
    document, parse it with ElementTree and return the first ``item`` element
    found below the root (``None`` when there is no such element).

    Head and body are stripped of surrounding whitespace before joining. When
    ``footer`` is empty or not given, ``FOOTER_XML`` is used instead.
    """
    closing = footer if footer else FOOTER_XML
    document = '\n'.join([head.strip(), body.strip(), closing])
    root = ElementTree.fromstring(document)
    return root.find('.//item')
def convert_posterous_item_to_dict(item):
    """
    Build a plain dict describing one Posterous post from its ``item`` XML
    element.

    Resulting keys: ``id`` (int from ``wp:post_id``), ``title``, ``content``
    (``content:encoded`` passed through ``prepare_content``), ``published``
    (naive datetime shifted to GMT), ``link`` (text of ``guid``) and ``tags``
    (list of tag nicenames with ``+`` replaced by a space).

    Raises IndexError when the item has no ``content:encoded`` or
    ``wp:post_id`` child.
    """
    nodes = list(item)
    encoded = [node for node in nodes
               if match_element('content:encoded', node)][0]
    wp_post_id = [node for node in nodes
                  if match_element('wp:post_id', node)][0]

    # wp:post_date_gmt holds the same local value as pubDate, so the GMT value
    # is computed here: drop the PST (-0800) / PDT (-0700) marker and add the
    # matching number of hours back.
    raw_date = item.find('pubDate').text
    if '-0700' in raw_date:
        hours_behind_gmt = 7
    else:
        hours_behind_gmt = 8
    for marker in ('-0700', '-0800'):
        raw_date = raw_date.replace(marker, '')
    local_time = datetime.datetime.strptime(raw_date, '%a %b %d %H:%M:%S %Y')
    published = local_time + datetime.timedelta(hours=hours_behind_gmt)

    return {
        'id': int(wp_post_id.text),
        'title': item.find('title').text,
        'content': prepare_content(encoded.text),
        'published': published,
        'link': item.find('guid').text,
        'tags': [category.get('nicename').replace('+', ' ')
                 for category in item.findall('./category[@domain="tag"]')],
    }
def get_blogger_blog_id(blog):
    """
    Return the numeric blog ID, read from the last path segment of the self
    link of the blog's ``<Entry>`` atom tag.
    """
    self_href = blog.GetSelfLink().href
    last_segment = self_href.rsplit('/', 1)[-1]
    return int(last_segment)
def main():
    """
    Run the interactive import: ask for credentials, pick the target blog,
    load the posts from the Posterous backup and import them.
    """
    print('Import blog posts from Posterous to Google\n')

    service = step_1()
    target_blog_id = step_2(service)
    # step_3() is evaluated before step_4 is called, so the prompts still
    # appear in step order.
    imported = step_4(service, target_blog_id, step_3())
    if not imported:
        return
    print('\nAll OK!')
def match_element(name, element):
    """
    Tell whether ``name`` (optionally prefixed, like ``wp:post_id``) names the
    tag of ``element``.

    Prefixes are resolved through ``XMLNS_MAP``; a name with an unknown prefix
    never matches. A tag without a ``{uri}`` part is compared to the local
    part of ``name`` only.
    """
    # The URI -> prefix lookup is built once and cached on the function itself
    try:
        uri_to_prefix = match_element.reversed_map
    except AttributeError:
        uri_to_prefix = dict((uri, prefix)
                             for prefix, uri in XMLNS_MAP.items())
        match_element.reversed_map = uri_to_prefix

    if ':' in name:
        prefix, local_name = name.split(':')
    else:
        prefix, local_name = None, name

    if prefix and prefix not in XMLNS_MAP:
        return False

    qualified_tag = element.tag
    if '{' not in qualified_tag:
        return local_name == qualified_tag

    # ElementTree spells namespaced tags as "{uri}tag"
    uri, tag = qualified_tag.split('}')
    return uri_to_prefix.get(uri.lstrip('{')) == prefix and local_name == tag
def prepare_content(content):
    """
    Rewrite Posterous post markup into the form used for Blogger: line break
    tags become newlines, the code wrapper ``div`` tags (and every closing
    ``div``) are dropped and ``pre`` blocks get a nested ``code`` element.
    """
    # Applied one after another, in this order
    substitutions = (
        (u'<br />', u'\n'),
        (u'<div class="CodeRay">', u''),
        (u'<div class="code">', u''),
        (u'</div>', u''),
        (u'<pre>', u'<pre class="python" name="code"><code>'),
        (u'</pre>', u'</code></pre>'),
    )
    for old, new in substitutions:
        content = content.replace(old, new)
    return content
def setup_readline():
    """
    Configure readline so that Tab completes file paths.
    """
    def complete_path(text, state):
        """
        Return the ``state``-th glob match for paths starting with ``text``,
        or ``None`` right after the last match. A leading ``~`` is expanded
        to the user's home directory first.
        """
        prefix = os.path.expanduser(text) if text.startswith('~') else text
        candidates = glob.glob('{0}*'.format(prefix))
        candidates.append(None)
        return candidates[state]

    readline.set_completer_delims(' \t\n;')
    readline.parse_and_bind('tab: complete')
    readline.set_completer(complete_path)
def step_1():
    """
    Ask for the Google username and password, log in with them and return the
    logged in Blogger service.

    On bad credentials the error is printed to stderr and the script exits
    with status 1.
    """
    print('Step 1. Credentials')

    login = raw_input('Enter your Google username: ')
    # getpass keeps the password off the screen
    secret = getpass('And password: ')

    service = BloggerService()
    try:
        service.ClientLogin(login, secret)
    except BadAuthentication as error:
        message = 'ERROR: {0}. Exit...'.format(error)
        print(message, file=sys.stderr)
        sys.exit(1)
    else:
        return service
def step_2(blogger):
    """
    Read the list of blogs of the logged in account and return the ID of the
    blog to import in.

    With a single blog that blog is selected automatically; with several the
    user is asked to pick one by its number until a valid number is entered.
    When the account has no blogs an error is printed to stderr and the script
    exits with status 1.
    """
    print('\nStep 2. Blogger')

    # Get blog ID, title list
    query = GDataQuery()
    query.feed = '/feeds/default/blogs'
    feed = blogger.Get(query.ToUri())

    blogs = [(get_blogger_blog_id(blog), blog.title.text)
             for blog in feed.entry]

    if not blogs:
        print('ERROR: No blogs to import in. Exit...', file=sys.stderr)
        sys.exit(1)

    if len(blogs) == 1:
        blog_id, blog_title = blogs[0]
        print('Auto-selected: {0}'.format(blog_title))
        return blog_id

    for number, (_, blog_title) in enumerate(blogs, 1):
        print('{0}. {1}'.format(number, blog_title))

    total = len(blogs)
    while True:
        answer = raw_input('Select blog to import in [1-{0}]: '.format(total))

        try:
            index = int(answer)
        except (TypeError, ValueError):
            # Not a number at all; 0 is outside of the valid range
            index = 0

        if 1 <= index <= total:
            return blogs[index - 1][0]

        print('Please, input valid number, between 1 and {0}'.format(total))
def step_3():
    """
    Ask for the path to the Posterous backup zip file and return the posts
    found in it as a list of dicts sorted by post ID.

    Every ``.xml`` archive member whose path contains ``posts/`` is treated as
    a post item; the member whose name ends with ``head.xml`` supplies the
    document head that each item is combined with before parsing.

    When the file does not exist or the archive cannot be read, an error is
    printed to stderr and the script exits with status 1.
    """
    print('\nStep 3. Posterous Backup')

    # Setup readline to autocomplete file path
    setup_readline()
    filename = raw_input('Path to zip backup file: ')

    if not os.path.isfile(filename):
        print('ERROR: File does not exist: {0}. Exit...'.format(filename),
              file=sys.stderr)
        sys.exit(1)

    try:
        archive = zipfile.ZipFile(filename)
        # Explicit close instead of ``with``: ZipFile is a context manager
        # only since Python 2.7
        try:
            names = archive.namelist()
            # IndexError here means the archive has no head.xml
            head_filename = [name for name in names
                             if name.endswith('head.xml')][0]
            head = archive.read(head_filename)
            items = [archive.read(name) for name in names
                     if 'posts/' in name and name.endswith('.xml')]
        finally:
            archive.close()
    except (IndexError, zipfile.error):
        print('ERROR: Cannot read export file from backup archive. Exit...',
              file=sys.stderr)
        sys.exit(1)

    posts = [convert_posterous_item_to_dict(build_item_element(head, item))
             for item in items]
    return sorted(posts, key=operator.itemgetter('id'))
def step_4(blogger, blog_id, posts):
    """
    Real import process.

    Ask for confirmation and then post every item of the ``posts`` sequence to
    the blog with ``blog_id`` using the authenticated ``blogger`` client.

    Returns ``True`` when the posts were sent and ``False`` when the user
    declined the import.
    """
    print('\nStep 4. Import process')

    prompt = ('Are you sure to import {0} posts to your Blogger? '
              '[Y/n] '.format(len(posts)))
    while True:
        answer = raw_input(prompt).lower()
        if answer in ('', 'y', 'n'):
            break
        print('Wrong answer: {0!r}'.format(answer))

    # Empty answer means the default, which is "yes"
    if answer == 'n':
        print('Exit...')
        return False

    blog_uri = '/feeds/{0}/posts/default'.format(blog_id)

    for post in posts:
        entry = BlogPostEntry()
        entry.title = atom.Title('xhtml', post['title'])
        entry.content = atom.Content('html', text=post['content'])

        # post['published'] is already a GMT value, hence the fixed +00:00
        published = post['published'].strftime('%Y-%m-%dT%H:%M:%S+00:00')
        entry.published = atom.Published(published)

        for tag in post['tags']:
            entry.AddLabel(tag)

        blogger.Post(entry, blog_uri)

    return True
# Script entry point: run the interactive import only when executed directly
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C at any prompt ends the script with a short note instead of
        # a traceback
        print('\nOK! OK! Exiting...', file=sys.stderr)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment