|
#!/usr/bin/python |
|
# https://github.com/tgray |
|
|
|
import xmlrpclib |
|
import sys, os |
|
from datetime import datetime |
|
import keyring |
|
import argparse |
|
import pytz |
|
|
|
"""Interacts with a wordpress installation to list, fetch, and publish posts. |
|
|
|
Files have the following simple format: |
|
|
|
Title: your title here |
|
Date: 2015-01-02 23:16:12 |
|
Tags: tag1, tag2 |
|
Status: draft |
|
Comments: open |
|
|
|
Body of the post after an empty line. |
|
|
|
After a post is published, the script returns the published version. The idea |
|
is to run this script as a filter, replacing your edited draft with the |
|
published version. |
|
|
|
The default setting for post status = draft and comments = open. |
|
|
|
Wordpress credentials are read from the file ~/.xmlrpc. The first time running |
|
the script will display an instruction of what the file should have. |
|
""" |
|
|
|
helpstr="""Must set XML-RPC url and user name in ~/.xmlrpc. One can also |
|
set the optional timezone (see pytz documentation). Example contents: |
|
|
|
url: http://www.example.com/xmlrpc.php |
|
user: name |
|
pytz: US/Eastern""" |
|
|
|
# The header fields and their wp blog synonyms. |
|
hFields = ['title', 'date', 'post', 'type', 'tags', 'slug', 'category', |
|
'link', 'short link', 'status', 'comments'] |
|
wpFields = ['post_title', 'post_date_gmt', 'post_id', 'post_type', |
|
'terms_names', 'post_name', 'terms_names', 'link', 'guid', 'post_status', |
|
'comment_status'] |
|
h2wp = dict(zip(hFields, wpFields)) |
|
|
|
wpdtfmt = "%Y%m%dT%H:%M:%S" |
|
mydtfmt = "%Y-%m-%d %H:%M:%S" |
|
|
|
def getTZ(userdat): |
|
"""Get timezone info and return it.""" |
|
# Time zones. WP is trustworthy only in UTC. |
|
utc = pytz.utc |
|
try: |
|
myTZ = pytz.timezone(userdat['pytz']) |
|
except: |
|
myTZ = pytz.timezone('US/Eastern') |
|
return utc, myTZ |
|
|
|
def get_cred(): |
|
"""Get XML-RPC credentials for ~/.xmlrpc.""" |
|
# The blog's XMLRPC URL and username. |
|
try: |
|
f = open(os.path.join(os.environ['HOME'], '.xmlrpc')) |
|
lines = f.readlines() |
|
userdat = dict([[a.strip() for a in l.split(':', 1)] for l in lines]) |
|
f.close() |
|
except: |
|
print(helpstr) |
|
sys.exit(1) |
|
|
|
return userdat |
|
|
|
def open_anything(source): |
|
"""Open a file or stdin and return the handler.""" |
|
if source == "-": |
|
import sys |
|
return sys.stdin |
|
else: |
|
return open(source, 'r') |
|
|
|
def slugify(text, delim=u'-'): |
|
"""Generates a slightly worse ASCII-only slug.""" |
|
from unicodedata import normalize |
|
result = [] |
|
for word in _punct_re.split(text.lower()): |
|
word = normalize('NFKD', word).encode('ascii', 'ignore') |
|
if word: |
|
result.append(word) |
|
return unicode(delim.join(result)) |
|
|
|
def list_posts(args, userdat): |
|
"""Get a list of posts and return.""" |
|
|
|
postFilter = {} |
|
|
|
postFilter['number'] = args.number |
|
if args.drafts: |
|
postFilter['post_status'] = 'draft' |
|
|
|
|
|
# connection stuff |
|
user = userdat['user'] |
|
url = userdat['url'] |
|
# Get the password from Keychain. |
|
password = keyring.get_password(url, user) |
|
# Connect. |
|
server = xmlrpclib.ServerProxy(url) |
|
|
|
datefmt = '%_m/%d/%y' |
|
# Return a list of post IDs and titles. |
|
try: |
|
posts = server.wp.getPosts(0, user, password, postFilter) |
|
except xmlrpclib.Fault as err: |
|
print "A fault occurred" |
|
print "Fault code: %d" % err.faultCode |
|
print "Fault string: %s" % err.faultString |
|
sys.exit(3) |
|
output = [] |
|
for p in posts: |
|
# just in case there is a date issue (I've seen it) put the following |
|
# line in a try |
|
try: |
|
dt = datetime.strptime(p['post_date'].value, wpdtfmt) |
|
except: |
|
dt = datetime.fromtimestamp(0) |
|
# format the output |
|
s = u'{0:>5} - {1} - {2} ({3})'.format(p['post_id'], |
|
dt.strftime(datefmt), p['post_title'], p['post_status']) |
|
output.append(s.encode('utf8')) |
|
textout = '\n'.join(output) |
|
return textout |
|
|
|
def get_url(post_id, userdat): |
|
"""Get the url of a post and return. |
|
|
|
Will get the actual url if it can, otherwise will return the short url.""" |
|
|
|
# connection stuff |
|
user = userdat['user'] |
|
url = userdat['url'] |
|
# Get the password from Keychain. |
|
password = keyring.get_password(url, user) |
|
# Connect. |
|
server = xmlrpclib.ServerProxy(url) |
|
|
|
try: |
|
post = server.wp.getPost(0, user, password, post_id) |
|
except xmlrpclib.Fault as err: |
|
print "A fault occurred" |
|
print "Fault code: %d" % err.faultCode |
|
print "Fault string: %s" % err.faultString |
|
sys.exit(3) |
|
|
|
fields = ['link', 'short link'] |
|
urls = dict([(k, post[h2wp[k]]) for k in fields]) |
|
if not urls['link']: |
|
url = urls['short link'] |
|
else: |
|
url = urls['link'] |
|
return url |
|
|
|
def fetch_post(post_id, userdat): |
|
"""Fetch a post and return it's content.""" |
|
utc, myTZ = getTZ(userdat) |
|
|
|
# connection stuff |
|
user = userdat['user'] |
|
url = userdat['url'] |
|
# Get the password from Keychain. |
|
password = keyring.get_password(url, user) |
|
# Connect. |
|
server = xmlrpclib.ServerProxy(url) |
|
|
|
try: |
|
post = server.wp.getPost(0, user, password, post_id) |
|
except xmlrpclib.Fault as err: |
|
print "A fault occurred" |
|
print "Fault code: %d" % err.faultCode |
|
print "Fault string: %s" % err.faultString |
|
sys.exit(3) |
|
|
|
|
|
output = [] |
|
# fix date - UTC to myTZ |
|
dt = datetime.strptime(post[h2wp['date']].value, wpdtfmt) |
|
post[h2wp['date']] = utc.localize(dt).astimezone(myTZ).strftime(mydtfmt) |
|
|
|
# do my tags up |
|
tags = [] |
|
cats = [] |
|
for i in post['terms']: |
|
if i['taxonomy'] == 'post_tag': |
|
tags.append(i['name']) |
|
elif i['taxonomy'] == 'category': |
|
cats.append(i['name']) |
|
post_tags = {'tags': ', '.join(tags), 'category': ', '.join(cats)} |
|
|
|
# grab the content and format it for output, translating from wp labels to |
|
# mine |
|
# if the type is a page, we need no stinking tags (or categories) |
|
hFields2 = hFields[:] |
|
if post['post_type'] in ['page',]: |
|
hFields2.remove('tags') |
|
hFields2.remove('category') |
|
for k in hFields2: |
|
if k in ['tags', 'category']: |
|
v = post_tags[k] |
|
else: |
|
v = post[h2wp[k]] |
|
s = '{}: {}'.format(k.capitalize(), v) |
|
output.append(s.encode('utf8')) |
|
|
|
output.append('') |
|
output.append('') |
|
output.append(post['post_content'].encode('utf8')) |
|
|
|
textout = '\n'.join(output) |
|
return textout |
|
|
|
def publish_post(infile, userdat): |
|
"""Parse and publish a post.""" |
|
utc, myTZ = getTZ(userdat) |
|
|
|
fn = open_anything(infile) |
|
lines = fn.readlines() |
|
fn.close() |
|
file_contents = ''.join(lines) |
|
|
|
header_chunk, body = file_contents.split('\n\n', 1) |
|
|
|
# split lines on ':', strip and cleanup, and rezip |
|
# The following list comprehension works but is less than clear. I think |
|
# it is clearer to break it into multiple steps |
|
# |
|
# hdict = dict([[k.lower(), v.strip()] for [k,v] in |
|
# [i.split(':', 1) for i in header_chunk.split('\n')]]) |
|
|
|
k, v = zip(*[i.split(':', 1) for i in header_chunk.split('\n')]) |
|
k = [i.strip().lower() for i in k] |
|
v = [i.strip() for i in v] |
|
hdict = dict(zip(k, v)) |
|
|
|
# set some defaults |
|
post_dict = {'comment_status': 'open', 'post_status': 'draft'} |
|
|
|
# convert the date from myTZ to UTC and put it in xml-rpc compat format |
|
dt = datetime.strptime(hdict.pop('date'), mydtfmt) |
|
dt = myTZ.localize(dt) |
|
post_dict[h2wp['date']] = xmlrpclib.DateTime(dt.astimezone(utc)) |
|
|
|
# get that tags and category right |
|
terms = {} |
|
if 'tags' in hdict: |
|
terms['post_tag'] = [i.strip() for i in hdict.pop('tags').split(',')] |
|
if 'category' in hdict: |
|
terms['category'] = [i.strip() for i in |
|
hdict.pop('category').split(',')] |
|
# if the type is a page, we need no stinking tags (or categories) |
|
if hdict['type'] not in ['page',]: |
|
post_dict.update({'terms_names': terms}) |
|
|
|
# loop through my headers and add them to the post dict |
|
for k in hdict: |
|
post_dict[h2wp[k]] = hdict[k] |
|
post_dict['post_content'] = body.strip() |
|
|
|
# connection stuff |
|
user = userdat['user'] |
|
url = userdat['url'] |
|
# Get the password from Keychain. |
|
password = keyring.get_password(url, user) |
|
# Connect. |
|
server = xmlrpclib.ServerProxy(url) |
|
|
|
if 'post_id' in post_dict: |
|
pid = post_dict.pop('post_id') |
|
#print post_dict |
|
try: |
|
server.wp.editPost(0, user, password, int(pid), post_dict) |
|
except xmlrpclib.Fault as err: |
|
print "A fault occurred" |
|
print "Fault code: %d" % err.faultCode |
|
print "Fault string: %s" % err.faultString |
|
sys.exit(3) |
|
else: |
|
try: |
|
pid = server.wp.newPost(0, user, password, post_dict) |
|
except xmlrpclib.Fault as err: |
|
print "A fault occurred" |
|
print "Fault code: %d" % err.faultCode |
|
print "Fault string: %s" % err.faultString |
|
sys.exit(3) |
|
return pid |
|
|
|
def main(argv=None): |
|
|
|
if argv is None: |
|
argv = sys.argv |
|
programName = os.path.basename(argv[0]) |
|
|
|
parser = argparse.ArgumentParser( |
|
description = ( |
|
'Publish, list, and fetch posts for a Wordpress blog ' |
|
'using XML-RPC.')) |
|
|
|
group = parser.add_mutually_exclusive_group(required = True) |
|
group_list = parser.add_argument_group('list options') |
|
|
|
group.add_argument('-l', '--list', |
|
dest = 'list', |
|
action = 'store_true', |
|
help = 'list posts') |
|
|
|
group.add_argument('-f', '--fetch', |
|
dest = 'fetch', |
|
action = 'store', |
|
type = int, |
|
metavar = 'ID', |
|
help = 'fetch post with ID') |
|
|
|
group.add_argument('-u', '--url', |
|
dest = 'url', |
|
action = 'store', |
|
type = int, |
|
metavar = 'ID', |
|
help = 'get url for post with ID') |
|
|
|
group_list.add_argument('-n', '--number', |
|
dest = 'number', |
|
action = 'store', |
|
default = 15, |
|
type = int, |
|
metavar = 'N', |
|
help = 'number of posts to list') |
|
|
|
group_list.add_argument('-d', '--drafts', |
|
dest = 'drafts', |
|
action = 'store_true', |
|
help = 'list only draft posts') |
|
|
|
group.add_argument('-p', '--post', |
|
dest = 'post', |
|
action = 'store_true', |
|
help = 'publishes a post, defaults to stdin') |
|
|
|
parser.add_argument('infile', |
|
nargs='?', |
|
default = '-', |
|
metavar = 'FILENAME', |
|
help = 'file to publish') |
|
|
|
|
|
args = parser.parse_args() |
|
if (args.drafts) and not (args.list): |
|
print("Options -d and -n must be used in conjunction with -l") |
|
sys.exit(2) |
|
|
|
userdat = get_cred() |
|
|
|
if args.list: |
|
output = list_posts(args, userdat) |
|
elif args.url: |
|
output = get_url(args.url, userdat) |
|
elif args.fetch: |
|
output = fetch_post(args.fetch, userdat) |
|
else: |
|
pid = publish_post(args.infile, userdat) |
|
output = fetch_post(pid, userdat) |
|
print(output) |
|
|
|
if __name__ == "__main__": |
|
sys.exit(main()) |