-
-
Save qingfeng/253321 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf-8 | |
import urllib2 | |
import cookielib | |
import re | |
import xml.dom as dom | |
import xml.dom.minidom as minidom | |
from BeautifulSoup import BeautifulSoup, Tag, Comment, NavigableString | |
__all__ = ( | |
'AmebaNowClientException', | |
'UnsupportedContentTypeError', | |
'AuthenticationError', | |
'PostError', | |
'UnexpectedResponseError', | |
'AmebaNowClient', | |
) | |
class AmebaNowClientException(Exception): | |
pass | |
class UnsupportedContentTypeError(AmebaNowClientException): | |
pass | |
class AuthenticationError(AmebaNowClientException): | |
pass | |
class UnexpectedResponseError(AmebaNowClientException): | |
pass | |
class PostError(AmebaNowClientException): | |
pass | |
class AmebaNowClient(object): | |
LOGIN_URL = 'http://www.ameba.jp/login.do' | |
FORM_URL = 'http://now.ameba.jp/' | |
POST_URL = 'http://ucsnow.ameba.jp/post' | |
API_MYTIMELINE = 'http://now.ameba.jp/api/timeline' | |
API_TIMELINE = 'http://now.ameba.jp/api/entryList' | |
API_ENCODING = 'utf-8' | |
def __init__(self, credentials): | |
self.credentials = credentials | |
self.cookiejar = cookielib.CookieJar() | |
self.opener = urllib2.build_opener( | |
urllib2.HTTPCookieProcessor(self.cookiejar)) | |
@staticmethod | |
def _textify(nodelist): | |
retval = '' | |
for n in nodelist: | |
if isinstance(n, Comment): | |
pass | |
elif isinstance(n, Tag): | |
if n.name == 'br': | |
retval += "\n" | |
else: | |
retval += AmebaNowClient._textify(n) | |
elif isinstance(n, NavigableString): | |
retval += unicode(n) | |
return retval | |
def _urlread(self, url, data=None, default_charset='utf-8'): | |
r = self.opener.open(url, data) | |
headers = r.info() | |
resp = r.read() | |
if headers.gettype() == 'text/html': | |
encoding = headers.getparam('charset') | |
if encoding == None: | |
encoding = default_charset | |
match = re.search(r'<meta\s+http-equiv=(["\']?)Content-Type\1\s+content=("[^"]*"|\'[^\']*\'|[^"\'\s]*)[^>]*>', resp, re.IGNORECASE) | |
value = match.group(2) | |
if value[0] == '"' or value[0] == "'": | |
value = value[1:-1] | |
# XXX: should take care of quoted values | |
tmp = re.split(r'\s*;\s*', value) | |
for k, v in (t.split('=') for t in tmp[1:]): | |
if k.lower() == 'charset': | |
encoding = v | |
elif headers.gettype() == 'text/xml': | |
encoding = headers.getparam('charset') | |
if encoding == None: | |
encoding = default_charset | |
match = re.match(r'''<?xml\s+[^?>]*encoding=("[^"]*"|'[^']*'|[^"'?>]*)[^?>]*?>''', resp, re.IGNORECASE) | |
value = match.group(1) | |
if value[0] == '"' or value[0] == "'": | |
value = value[1:-1] | |
encoding = v | |
else: | |
raise UnsupportedContentTypeError(headers.gettype()) | |
r.close() | |
return r, encoding, resp.decode(encoding) | |
def login(self): | |
_, _, resp = self._urlread(self.LOGIN_URL, | |
'password=%s&amebaId=%s' % ( | |
urllib2.quote(self.credentials['password']), | |
urllib2.quote(self.credentials['ameba_id']))) | |
error_node = BeautifulSoup(resp).find('div', 'errorId', recursive=True) | |
if error_node: | |
raise AuthenticationError( | |
AmebaNowClient._textify(error_node).strip()) | |
@staticmethod | |
def _buildquery(params, encoding): | |
data = [] | |
for k, v in params.iteritems(): | |
if isinstance(k, unicode): k = k.encode(encoding) | |
if isinstance(v, unicode): v = v.encode(encoding) | |
k = str(k) | |
v = str(v) | |
data.append('%s=%s' % (urllib2.quote(k), urllib2.quote(v))) | |
return '&'.join(data) | |
def post(self, text): | |
_, encoding, resp = self._urlread(self.FORM_URL) | |
form = BeautifulSoup(resp).find('form', id='inputForm') | |
params = {} | |
for n in form.findAll('input', type='hidden'): | |
params[n['name']] = n['value'] | |
text_area_name = form.find('textarea')['name'] | |
params[text_area_name] = text | |
r, encoding, resp = self._urlread(self.POST_URL, AmebaNowClient._buildquery(params, encoding)) | |
if r.url != self.FORM_URL: | |
if r.url == self.POST_URL: | |
error_node = BeautifulSoup(resp).find('p', id='errorArea', recursive=True) | |
if error_node: | |
raise PostError(AmebaNowClient._textify(error_node).strip()) | |
raise PostError() | |
@staticmethod | |
def _selectonenode(parent, name): | |
nodes = parent.getElementsByTagName(name) | |
if len(nodes) == 0: | |
raise UnexpectedResponseError('No <%s> element' % name) | |
elif len(nodes) > 1: | |
raise UnexpectedResponseError('More than one <%s> elements found' % name) | |
return nodes[0] | |
@staticmethod | |
def _getnodevalue(node, name): | |
n = AmebaNowClient._selectonenode(node, name) | |
if len(n.childNodes) > 0: | |
return ''.join(i.nodeValue if i.nodeValue is not None else '' for i in n.childNodes) | |
else: | |
return None | |
@staticmethod | |
def _parseentrylist(entry_list_node): | |
result = [] | |
getnodevalue = AmebaNowClient._getnodevalue | |
for i in entry_list_node.childNodes: | |
if i.nodeType == dom.Node.ELEMENT_NODE: | |
result.append({ | |
'id': getnodevalue(i, 'entryId'), | |
'ameba_id': getnodevalue(i, 'amebaId'), | |
'text': getnodevalue(i, 'entryText'), | |
'reply_to': { | |
'ameba_id': getnodevalue(i, 'replyAmebaId'), | |
'id': getnodevalue(i, 'replyEntryId'), | |
}, | |
'nickname': getnodevalue(i, 'thumbnailNickname'), | |
'image': { | |
'url': getnodevalue(i, 'thumbnailImagePath'), | |
'width': getnodevalue(i, 'thumbnailImageWidth'), | |
'height': getnodevalue(i, 'thumbnailImageHeight'), | |
}, | |
'reply_allowed': \ | |
not bool(int(getnodevalue(i, 'denyReplyFlag'))) | |
}) | |
return result | |
def getmytimeline(self, offset=0, limit=20): | |
doc = minidom.parse(self.opener.open( | |
self.API_MYTIMELINE + '?' \ | |
+ AmebaNowClient._buildquery(dict(offset=offset), | |
self.API_ENCODING))) | |
if doc.documentElement.nodeName != u'response': | |
raise UnexpectedResponseError(doc.documentElement.nodeName) | |
entry_list_node = AmebaNowClient._selectonenode(doc.documentElement, 'entryList') | |
offset = entry_list_node.getAttribute('offset') | |
return int(offset), AmebaNowClient._parseentrylist(entry_list_node) | |
def gettimeline(self, ameba_id, offset=0, limit=20): | |
doc = minidom.parse(self.opener.open( | |
self.API_TIMELINE + '/%s' % urllib2.quote(ameba_id) + '?' \ | |
+ AmebaNowClient._buildquery(dict(offset=offset), | |
self.API_ENCODING))) | |
doc = minidom.parseString(resp) | |
if doc.documentElement.nodeName != u'response': | |
raise UnexpectedResponseError(doc.documentElement.nodeName) | |
entry_list_node = AmebaNowClient._selectonenode(doc.documentElement, 'entryList') | |
offset = entry_list_node.getAttribute('offset') | |
return int(offset), AmebaNowClient._parseentrylist(entry_list_node) | |
if __name__ == '__main__': | |
import os | |
c = AmebaNowClient(dict(ameba_id=os.environ['AMEBA_ID'], password=os.environ['AMEBA_PASSWORD'])) | |
c.login() | |
offset, entries = c.getmytimeline() | |
for entry in entries: | |
print entry['id'], entry['nickname'], entry['text'] | |
c.post(u'ちんこ') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment