Skip to content

Instantly share code, notes, and snippets.

@maliubiao
Last active January 1, 2016 07:49
Show Gist options
  • Select an option

  • Save maliubiao/8114555 to your computer and use it in GitHub Desktop.

Select an option

Save maliubiao/8114555 to your computer and use it in GitHub Desktop.
a stupid weibo sdk demo
#a very stupid weibo sdk
#-*-encoding=utf-8-*-
import io
import os
import pdb
import urllib
import json
import time
from simple_http import get
from simple_http import post
import subprocess
class Weibo:
    """Small client for a handful of Weibo v2 REST endpoints plus OAuth2 login.

    Instance state used by the methods:
        code: authorization code captured by ``authorize()``; ``None`` until then.
        token: access token stored by ``get_access_token()`` (or passed to the
            constructor); sent as ``access_token`` with every API call.
        client_info: dict with the keys ``client_id``, ``client_secret`` and
            ``callback_url``; required by ``authorize()`` and
            ``get_access_token()``.

    HTTP is done through the module-level ``get``/``post`` helpers, which are
    unpacked here as a 3-tuple whose last item is the response body.
    """

    def __init__(self, client_info=None, token=None):
        """Create a client.

        Args:
            client_info: optional application credentials (see class docstring).
                When omitted the attribute is left unset, so it can still be
                assigned afterwards as before.
            token: optional, previously obtained access token.
        """
        self.code = None
        if client_info is not None:
            self.client_info = client_info
        if token is not None:
            self.token = token
        self.init_api_url()

    def init_api_url(self):
        """Publish every endpoint URL as an instance attribute."""
        baseurl = "https://api.weibo.com/2/"
        self.__dict__.update({
            "baseurl": baseurl,
            "authorize_url": "https://api.weibo.com/oauth2/authorize",
            "access_token_url": "https://api.weibo.com/oauth2/access_token",
            "statuses_update_url": baseurl + "statuses/update.json",
            "statuses_upload_url": baseurl + "statuses/upload.json",
            # Historical name without the "_url" suffix; kept for callers.
            "public_timeline": baseurl + "statuses/public_timeline.json",
            # Alias that follows the naming of every other endpoint.
            "public_timeline_url": baseurl + "statuses/public_timeline.json",
            "user_timeline_url": baseurl + "statuses/user_timeline.json",
            "home_timeline_url": baseurl + "statuses/home_timeline.json",
            "repost_timeline_url": baseurl + "statuses/repost_timeline.json",
            "mentions_url": baseurl + "statuses/mentions.json",
            "bilateral_timeline_url": baseurl + "statuses/bilateral_timeline.json",
            "showone_url": baseurl + "statuses/show.json",
        })

    def _with_token(self, args):
        """Return a copy of ``args`` with ``access_token`` added.

        Copying keeps the caller's dict untouched.
        """
        params = dict(args or {})
        params['access_token'] = self.token
        return params

    def _decode_response(self, content):
        """Parse a JSON body and hand error payloads to ``handle_error``.

        A payload counts as an error when it carries an ``error_code`` or an
        ``error`` key, so every method applies the same check.
        """
        response_object = json.loads(content)
        if "error_code" in response_object or "error" in response_object:
            handle_error(response_object)
        return response_object

    def _extract_code(self, found):
        """Reduce what ``receiver()`` returned to the bare authorization code.

        Accepts either a list of ``code=...`` matches or a single string, and
        strips the ``code=`` prefix and anything after a following ``&``.
        """
        first = found[0] if isinstance(found, (list, tuple)) else found
        return first.split("=", 1)[-1].split("&", 1)[0]

    def weibo_get_home_timeline(self, args=None):
        """GET ``home_timeline_url`` with ``args`` as query; return the decoded JSON."""
        header, _, content = get(self.home_timeline_url, query=self._with_token(args))
        return self._decode_response(content)

    def weibo_get_user_timeline(self, args=None):
        """GET ``user_timeline_url`` with ``args`` as query; return the decoded JSON."""
        header, _, content = get(self.user_timeline_url, query=self._with_token(args))
        return self._decode_response(content)

    def weibo_statuses_update(self, args):
        """POST ``args`` to ``statuses_update_url``; return the decoded JSON."""
        header, _, content = post(self.statuses_update_url, self._with_token(args))
        return self._decode_response(content)

    def weibo_statuses_update_with_image(self, args):
        """POST a status with a picture to ``statuses_upload_url``.

        Args:
            args: dict with ``status`` (the text) and ``pic`` (path of the
                image file, opened in binary mode).

        Returns:
            The decoded JSON response.
        """
        # The file is closed as soon as the request has been made.
        with open(args['pic'], "rb") as picture:
            upload_data = {
                "access_token": self.token,
                "status": args['status'],
                "pic": picture,
            }
            header, _, content = post(self.statuses_upload_url, upload_data)
        return self._decode_response(content)

    def authorize(self):
        """Open the authorization page in Firefox and store the returned code.

        The code is read from the redirect request by ``receiver()`` and kept
        in ``self.code`` as a plain string.
        """
        try:
            from urllib import quote  # Python 2 location
        except ImportError:
            from urllib.parse import quote  # Python 3 location
        target = "%s?client_id=%s&redirect_uri=%s&display=%s" % (
            self.authorize_url,
            self.client_info['client_id'],
            quote(self.client_info['callback_url']),
            "mobile",
        )
        # Argument list instead of a shell string: nothing in the URL can be
        # interpreted by a shell. Popen does not wait for the browser to exit,
        # so the listener below is up while the user is still logging in.
        subprocess.Popen(["firefox", target])
        self.code = self._extract_code(receiver())

    def get_access_token(self, code=None):
        """Exchange an authorization code for an access token.

        Args:
            code: authorization code; falls back to ``self.code`` when empty.

        Returns:
            The decoded JSON response; its ``access_token`` is also stored in
            ``self.token``. Error payloads are passed to ``handle_error``.
        """
        token_data = {
            "client_id": self.client_info['client_id'],
            "client_secret": self.client_info['client_secret'],
            "grant_type": "authorization_code",
            "code": code or self.code,
            "redirect_uri": self.client_info['callback_url'],
        }
        header, _, content = post(self.access_token_url, token_data)
        response_object = self._decode_response(content)
        self.token = response_object["access_token"]
        return response_object
class WeiboError(Exception):
    """Error payload reported by the API; ``args[0]`` is the decoded response."""


def handle_error(status_object):
    """Raise ``WeiboError`` carrying ``status_object``.

    ``WeiboError`` derives from ``Exception``, so code that already catches
    ``Exception`` keeps working.

    Args:
        status_object: the decoded error response.

    Raises:
        WeiboError: always.
    """
    raise WeiboError(status_object)
def receiver(host="127.0.0.1", port=8000):
    """Accept one local HTTP request and return the ``code=...`` parts found in it.

    Listens on ``host:port``, accepts a single connection, reads up to 4096
    bytes, and, when a code is present, answers with a short HTML
    confirmation page. Both sockets are closed before returning, including
    when an error occurs.

    Args:
        host: address to listen on; defaults to the loopback address.
        port: TCP port to listen on; defaults to 8000.

    Returns:
        The non-empty list of matches of ``code=\\S*`` in the request text;
        each item still carries the ``code=`` prefix.

    Raises:
        Exception: when the request contains no ``code=`` fragment.
    """
    # Imported here because the module's import block does not provide them.
    import re
    import socket

    def html_response_forcode():
        # Full HTTP response as bytes; Content-Length counts the encoded body.
        body = u"<p>操作已经成功可以关闭页面了</p>".encode("utf-8")
        head = (
            "HTTP/1.1 200 OK\r\n"
            "Content-Length: %d\r\n"
            "Content-Type: text/html;charset=utf-8\r\n"
            "\r\n" % len(body)
        ).encode("ascii")
        return head + body

    code_regex = re.compile(r"code=\S*")
    sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    try:
        sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sk.bind((host, port))
        sk.listen(1)
        # get code
        connection, _ = sk.accept()
        try:
            client_data = connection.recv(4096)
            # recv() yields bytes on Python 3; the pattern above is text.
            if not isinstance(client_data, str):
                client_data = client_data.decode("latin-1")
            code = code_regex.findall(client_data)
            if code:
                try:
                    connection.sendall(html_response_forcode())
                except socket.error:
                    # Best effort: the code is already captured, so a client
                    # that hung up early must not lose it.
                    pass
        finally:
            connection.close()
    finally:
        sk.close()
    if code:
        return code
    raise Exception("can't find code")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment