Last active
August 31, 2016 12:59
-
-
Save from1to9/3023641 to your computer and use it in GitHub Desktop.
Fanfou API for python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# code by from1to9
# [email protected]
# update 31/08/2016
# Pass a local image path to upload a photo, e.g. '/home/user/test.jpg'.
import oauth2 as oauth | |
import re, time | |
import itertools, mimetools, mimetypes | |
import urllib, urllib2 | |
import binascii, hashlib, hmac | |
class MultiPartForm(object):
    """Accumulate form fields and file parts and render a multipart body.

    Plain fields are stored in ``form_fields`` as ``(name, value)`` tuples
    and file parts in ``files`` as ``(fieldname, filename, mimetype, body)``
    tuples. ``str(form)`` renders every part, separated by ``boundary``,
    using CRLF line endings.
    """

    def __init__(self):
        self.form_fields = []
        self.files = []
        # One boundary per form; it must also be sent in the Content-Type
        # header (see get_content_type).
        self.boundary = mimetools.choose_boundary()

    def get_content_type(self):
        """Return the Content-Type header value matching this form."""
        return 'multipart/form-data; boundary=%s' % self.boundary

    def add_field(self, name, value):
        """Queue a plain form field."""
        self.form_fields.append((name, value))

    def add_file(self, fieldname, filename, fileHandle, mimetype=None):
        """Queue a file part.

        ``fileHandle`` may be the raw file content or any object exposing
        ``read()``; a readable object is consumed immediately so that
        ``__str__`` only ever has to join strings. When ``mimetype`` is
        omitted it is guessed from ``filename`` and falls back to
        ``application/octet-stream``.
        """
        if hasattr(fileHandle, 'read'):
            body = fileHandle.read()
        else:
            body = fileHandle
        if mimetype is None:
            mimetype = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        self.files.append((fieldname, filename, mimetype, body))

    def __str__(self):
        """Render all queued parts as one CRLF-joined request body."""
        part_boundary = '--' + self.boundary
        lines = []
        for name, value in self.form_fields:
            lines.extend([
                part_boundary,
                'Content-Disposition: form-data; name="%s"' % name,
                '',
                value,
            ])
        # NOTE(review): file parts use the disposition type "file"; RFC 7578
        # specifies "form-data" for every part. Left unchanged because the
        # receiving server's tolerance cannot be checked from here.
        for field_name, filename, content_type, body in self.files:
            lines.extend([
                part_boundary,
                'Content-Disposition: file; name="%s"; filename="%s"' % (field_name, filename),
                'Content-Type: %s' % content_type,
                '',
                body,
            ])
        # Closing delimiter, then an empty item so the body ends with CRLF.
        lines.append(part_boundary + '--')
        lines.append('')
        return '\r\n'.join(lines)
class Api(object):
    """Lookup table mapping short API names to Fanfou endpoint paths.

    Each entry of ``api`` holds the endpoint path (``"url"``, relative to
    ``api_base`` and without extension) and the HTTP method to use
    (``"method"``).
    """

    api_base = "api.fanfou.com"
    # Response format appended to every endpoint path: json, xml or rss.
    extension = 'json'
    api = {
        "public_timeline": {"url": "statuses/public_timeline", "method": "GET"},
        "friends_timeline": {"url": "statuses/friends_timeline", "method": "GET"},
        "replies": {"url": "statuses/replies", "method": "GET"},
        "mentions": {"url": "statuses/mentions", "method": "GET"},
        "show": {"url": "statuses/show", "method": "GET"},
        "user_timeline": {"url": "statuses/user_timeline", "method": "GET"},
        "update": {"url": "statuses/update", "method": "POST"},
        "destroy": {"url": "statuses/destroy", "method": "POST"},
        "statuses_friends": {"url": "statuses/friends", "method": "POST"},
        "statuses_followers": {"url": "statuses/followers", "method": "POST"},
        # photo
        "photo_upload": {"url": "photos/upload", "method": "POST"},
        # private msg
        "direct_messages_sent": {"url": "direct_messages/sent", "method": "GET"},
        "direct_messages_inbox": {"url": "direct_messages/inbox", "method": "GET"},
        "direct_messages_new": {"url": "direct_messages/new", "method": "POST"},
        "direct_messages_destroy": {"url": "direct_messages/destroy", "method": "POST"},
        # friends
        "friendships_create": {"url": "friendships/create", "method": "POST"},
        "friendships_destroy": {"url": "friendships/destroy", "method": "POST"},
        "friendships_exists": {"url": "friendships/exists", "method": "GET"},
        "friends_ids": {"url": "friends/ids", "method": "GET"},
        "followers_ids": {"url": "followers/ids", "method": "GET"},
        # user
        "users_show": {"url": "users/show", "method": "GET"},
        "users_followers": {"url": "users/followers", "method": "GET"},
        "users_friends": {"url": "users/friends", "method": "GET"},
        # notification
        "notifications_follow": {"url": "notifications/follow", "method": "POST"},
        "notifications_leave": {"url": "notifications/leave", "method": "POST"},
    }

    def __init__(self, api_type):
        # api_type is a key of ``api``; it is not validated here.
        self.api_type = api_type

    def url(self):
        """Return the full endpoint URL, or "" when api_type is unknown."""
        entry = self.api.get(self.api_type)
        if entry is None:
            return ""
        return "http://%s/%s.%s" % (self.api_base, entry["url"], self.extension)

    def method(self):
        """Return the HTTP method for api_type.

        Raises KeyError when api_type is unknown (unlike url(), which
        returns an empty string).
        """
        return self.api[self.api_type]["method"]
class fanfou(object):
    """Small client for the Fanfou REST API.

    Regular calls go through an ``oauth2`` client built from the consumer
    and token credentials. Photo upload builds its own OAuth header and
    multipart body and is sent with ``urllib2``.
    """

    def __init__(self, consumer_key, consumer_secret, oauth_token, oauth_token_secret):
        self.__token = oauth.Token(key=oauth_token, secret=oauth_token_secret)
        self.__consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
        self.__client = oauth.Client(self.__consumer, self.__token)

    def _execute(self, params=None, method="GET", url=""):
        """Send one API request.

        POST requests carry ``params`` form-encoded in the body; any other
        method carries them in the query string and is sent with that same
        method.

        Returns ``{"resp": ..., "content": ...}`` on success and ``None``
        when building or sending the request raised an exception.
        """
        params = {} if params is None else params
        try:
            if method == "POST":
                req = oauth.Request(method=method, url=url, parameters=params,
                                    is_form_encoded=True)
                resp, content = self.__client.request(
                    uri=url, method="POST", body=req.to_postdata())
            else:
                req = oauth.Request(method=method, url=url, parameters=params)
                # Use the requested method: endpoints declared GET used to
                # be sent as POST here.
                resp, content = self.__client.request(uri=req.to_url(), method=method)
        except Exception:
            # Best effort: callers receive None instead of the exception.
            return None
        return {"resp": resp, "content": content}

    def get_sign(self, params="", url="", http_method="", consumer_secret="", oauth_token_secret=""):
        """Return the base64 HMAC-SHA1 signature for a request.

        ``params`` is a dict of the parameters to sign; they are url-encoded
        one by one in sorted key order and joined with "&". The signing key
        is ``consumer_secret`` and ``oauth_token_secret`` joined with "&".
        """
        params = params or {}
        normalized_params = "&".join(
            urllib.urlencode({key: params[key]}) for key in sorted(params)
        )
        base_string = "&".join((
            http_method,
            urllib.quote(url, safe="~"),
            urllib.quote(normalized_params, safe="~"),
        ))
        signing_key = "&".join([consumer_secret, oauth_token_secret])
        sig = hmac.new(signing_key, base_string, hashlib.sha1)
        # b2a_base64 appends a newline; drop it.
        return binascii.b2a_base64(sig.digest())[:-1]

    def get_auth_header(self, params):
        """Return an ``OAuth k="v",...`` Authorization header value."""
        pairs = ['%s="%s"' % (key, urllib.quote(params[key])) for key in params]
        if not pairs:
            return "OAuth"
        return "OAuth " + ",".join(pairs)

    def call(self, apitype, params):
        """Look up ``apitype`` in the Api table and execute the request."""
        api = Api(apitype)
        return self._execute(params, api.method(), api.url())

    def update(self, message, source=""):
        """Post a new status."""
        params = {
            "status": message,
            "source": source,
        }
        return self.call("update", params)

    def reply_to(self, message, in_reply_to_status_id, in_reply_to_user_id, source=""):
        """Post a status as a reply to the given status and user."""
        params = {
            "status": message,
            "source": source,
            "in_reply_to_status_id": in_reply_to_status_id,
            "in_reply_to_user_id": in_reply_to_user_id,
        }
        return self.call("update", params)

    def repost(self, message, repost_status_id, source=""):
        """Post a status that reposts ``repost_status_id``."""
        params = {
            "status": message,
            "source": source,
            "repost_status_id": repost_status_id,
        }
        return self.call("update", params)

    def photo(self, photo, status="", source=""):
        """Upload the local image file ``photo`` with an optional status.

        Returns ``{"resp": <HTTP code>, "content": <body>}``, or the same
        dict with empty strings when the HTTP request fails. An unreadable
        ``photo`` path raises from ``open``.

        NOTE(review): ``source`` is accepted but never sent.
        """
        upload_url = Api("photo_upload").url()
        form = MultiPartForm()
        # Only the OAuth parameters are signed; the multipart body is not
        # part of the signature input.
        params = {
            "oauth_version": "1.0",
            "oauth_timestamp": str(oauth.generate_timestamp()),
            "oauth_nonce": oauth.generate_nonce(),
            "oauth_consumer_key": self.__consumer.key,
            "oauth_token": self.__token.key,
            "oauth_signature_method": "HMAC-SHA1",
        }
        params["oauth_signature"] = self.get_sign(
            params, upload_url, "POST", self.__consumer.secret, self.__token.secret)
        header = {
            "content-type": "multipart/form-data; boundary=" + form.boundary,
            "Authorization": self.get_auth_header(params),
        }
        # To upload a photo fetched from the web, pass its binary data to
        # form.add_file instead of reading a local file.
        form.add_field("status", status)
        with open(photo, "rb") as image_file:
            form.add_file("photo", photo, image_file.read())
        post = str(form)
        try:
            req = urllib2.Request(upload_url, post, header)
            response = urllib2.urlopen(req)
            try:
                rev = {"resp": response.getcode(), "content": response.read()}
            finally:
                response.close()
        except Exception:
            rev = {"resp": "", "content": ""}
        return rev

    def replies(self, since_id="", mode="lite", format=""):
        """Fetch replies; ``count`` and ``page`` are not sent."""
        params = {
            "since_id": since_id,
            "mode": mode,
            "format": format,
        }
        return self.call("replies", params)

    def mentions(self, since_id="", mode="lite", format=""):
        """Fetch mentions; ``count`` and ``page`` are not sent."""
        params = {
            "since_id": since_id,
            "mode": mode,
            "format": format,
        }
        return self.call("mentions", params)

    def direct_messages_inbox(self, since_id="", mode="lite"):
        """Fetch received direct messages; ``count``/``page`` are not sent."""
        params = {
            "since_id": since_id,
            "mode": mode,
        }
        return self.call("direct_messages_inbox", params)

    def direct_messages_new(self, user, text, in_reply_to_id="", format=""):
        """Send a direct message; ``text`` is truncated to 140 characters.

        ``format`` defaults to "" like in replies()/mentions(). It used to
        be missing from the signature, so the builtin ``format`` function
        was sent as the parameter value.
        """
        params = {
            "user": user,
            "text": text[:140],
            "in_reply_to_id": in_reply_to_id,
            "mode": "lite",
            "format": format,
        }
        return self.call("direct_messages_new", params)

    def friends_ids(self, id="", page="1", count="60"):
        """Fetch the friends_ids endpoint for user ``id``."""
        params = {
            "id": id,
            "count": count,
            "page": page,
        }
        return self.call("friends_ids", params)

    def followers_ids(self, id="", page="1", count="60"):
        """Fetch the followers_ids endpoint for user ``id``."""
        params = {
            "id": id,
            "count": count,
            "page": page,
        }
        return self.call("followers_ids", params)

    def users_show(self, id=""):
        """Fetch the users_show endpoint for user ``id`` in "lite" mode."""
        params = {
            "id": id,
            "mode": "lite",
        }
        return self.call("users_show", params)
if __name__ == "__main__":
    # Demo: post one status and upload one photo with placeholder credentials.
    consumer_key = 'XXXX'  # api key
    consumer_secret = 'XXXX'  # api secret
    oauth_token = 'XXX'
    oauth_token_secret = 'XXX'

    # Use a separate name for the instance so the `fanfou` class itself is
    # not shadowed.
    client = fanfou(consumer_key, consumer_secret, oauth_token, oauth_token_secret)

    msg = client.update("test pyfanfou")
    # update() returns None when the request raised, so guard the lookup.
    if msg:
        print(msg["content"].decode("utf-8"))
    else:
        print("update request failed")

    # photo() always returns a dict (empty strings on failure).
    msg = client.photo("test.jpg", "photo info")
    print(msg["content"].decode("utf-8"))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import oauth2 as oauth | |
import re, time | |
import urlparse | |
# Interactive script: obtain an OAuth access token for Fanfou in three
# steps (request token, user authorization with a PIN, access token).
consumer_key = 'xxxx'  # api key
consumer_secret = 'xxxx'  # api secret

access_token_url = 'http://fanfou.com/oauth/access_token'
authorize_url = 'http://fanfou.com/oauth/authorize'
request_token_url = 'http://fanfou.com/oauth/request_token'

consumer = oauth.Consumer(consumer_key, consumer_secret)
client = oauth.Client(consumer)

# Step 1: fetch a temporary request token.
resp, content = client.request(request_token_url, "GET")
if resp['status'] != '200':
    raise Exception("Invalid response %s." % resp['status'])
request_token = dict(urlparse.parse_qsl(content))

print("Request Token:")
print(" - oauth_token = %s" % request_token['oauth_token'])
print(" - oauth_token_secret = %s" % request_token['oauth_token_secret'])

# Step 2: the user authorizes the request token in a browser and types the
# PIN shown there.
print("Go to the following link in your browser:")
print("%s?oauth_token=%s" % (authorize_url, request_token['oauth_token']))
oauth_verifier = raw_input('What is the PIN? ')

# Step 3: exchange the authorized request token for an access token.
token = oauth.Token(request_token['oauth_token'],
                    request_token['oauth_token_secret'])
token.set_verifier(oauth_verifier)
client = oauth.Client(consumer, token)

resp, content = client.request(access_token_url, "POST")
# Check the status here as well, as in step 1; otherwise a rejected PIN only
# shows up as a KeyError on 'oauth_token' below.
if resp['status'] != '200':
    raise Exception("Invalid response %s." % resp['status'])
access_token = dict(urlparse.parse_qsl(content))

print("Access Token:")
print(" - oauth_token = %s" % access_token['oauth_token'])
print(" - oauth_token_secret = %s" % access_token['oauth_token_secret'])
print("You may now access protected resources using the access tokens above.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment