-
-
Save 1dot75cm/8ff5266c4f7aa158c2b0a366d21d8bb8 to your computer and use it in GitHub Desktop.
Python 登录新浪微博 (requests 真的比 urllib2 强了 2^^32 倍,兼容 py2/3)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
future==0.16.0
requests==2.11.1
rsa==3.4.2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import absolute_import, division, print_function, unicode_literals | |
from future import standard_library, utils | |
standard_library.install_aliases() | |
from builtins import int, input, open, zip, object | |
from lxml import etree | |
import re | |
import rsa | |
import time | |
import json | |
import base64 | |
import binascii | |
import requests | |
import argparse | |
import logging | |
class WeiboLogin(object):
    '''Sina Weibo client built on a ``requests`` session.

    ``login`` performs the three-step SSO handshake (pre-login, ticket
    request, ticket validation) and ``search`` queries s.weibo.com with the
    cookies collected by the session.
    '''

    WBCLIENT = 'ssologin.js(v1.4.18)'
    user_agent = (
        'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.11 (KHTML, like Gecko) '
        'Chrome/55.0.2883.21 Safari/536.11')
    pre_login_url = ('http://login.sina.com.cn/sso/prelogin.php?entry=weibo&'
                     'callback=sinaSSOController.preloginCallBack&rsakt=mod&su=%s&client=%s')
    login_ticket_url = 'http://login.sina.com.cn/sso/login.php?client=%s'
    post_login_url = ('http://weibo.com/ajaxlogin.php?framelogin=1&retcode=0&'
                      'callback=parent.sinaSSOController.feedBackUrlCallBack&ticket=%s')
    search_url = 'http://s.weibo.com/weibo/%s&page=%s'
    search_user_url = 'http://s.weibo.com/user/%s&auth=%s&page=%s'

    def __init__(self):
        '''Create the HTTP session and reset the account state.'''
        self.session = requests.session()  # the session keeps cookies between requests
        self.session.headers['User-Agent'] = self.user_agent
        self.get = self.session.get
        self.post = self.session.post
        self.username, self.nick = '', ''
        self.userid, self.status = '', ''
        self.error = ''

    # Reference (site JavaScript):
    # password = RSAKey.encrypt([me.servertime, me.nonce].join("\t") +"\n"+ password)
    def _encrypt_passwd(self, passwd, pubkey, servertime, nonce):
        '''Encrypt the password and return the hex-encoded ``sp`` value.

        The plaintext is ``servertime + TAB + nonce + LF + passwd``; it is
        RSA-encrypted with the hexadecimal modulus ``pubkey`` and the public
        exponent 0x10001.
        '''
        key = rsa.PublicKey(int(pubkey, 16), int('10001', 16))
        message = str(servertime) + '\t' + str(nonce) + '\n' + str(passwd)
        encrypted = rsa.encrypt(message.encode('utf8'), key)
        return binascii.b2a_hex(encrypted)

    def _pre_login(self, username):
        '''Run the pre-login request and return its decoded JSON payload.

        The caller reads ``servertime``, ``nonce``, ``pubkey`` and ``rsakv``
        from the returned dict.

        Raises:
            ValueError: if the response does not contain a JSON object.
        '''
        # b64encode returns bytes; decode it so that Python 3 does not format
        # the value into the URL as "b'...'".
        su = base64.b64encode(username.encode('utf-8')).decode('ascii')
        resp = self.get(self.pre_login_url % (su, self.WBCLIENT))
        match = re.match(r'.+({.+?})', resp.text)
        if match is None:
            raise ValueError('unexpected pre-login response: %r' % resp.text)
        return json.loads(match.group(1))

    def _get_login_ticket(self, username, password, pre_login):
        '''Post the login form and return the decoded JSON reply.

        ``login`` and ``_post_login`` read ``retcode``, ``ticket``, ``uid``,
        ``nick`` and ``reason`` from the returned dict.
        '''
        param = {
            'entry': 'weibo',
            'gateway': 1,
            'from': '',
            'savestate': 0,
            'useticket': 1,
            'vsnf': 1,
            'su': base64.b64encode(requests.utils.quote(username).encode('utf-8')),
            'service': 'miniblog',
            'servertime': pre_login['servertime'],
            'nonce': pre_login['nonce'],
            'pwencode': 'rsa2',
            'rsakv': pre_login['rsakv'],
            'sp': self._encrypt_passwd(password, pre_login['pubkey'],
                                       pre_login['servertime'], pre_login['nonce']),
            'encoding': 'UTF-8',
            'domain': 'weibo.com',
            'prelt': 115,
            'returntype': 'TEXT'  # response format: TEXT or META
        }
        resp = self.post(self.login_ticket_url % self.WBCLIENT, param)
        return json.loads(resp.text)

    def _post_login(self, login):
        '''Validate the login ticket and merge the reply into ``login``.

        Nothing is requested when ``login['retcode']`` is not ``'0'``.

        Raises:
            ValueError: if the validation response has no JSON payload.
        '''
        if login['retcode'] == '0':
            resp = self.get(self.post_login_url % login['ticket'])
            match = re.search(r'\(({.+?})\)', resp.text)
            if match is None:
                raise ValueError('unexpected post-login response: %r' % resp.text)
            login.update(json.loads(match.group(1)))
        return login

    def login(self, username, password):
        '''Log in and record nick/uid on success.

        On failure the reason and return code are printed and the process
        exits with status 1 (``SystemExit``).
        '''
        pre_login = self._pre_login(username)
        login = self._get_login_ticket(username, password, pre_login)
        login = self._post_login(login)
        logging.debug(' Login profile: %s', json.dumps(login, ensure_ascii=False))

        self.username = username
        self.status = login['retcode']
        if self.status == '0':
            self.nick = login['nick']
            self.userid = login['uid']
            print('登录成功!欢迎您,%s。' % self.nick)
        else:
            # A failed reply is not guaranteed to carry a reason.
            self.error = login.get('reason', '')
            print('%s。错误代码:%s' % (self.error, self.status))
            # The bare ``exit`` name is injected by ``site`` and may be
            # missing (e.g. under ``python -S``); SystemExit is equivalent.
            raise SystemExit(1)

    @staticmethod
    def _pane_html(page_text, pane_id):
        '''Return the ``html`` field of the JSON blob embedded for ``pane_id``.

        The result page carries each pane as a JSON object on a single line
        that mentions the pane id followed by a double quote.

        Raises:
            ValueError: if no such line or JSON object is present.
        '''
        line = re.search('.*%s".*' % pane_id, page_text)
        payload = re.search('{.*}', line.group()) if line else None
        if payload is None:
            raise ValueError('search pane %r not found in response' % pane_id)
        return json.loads(payload.group())['html']

    @staticmethod
    def _result_total(tree, count_xpath, fallback):
        '''Parse the total hit count, or return ``fallback`` if it is absent.'''
        texts = tree.xpath(count_xpath)
        match = re.search('找到([0-9]+)条结果', texts[0]) if texts else None
        return int(match.group(1)) if match else fallback

    @staticmethod
    def _page_count(total, per_page):
        '''Return ceil(total / per_page), or 0 when the page is empty.'''
        if per_page <= 0:
            return 0
        return (total + per_page - 1) // per_page

    def search(self, query, search_type='user', user_type='org', page=1):
        '''Search Weibo and return one page of results.

        Args:
            query: search text.
            search_type: ``'user'`` searches accounts; any other value
                searches posts.
            user_type: account category for user searches, one of
                ``'org'``, ``'person'`` or ``'user'``.
            page: page number to fetch.

        Returns:
            dict with ``per_page``, ``page``, ``pages``, ``total`` and
            ``results`` (a list of 4-tuples: name, href, address, fans for
            user searches; name, href, content, time otherwise).

        Raises:
            ValueError: if the expected result pane is missing from the page.
        '''
        user_dict = {
            'org': 'org_vip',     # organisations
            'person': 'per_vip',  # verified individuals
            'user': 'ord'         # ordinary users
        }
        quoted = requests.utils.quote(query)
        if search_type == 'user':
            url = self.search_user_url % (quoted, user_dict[user_type], page)
        else:
            url = self.search_url % (quoted, page)

        # Ask the site for 30 items per page before searching.
        self.session.headers['Origin'] = 'http://s.weibo.com'
        self.session.headers['Referer'] = 'http://s.weibo.com/preferences'
        resp = self.post('http://s.weibo.com/ajax/preferences',
                         data={'page_num': 30, '_t': 0})
        logging.debug('设置项数响应: ' + resp.text)

        logging.debug('page: ' + str(page))
        resp = self.get(url)

        if search_type == 'user':
            tree = etree.HTML(self._pane_html(resp.text, 'pl_user_feedList'))
            names = tree.xpath('//p[@class="person_name"]/a[1]/@title')
            hrefs = tree.xpath('//p[@class="person_name"]/a[1]/@href')
            addrs = tree.xpath('//p[@class="person_addr"]/span[2]/text()')
            fans = tree.xpath('//p[@class="person_num"]/span[2]/a/text()')
            results = list(zip(names, hrefs, addrs, fans))
            count_xpath = '//div[@class="search_num"]/span/text()'
        else:
            tree = etree.HTML(self._pane_html(resp.text, 'pl_weibo_direct'))
            names = tree.xpath('//div/a[@class="W_texta W_fb"]/@title')
            hrefs = tree.xpath('//div/a[@class="W_texta W_fb"]/@href')
            contents = tree.xpath('//p[@class="comment_txt"]')
            # Collapse all whitespace runs inside each post to single spaces.
            contents = [' '.join(e.xpath('string()').split()) for e in contents]
            times = tree.xpath('//div/a[@class="W_textb"]/text()')
            results = list(zip(names, hrefs, contents, times))
            count_xpath = '//div[@class="search_rese clearfix"]/span/text()'

        total = self._result_total(tree, count_xpath, len(results))
        return {
            'per_page': len(names),
            'page': int(page),
            # Ceiling division: an exact multiple must not add an extra page
            # and an empty page must not divide by zero.
            'pages': self._page_count(total, len(names)),
            'total': total,
            'results': results
        }
def _input(string):
    '''Prompt with ``string`` and return the line the user typed.

    On Python 2 the prompt is passed as UTF-8 encoded bytes.
    '''
    if utils.PY2:
        return input(string.encode('utf8'))
    return input(string)
def parse_args(argv=None):
    '''Parse the command-line options.

    Args:
        argv: list of argument strings to parse. ``None`` (the default)
            parses ``sys.argv[1:]``, which is what the script entry point
            uses; passing a list allows programmatic use.

    Returns:
        argparse.Namespace with ``username``, ``password``, ``file``,
        ``query``, ``page``, ``search_type``, ``interactive`` and ``verbose``.
    '''
    parser = argparse.ArgumentParser(description='新浪微博登录示例')
    parser.add_argument('-u', '--username', metavar='USER', type=str, dest='username',
                        action='store', required=True, help='微博用户名')
    parser.add_argument('-p', '--password', metavar='PASS', type=str, dest='password',
                        action='store', required=True, help='微博密码')
    parser.add_argument('-o', '--output', metavar='FILE', type=str, dest='file',
                        action='store', required=False, help='输出文件名')
    parser.add_argument('-q', '--query', metavar='QUERY', type=str, dest='query',
                        action='store', required=False, default='书店'.encode('utf8'),
                        help='微博搜索内容')
    parser.add_argument('--page', metavar='PAGE', type=int, dest='page',
                        action='store', required=False, default=1, help='搜索页数')
    parser.add_argument('--search_type', metavar='TYPE', type=str, dest='search_type',
                        action='store', required=False, default='user',
                        help='微博搜索类型[all|user]')
    parser.add_argument('-i', '--interactive', dest='interactive', action='store_true',
                        help='交互式')
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
                        help='调试信息')
    return parser.parse_args(argv)
def file_output(weibo, args):
    '''Fetch search pages one after another and append them to ``args.file``.

    Each result is written as one comma-separated line. ``args.page`` is
    advanced after every page. The loop stops once the number of fetched
    items reaches the reported total, or when a page comes back empty.
    It pauses 10 seconds between requests.

    Args:
        weibo: object whose ``search(query, search_type=..., page=...)``
            returns a dict with ``results``, ``per_page``, ``page``,
            ``pages`` and ``total``.
        args: namespace with ``query``, ``search_type``, ``page`` and ``file``.
    '''
    total = 0
    while True:
        js = weibo.search(args.query, search_type=args.search_type, page=args.page)
        # An explicit encoding makes non-ASCII results writable whatever the
        # locale is, so no bytes fallback is needed.
        with open(args.file, 'a+', encoding='utf-8') as fp:
            for name, href, detail, extra in js['results']:
                logging.debug('%s %s %s %s', name, href, detail, extra)
                fp.write('%s,%s,%s,%s\n' % (name, href, detail, extra))
        args.page += 1
        total += js['per_page']
        logging.info('每页项数: %d' % js['per_page'])
        logging.info('页数/总数: %d / %d' % (js['page'], js['pages']))
        logging.info('已获取项数: %d' % total)
        logging.info('项目总数: %d' % js['total'])
        # An empty page can never advance ``total``; stop instead of
        # requesting pages forever.
        if total >= js['total'] or not js['per_page']:
            break
        time.sleep(10)
def interactive(weibo, args):
    '''Prompt for search parameters in a loop and print each result page.

    An empty answer keeps the previous value of that parameter. The loop
    ends when input is closed (EOF) or interrupted (Ctrl-C) at a prompt.

    Args:
        weibo: object providing ``search(query, search_type=..., page=...)``.
        args: namespace with ``search_type``, ``query`` and ``page``; it is
            updated in place with the answers.
    '''
    while True:
        try:
            args.search_type = _input('搜索类型[user|all]: ') or args.search_type
            args.query = _input('搜索内容: ') or args.query
            page_text = _input('页数: ')
        except (EOFError, KeyboardInterrupt):
            # Leave the prompt loop cleanly instead of dying with a traceback.
            print('')
            break

        if page_text:
            # Validate here so a typo is reported before any request is made.
            try:
                args.page = int(page_text)
            except ValueError:
                print('页数无效: %s' % page_text)
                continue

        js = weibo.search(args.query, search_type=args.search_type, page=args.page)
        for name, href, detail, extra in js['results']:
            print('-> %s (%s)\n %s\n %s\n' % (name, href, detail, extra))
        print('每页项数: %d' % js['per_page'])
        print('页数/总数: %d / %d' % (js['page'], js['pages']))
        print('搜索结果: %d' % js['total'])
if __name__ == '__main__':
    # Script entry point: parse options, log in, then run the chosen mode(s).
    args = parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)  # enable debug logging
    logging.debug('Arguments: %s', args)

    if args.interactive:
        # Ask only for credentials that were not supplied on the command line.
        args.username = args.username or _input('用户名: ')
        args.password = args.password or _input('密码: ')

    client = WeiboLogin()
    client.login(args.username, args.password)

    if args.interactive:
        interactive(client, args)
    if args.file:
        file_output(client, args)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import absolute_import, division, print_function, unicode_literals | |
from future import standard_library, utils | |
standard_library.install_aliases() | |
from builtins import str, int, input, zip, object | |
from lxml import etree | |
from future.moves.urllib import request | |
from urllib.parse import urlencode | |
from http.cookiejar import LWPCookieJar as CookieJar | |
import re | |
import rsa | |
import json | |
import time | |
import base64 | |
import binascii | |
import logging | |
logging.basicConfig(level=logging.DEBUG) # 日志 | |
class WeiboLogin(object):
    '''Sina Weibo client built on ``urllib`` with a cookie-aware opener.

    ``login`` performs the three-step SSO handshake (pre-login, ticket
    request, ticket validation) and ``search`` queries s.weibo.com using the
    cookies stored by the installed opener.
    '''

    WBCLIENT = 'ssologin.js(v1.4.18)'
    user_agent = (
        'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.11 (KHTML, like Gecko) '
        'Chrome/55.0.2883.21 Safari/536.11')
    # Default request headers; every instance works on its own copy.
    headers = {'User-Agent': user_agent}
    pre_login_url = ('http://login.sina.com.cn/sso/prelogin.php?entry=weibo&'
                     'callback=sinaSSOController.preloginCallBack&rsakt=mod&su=%s&client=%s')
    login_ticket_url = 'http://login.sina.com.cn/sso/login.php?client=%s'
    post_login_url = ('http://weibo.com/ajaxlogin.php?framelogin=1&retcode=0&'
                      'callback=parent.sinaSSOController.feedBackUrlCallBack&ticket=%s')
    search_url = 'http://s.weibo.com/weibo/%s&page=%s'
    search_user_url = 'http://s.weibo.com/user/%s&auth=%s&page=%s'

    def __init__(self):
        '''Install a cookie-keeping opener and reset the account state.'''
        cj = CookieJar()                                  # cookie storage
        cookie_support = request.HTTPCookieProcessor(cj)  # binds cookies to requests
        opener = request.build_opener(cookie_support, request.HTTPHandler)
        request.install_opener(opener)                    # used by request.urlopen

        # ``search`` adds headers; copy the class-level dict so those
        # additions do not leak into other instances.
        self.headers = dict(type(self).headers)
        self.username, self.nick = '', ''
        self.userid, self.status = '', ''
        self.error = ''

    @staticmethod
    def _path_of(url):
        '''Return the part of ``url`` after the host, or '' if there is none.'''
        parts = url.split('/', 3)
        return parts[3] if len(parts) > 3 else ''

    def get(self, url):
        '''Send an HTTP GET and return the body decoded as UTF-8.'''
        logging.info(' Starting HTTP connection: %s' % url.split('/')[2])
        req = request.Request(url, headers=self.headers)
        resp = request.urlopen(req)
        try:
            logging.debug(' "GET /%s" %s' % (self._path_of(resp.url), resp.code))
            return resp.read().decode('utf8')
        finally:
            resp.close()  # release the connection even if read/decode fails

    def post(self, url, data):
        '''Send ``data`` (a dict) as a form POST and return the decoded body.'''
        logging.info(' Starting HTTP connection: %s' % url.split('/')[2])
        data = _urlencode(data)  # form-encode the fields into a request body
        req = request.Request(url, data=data, headers=self.headers)
        resp = request.urlopen(req)
        try:
            logging.debug(' "POST /%s" %s' % (self._path_of(resp.url), resp.code))
            return resp.read().decode('utf8')
        finally:
            resp.close()  # release the connection even if read/decode fails

    def _encrypt_passwd(self, passwd, pubkey, servertime, nonce):
        '''Encrypt the password and return the hex-encoded ``sp`` value.

        The plaintext is ``servertime + TAB + nonce + LF + passwd``; it is
        RSA-encrypted with the hexadecimal modulus ``pubkey`` and the public
        exponent 0x10001.
        '''
        key = rsa.PublicKey(int(pubkey, 16), int('10001', 16))
        message = str(servertime) + '\t' + str(nonce) + '\n' + str(passwd)
        encrypted = rsa.encrypt(message.encode('utf-8'), key)
        return binascii.b2a_hex(encrypted)

    def _pre_login(self, username):
        '''Run the pre-login request and return its decoded JSON payload.

        The caller reads ``servertime``, ``nonce``, ``pubkey`` and ``rsakv``
        from the returned dict.

        Raises:
            ValueError: if the response does not contain a JSON object.
        '''
        # Send the same ``su`` form as the login step (base64 of the quoted
        # name) and quote it again for the URL, so reserved or non-ASCII
        # characters in the username cannot corrupt the request.
        su = base64.b64encode(request.quote(username).encode('utf-8')).decode('ascii')
        resp = self.get(self.pre_login_url % (request.quote(su), self.WBCLIENT))
        match = re.match(r'.+({.+?})', resp)
        if match is None:
            raise ValueError('unexpected pre-login response: %r' % resp)
        return json.loads(match.group(1))

    def _get_login_ticket(self, username, password, pre_login):
        '''Post the login form and return the decoded JSON reply.

        ``login`` and ``_post_login`` read ``retcode``, ``ticket``, ``uid``,
        ``nick`` and ``reason`` from the returned dict.
        '''
        param = {
            'entry': 'weibo',
            'gateway': 1,
            'from': '',
            'savestate': 0,
            'useticket': 1,
            'vsnf': 1,
            'su': base64.b64encode(request.quote(username).encode('utf-8')),
            'service': 'miniblog',
            'servertime': pre_login['servertime'],
            'nonce': pre_login['nonce'],
            'pwencode': 'rsa2',
            'rsakv': pre_login['rsakv'],
            'sp': self._encrypt_passwd(password, pre_login['pubkey'],
                                       pre_login['servertime'], pre_login['nonce']),
            'encoding': 'UTF-8',
            'domain': 'weibo.com',
            'prelt': 115,
            'returntype': 'TEXT'
        }
        resp = self.post(self.login_ticket_url % self.WBCLIENT, param)
        return json.loads(resp)

    def _post_login(self, login):
        '''Validate the login ticket and merge the reply into ``login``.

        Nothing is requested when ``login['retcode']`` is not ``'0'``.

        Raises:
            ValueError: if the validation response has no JSON payload.
        '''
        if login['retcode'] == '0':
            resp = self.get(self.post_login_url % login['ticket'])
            match = re.search(r'\(({.+?})\)', resp)
            if match is None:
                raise ValueError('unexpected post-login response: %r' % resp)
            login.update(json.loads(match.group(1)))
        return login

    def login(self, username, password):
        '''Log in and record nick/uid on success.

        On failure the reason and return code are printed and the process
        exits with status 1 (``SystemExit``).
        '''
        pre_login = self._pre_login(username)
        login = self._get_login_ticket(username, password, pre_login)
        login = self._post_login(login)
        logging.debug(' Login profile: %s', json.dumps(login))

        self.username = username
        self.status = login['retcode']
        if self.status == '0':
            self.nick = login['nick']
            self.userid = login['uid']
            print('登录成功!欢迎您,%s。' % self.nick)
        else:
            # A failed reply is not guaranteed to carry a reason.
            self.error = login.get('reason', '')
            print('%s。错误代码:%s' % (self.error, self.status))
            # The bare ``exit`` name is injected by ``site`` and may be
            # missing (e.g. under ``python -S``); SystemExit is equivalent.
            raise SystemExit(1)

    @staticmethod
    def _pane_html(page_text, pane_id):
        '''Return the ``html`` field of the JSON blob embedded for ``pane_id``.

        The result page carries each pane as a JSON object on a single line
        that mentions the pane id followed by a double quote.

        Raises:
            ValueError: if no such line or JSON object is present.
        '''
        line = re.search('.*%s".*' % pane_id, page_text)
        payload = re.search('{.*}', line.group()) if line else None
        if payload is None:
            raise ValueError('search pane %r not found in response' % pane_id)
        return json.loads(payload.group())['html']

    @staticmethod
    def _result_total(tree, count_xpath, fallback):
        '''Parse the total hit count, or return ``fallback`` if it is absent.'''
        texts = tree.xpath(count_xpath)
        match = re.search('找到([0-9]+)条结果', texts[0]) if texts else None
        return int(match.group(1)) if match else fallback

    @staticmethod
    def _page_count(total, per_page):
        '''Return ceil(total / per_page), or 0 when the page is empty.'''
        if per_page <= 0:
            return 0
        return (total + per_page - 1) // per_page

    def search(self, query, search_type='user', user_type='org', page=1):
        '''Search Weibo and return one page of results.

        Args:
            query: search text.
            search_type: ``'user'`` searches accounts; any other value
                searches posts.
            user_type: account category for user searches, one of
                ``'org'``, ``'person'`` or ``'user'``.
            page: page number to fetch.

        Returns:
            dict with ``per_page``, ``page``, ``pages``, ``total`` and
            ``results`` (a list of 4-tuples: name, href, address, fans for
            user searches; name, href, content, time otherwise).

        Raises:
            ValueError: if the expected result pane is missing from the page.
        '''
        user_dict = {
            'org': 'org_vip',     # organisations
            'person': 'per_vip',  # verified individuals
            'user': 'ord'         # ordinary users
        }
        quoted = request.quote(query)
        if search_type == 'user':
            url = self.search_user_url % (quoted, user_dict[user_type], page)
        else:
            url = self.search_url % (quoted, page)

        # Ask the site for 30 items per page before searching.
        self.headers['Origin'] = 'http://s.weibo.com'
        self.headers['Referer'] = 'http://s.weibo.com/preferences'
        self.headers['X-Requested-With'] = 'XMLHttpRequest'
        resp = self.post('http://s.weibo.com/ajax/preferences',
                         data={'page_num': 30, '_t': 0})
        logging.debug('设置项数响应: ' + resp)

        logging.debug('page: ' + str(page))
        resp = self.get(url)

        if search_type == 'user':
            tree = etree.HTML(self._pane_html(resp, 'pl_user_feedList'))
            names = tree.xpath('//p[@class="person_name"]/a[1]/@title')
            hrefs = tree.xpath('//p[@class="person_name"]/a[1]/@href')
            addrs = tree.xpath('//p[@class="person_addr"]/span[2]/text()')
            fans = tree.xpath('//p[@class="person_num"]/span[2]/a/text()')
            results = list(zip(names, hrefs, addrs, fans))
            count_xpath = '//div[@class="search_num"]/span/text()'
        else:
            tree = etree.HTML(self._pane_html(resp, 'pl_weibo_direct'))
            names = tree.xpath('//div/a[@class="W_texta W_fb"]/@title')
            hrefs = tree.xpath('//div/a[@class="W_texta W_fb"]/@href')
            contents = tree.xpath('//p[@class="comment_txt"]')
            # Collapse all whitespace runs inside each post to single spaces.
            contents = [' '.join(e.xpath('string()').split()) for e in contents]
            times = tree.xpath('//div/a[@class="W_textb"]/text()')
            results = list(zip(names, hrefs, contents, times))
            count_xpath = '//div[@class="search_rese clearfix"]/span/text()'

        total = self._result_total(tree, count_xpath, len(results))
        return {
            'per_page': len(names),
            'page': int(page),
            # Ceiling division: an exact multiple must not add an extra page
            # and an empty page must not divide by zero.
            'pages': self._page_count(total, len(names)),
            'total': total,
            'results': results
        }
def _input(string):
    '''Show ``string`` as a prompt and return what the user entered.

    Python 2 receives the prompt as UTF-8 encoded bytes; Python 3 receives
    it unchanged.
    '''
    prompt = string.encode('utf8') if utils.PY2 else string
    return input(prompt)
def _urlencode(string):
    '''Form-encode ``string`` (a mapping of fields) into a request body.

    Python 3 gets the result as UTF-8 bytes; Python 2 gets the native
    string returned by ``urlencode``.
    '''
    if not utils.PY2:
        return urlencode(string).encode('utf8')
    return urlencode(string)
if __name__ == '__main__':
    # Script entry point: log in, then answer search prompts in a loop.
    username = _input('用户名: ')
    password = _input('密码: ')
    me = WeiboLogin()
    me.login(username, password)

    # Remembered answers, reused when a prompt is left empty. The default
    # query is encoded once here: encoding inside the loop would be applied
    # again to a value that is already bytes and fail on the second reuse.
    st2, q2 = 'user', '书店'.encode('utf8')
    while True:
        st = _input('搜索类型[user|all]: ') or st2
        q = _input('搜索内容: ') or q2
        page = _input('页数: ') or 1
        st2, q2 = st, q

        js = me.search(q, search_type=st, page=page)
        for i, j, k, l in js['results']:
            print('-> %s (%s)\n %s\n %s\n' % (i, j, k, l))
        print('每页项数: %d' % js['per_page'])
        print('页数/总数: %d / %d' % (js['page'], js['pages']))
        print('搜索结果: %d' % js['total'])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment