Created
April 25, 2013 14:14
-
-
Save playpauseandstop/5460016 to your computer and use it in GitHub Desktop.
Import posts from Posterous backup to Blogger account.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Import posts from Posterous backup to Blogger account. | |
# | |
# Requirements | |
# ============ | |
# | |
# * `Python <http://www.python.org/>`_ 2.6 or 2.7
# * `GData <http://pypi.python.org/pypi/gdata>`_ 2.0.17 or higher | |
# | |
# Installation | |
# ============ | |
# | |
# Download the script, place it on your $PATH and make it executable, like::
# | |
# $ chmod +x ~/bin/posterous-to-blogger.py | |
# | |
# License | |
# ======= | |
# | |
# Script licensed under the terms of `BSD License | |
# <http://opensource.org/licenses/BSD-3-Clause>`_. | |
# | |
# Usage | |
# ===== | |
# | |
# :: | |
# | |
# $ posterous-to-blogger.py | |
# | |
from __future__ import print_function | |
import datetime | |
import glob | |
import operator | |
import os | |
import readline | |
import sys | |
import zipfile | |
from functools import partial | |
from getpass import getpass | |
from xml.etree import ElementTree | |
import atom | |
from gdata.blogger import BlogPostEntry | |
from gdata.blogger.service import BloggerService | |
from gdata.service import BadAuthentication, Query as GDataQuery | |
# Closing tags appended after a head + item pair so that the concatenated
# fragments form a complete, parseable RSS document.
FOOTER_XML = '</channel>\n</rss>\n'

# Namespace prefix -> namespace URI for the prefixed tags this script looks up
# in the exported items.
XMLNS_MAP = dict(
    content='http://purl.org/rss/1.0/modules/content/',
    wp='http://wordpress.org/export/1.0/',
)
def build_item_element(head, body, footer=None):
    """
    Build the ``<item>`` Element for a single exported post.

    ``head`` and ``body`` are stripped of surrounding whitespace and joined,
    one per line, with ``footer`` (the module-level ``FOOTER_XML`` when
    ``footer`` is empty or not given). The joined text is parsed with
    ElementTree.

    Returns the first ``item`` element found anywhere in the parsed document,
    or ``None`` if the document has no such element.
    """
    closing = footer if footer else FOOTER_XML
    document = '\n'.join([head.strip(), body.strip(), closing])
    root = ElementTree.fromstring(document)
    return root.find('.//item')
def convert_posterous_item_to_dict(item):
    """
    Convert posterous XML item to simple Python dict.

    The resulting dict has the keys ``id`` (int), ``title``, ``content``
    (already passed through ``prepare_content``), ``published`` (naive
    datetime shifted to GMT), ``link`` and ``tags`` (list of tag names with
    ``+`` turned back into spaces).

    Raises ``IndexError`` if the item has no ``content:encoded`` or
    ``wp:post_id`` child.
    """
    children = list(item)
    content = [child for child in children
               if match_element('content:encoded', child)][0]
    post_id = [child for child in children
               if match_element('wp:post_id', child)][0]

    # wp:post_date_gmt holds the same local-time value as pubDate, so it is
    # of no help; instead derive GMT from pubDate by hand. Only the two
    # Pacific offsets are recognised: -0700 (PST + DST) and -0800 (PST).
    raw_date = item.find('pubDate').text
    hours_behind_gmt = 7 if '-0700' in raw_date else 8
    local_date = raw_date.replace('-0700', '').replace('-0800', '')
    published = (
        datetime.datetime.strptime(local_date, '%a %b %d %H:%M:%S %Y') +
        datetime.timedelta(hours=hours_behind_gmt)
    )

    return {
        'id': int(post_id.text),
        'title': item.find('title').text,
        'content': prepare_content(content.text),
        'published': published,
        'link': item.find('guid').text,
        'tags': [tag.get('nicename').replace('+', ' ')
                 for tag in item.findall('./category[@domain="tag"]')],
    }
def get_blogger_blog_id(blog):
    """
    Return numeric blog ID of given blog entry.

    The ID is the last path segment of the entry's "self" link, converted to
    ``int``.
    """
    self_href = blog.GetSelfLink().href
    last_segment = self_href.rsplit('/', 1)[-1]
    return int(last_segment)
def main():
    """
    Run the four interactive import steps in order.

    Logs in, picks the target blog, loads posts from the backup and imports
    them. The final confirmation line is printed only when the import step
    reports success.
    """
    print('Import blog posts from Posterous to Google\n')

    service = step_1()
    target_blog_id = step_2(service)
    exported_posts = step_3()

    imported = step_4(service, target_blog_id, exported_posts)
    if not imported:
        return

    print('\nAll OK!')
def match_element(name, element):
    """
    Returns True if ``name`` is namespaced name of element's tag.

    ``name`` is either a plain tag name (``title``) or a ``prefix:tag`` pair
    (``content:encoded``) whose prefix must be a key of ``XMLNS_MAP``.
    ElementTree spells namespaced tags as ``{uri}tag``, so the prefix is
    resolved to its URI and compared together with the local tag name.

    A plain name only matches an element without a namespace, and a prefixed
    name only matches an element in exactly that namespace.
    """
    if ':' in name:
        # Split once so that extra colons cannot break tuple unpacking.
        prefix, local_name = name.split(':', 1)
        prefix = prefix or None
    else:
        prefix, local_name = None, name

    tag = element.tag

    if '{' not in tag:
        # Element has no namespace, so a prefixed name can never match it.
        return prefix is None and local_name == tag

    if prefix is None:
        # Element is namespaced but the requested name is not.
        return False

    uri, _, local_tag = tag.partition('}')
    uri = uri.lstrip('{')

    # Unknown prefixes resolve to None and therefore never equal a real URI.
    return XMLNS_MAP.get(prefix) == uri and local_name == local_tag
def prepare_content(content):
    """
    Prepare Posterous post content to Blogger rules.

    Applies these textual substitutions, in order:

    * ``<br />`` becomes a newline;
    * opening ``CodeRay`` / ``code`` divs are dropped;
    * every ``</div>`` is dropped (NOTE: this also removes closing tags of
      divs that are not code wrappers);
    * ``<pre>`` / ``</pre>`` are rewritten to
      ``<pre class="python" name="code"><code>`` / ``</code></pre>``.

    ``content`` may be ``None`` (ElementTree reports ``None`` as the text of
    an empty element); an empty unicode string is returned in that case
    instead of failing with ``AttributeError``.
    """
    if content is None:
        return u''

    # Order matters: each replacement runs on the output of the previous one.
    replacements = (
        (u'<br />', u'\n'),
        (u'<div class="CodeRay">', u''),
        (u'<div class="code">', u''),
        (u'</div>', u''),
        (u'<pre>', u'<pre class="python" name="code"><code>'),
        (u'</pre>', u'</code></pre>'),
    )

    for old, new in replacements:
        content = content.replace(old, new)

    return content
def setup_readline():
    """
    Configure readline so that <Tab> completes file system paths.

    Word delimiters are reduced to space, tab, newline and ``;`` so that
    whole paths are handed to the completer.
    """
    def complete(text, state):
        """
        Return the ``state``-th path starting with ``text``, ``None`` once
        the glob matches are exhausted. A leading ``~`` is expanded first.
        """
        prefix = os.path.expanduser(text) if text.startswith('~') else text
        candidates = glob.glob('{0}*'.format(prefix))
        # Trailing None tells readline there are no more matches.
        candidates.append(None)
        return candidates[state]

    readline.set_completer_delims(' \t\n;')
    readline.parse_and_bind('tab: complete')
    readline.set_completer(complete)
def step_1():
    """
    Ask for Google credentials and log in.

    Prompts for username and (hidden) password and returns the logged-in
    ``BloggerService``. On ``BadAuthentication`` the error is written to
    stderr and the script exits with status 1.
    """
    print('Step 1. Credentials')

    username = raw_input('Enter your Google username: ')
    password = getpass('And password: ')

    service = BloggerService()

    try:
        service.ClientLogin(username, password)
    except BadAuthentication as err:
        sys.stderr.write('ERROR: {0}. Exit...\n'.format(err))
        sys.exit(1)

    return service
def step_2(blogger):
    """
    Read list of all blogs for logged in account and select blog to import in.

    When the account has exactly one blog it is selected automatically;
    otherwise the blogs are listed and the user is asked for a number until a
    valid one is entered. Exits with status 1 when the account has no blogs.

    Returns the ID of the selected blog.
    """
    print('\nStep 2. Blogger')

    # Get blog ID, title list
    query = GDataQuery()
    query.feed = '/feeds/default/blogs'
    feed = blogger.Get(query.ToUri())

    blogs = [(get_blogger_blog_id(blog), blog.title.text)
             for blog in feed.entry]

    if not blogs:
        print('ERROR: No blogs to import in. Exit...', file=sys.stderr)
        sys.exit(1)

    if len(blogs) == 1:
        # Take the title from the only entry; there is no other source for it.
        blog_id, blog_title = blogs[0]
        print('Auto-selected: {0}'.format(blog_title))
        return blog_id

    total = len(blogs)

    for number, (_, title) in enumerate(blogs, 1):
        print('{0}. {1}'.format(number, title))

    # Explicit field indexes keep the format strings valid on Python 2.6.
    prompt = 'Select blog to import in [1-{0}]: '.format(total)

    while True:
        answer = raw_input(prompt)

        try:
            index = int(answer)
        except (TypeError, ValueError):
            index = 0

        if 1 <= index <= total:
            return blogs[index - 1][0]

        print('Please, input valid number, between 1 and {0}'.format(total))
def step_3():
    """
    Provide path to Posterous backup file and load posts from it.

    Asks for the path of the zip backup (with <Tab> completion), reads every
    ``*.xml`` member whose name contains ``posts/`` plus the first member
    whose name ends with ``head.xml``, and converts each post to a dict.

    Exits with status 1, reporting to stderr, when the file does not exist,
    is not a valid zip archive or has no ``head.xml`` member.

    Returns list of post dicts sorted by their ``id``.
    """
    print('\nStep 3. Posterous Backup')

    # Setup readline to autocomplete file path
    setup_readline()

    # Expand "~" here as well: the completer only expands it when <Tab> is
    # actually used, a manually typed "~/backup.zip" would not be found.
    filename = os.path.expanduser(raw_input('Path to zip backup file: '))

    if not os.path.isfile(filename):
        print('ERROR: File does not exist: {0}. Exit...'.format(filename),
              file=sys.stderr)
        sys.exit(1)

    try:
        handler = zipfile.ZipFile(filename)

        # try/finally instead of ``with``: ZipFile is a context manager only
        # since Python 2.7 and this script declares 2.6 support.
        try:
            names = handler.namelist()

            # IndexError here means the archive has no head.xml member
            head_filename = [name for name in names
                             if name.endswith('head.xml')][0]
            head = handler.read(head_filename)

            items = [handler.read(name) for name in names
                     if 'posts/' in name and name.endswith('.xml')]
        finally:
            handler.close()
    except (IndexError, zipfile.error):
        print('ERROR: Cannot read export file from backup archive. Exit...',
              file=sys.stderr)
        sys.exit(1)

    elements = [build_item_element(head, body) for body in items]
    posts = [convert_posterous_item_to_dict(element) for element in elements]

    return sorted(posts, key=operator.itemgetter('id'))
def step_4(blogger, blog_id, posts):
    """
    Real import process.

    Import all items from posts sequence to blog ID using authenticated blogger
    client. The user is asked for confirmation first; an empty answer counts
    as "yes".

    Returns ``True`` after all posts were sent, ``False`` if the user
    declined the import.
    """
    print('\nStep 4. Import process')

    # Explicit field indexes keep the format strings valid on Python 2.6.
    question = ('Are you sure to import {0} posts to your Blogger? '
                '[Y/n] '.format(len(posts)))

    while True:
        answer = raw_input(question).strip().lower()

        if answer not in ('', 'y', 'n'):
            print('Wrong answer: {0!r}'.format(answer))
            continue

        if answer == 'n':
            print('Exit...')
            return False

        break

    blog_uri = '/feeds/{0}/posts/default'.format(blog_id)

    for post in posts:
        entry = BlogPostEntry()
        entry.title = atom.Title('xhtml', post['title'])
        entry.content = atom.Content('html', text=post['content'])

        # ``published`` was already converted to GMT, hence the fixed +00:00
        # suffix. Hours and minutes are separated by a plain colon.
        published = post['published'].strftime('%Y-%m-%dT%H:%M:%S+00:00')
        entry.published = atom.Published(published)

        for tag in post['tags']:
            entry.AddLabel(tag)

        blogger.Post(entry, blog_uri)

    return True
# Script entry point: run the import and turn Ctrl+C into a short goodbye
# message on stderr instead of a traceback.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        sys.stderr.write('\nOK! OK! Exiting...\n')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment