-
-
Save maltoze/1749830d21a8ea0a960a9f60a1858c2d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
两个功能: | |
1:保存微信群聊天信息(文本和图片信息) | |
2: 对话以'twr'开始时自动调用聊天机器人小黄鸡 | |
致谢: | |
https://github.com/0x5e/wechat-deleted-friends | |
http://www.tanhao.me/talk/1466.html | |
""" | |
import os | |
import urllib2 | |
import time | |
import re | |
import subprocess | |
import sys | |
import xml.dom.minidom | |
import json | |
import ssl | |
import logging | |
from urllib import urlencode, quote | |
from cookielib import CookieJar | |
from logging.handlers import RotatingFileHandler | |
reload(sys) | |
sys.setdefaultencoding('utf-8') | |
DEBUG = False | |
tip = 0 | |
uuid = '' | |
base_uri = '' | |
redirect_uri = '' | |
skey = '' | |
wxsid = '' | |
wxuin = '' | |
pass_ticket = '' | |
device_id = 'e498011921532452' | |
SyncKey = {} | |
_ = 0 | |
BaseRequest = {} | |
ContactList = [] | |
My = {} | |
MemberList = [] | |
BatchContactList = [] | |
QRImagePath = os.path.join(os.getcwd(), 'qrcode.jpg') | |
# 文件大于20M自动分割,最多100个 | |
rtf_handler = RotatingFileHandler('./msg.log', | |
maxBytes=20*1024*1024, | |
backupCount=100, | |
encoding='utf-8') | |
formatter = logging.Formatter('%(message)s') | |
rtf_handler.setFormatter(formatter) | |
logger = logging.getLogger() | |
logger.setLevel(logging.INFO) | |
logger.addHandler(rtf_handler) | |
def get_request(url, data=None): | |
try: | |
data = data.encode('utf-8') | |
except: | |
pass | |
finally: | |
return urllib2.Request(url=url, data=data) | |
def get_uuid(): | |
global uuid | |
url = 'https://login.weixin.qq.com/jslogin' | |
params = { | |
'appid': 'wx782c26e4c19acffb', | |
'fun': 'new', | |
'lang': 'zh_CN', | |
'_': int(time.time()), | |
} | |
request = get_request(url=url, data=urlencode(params)) | |
response = urllib2.urlopen(request) | |
data = response.read().decode('utf-8', 'replace') | |
# print data | |
regx = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"' | |
pm = re.search(regx, data) | |
code = pm.group(1) | |
uuid = pm.group(2) | |
if code == '200': | |
return uuid | |
return False | |
def show_QRImage(): | |
global tip | |
url = 'https://login.weixin.qq.com/qrcode/' + uuid | |
params = { | |
't': 'webwx', | |
'_': int(time.time()), | |
} | |
request = get_request(url=url, data=urlencode(params)) | |
response = urllib2.urlopen(request) | |
tip = 1 | |
f = open(QRImagePath, 'wb') | |
f.write(response.read()) | |
f.close() | |
if sys.platform.find('darwin') >= 0: | |
subprocess.call(['open', QRImagePath]) | |
elif sys.platform.find('linux') >= 0: | |
subprocess.call(['xdg-open', QRImagePath]) | |
else: | |
os.startfile(QRImagePath) | |
print u'请使用微信扫描二维码登录' | |
def wait_for_login(): | |
global tip, base_uri, redirect_uri | |
url = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?'\ | |
'tip=%s&uuid=%s&_=%s' % (tip, uuid, _) | |
# print url | |
request = get_request(url=url) | |
response = urllib2.urlopen(request) | |
data = response.read().decode('utf-8', 'replace') | |
# print data | |
regx = r'window.code=(\d+);' | |
pm = re.search(regx, data) | |
code = pm.group(1) | |
# 已扫描 | |
if code == '201': | |
print u'成功扫描,请在手机上点击确认以登录' | |
# 已登录 | |
elif code == '200': | |
print u'正在登录...' | |
regx = r'window.redirect_uri="(\S+?)";' | |
pm = re.search(regx, data) | |
redirect_uri = pm.group(1) + '&fun=new' | |
base_uri = redirect_uri[:redirect_uri.rfind('/')] | |
# print redirect_uri, base_uri | |
# 超时 | |
elif code == '408': | |
pass | |
# elif code == '400' or code == '500' | |
return code | |
def login(): | |
global skey, wxsid, wxuin, pass_ticket, BaseRequest | |
request = get_request(url=redirect_uri) | |
response = urllib2.urlopen(request) | |
data = response.read().decode('utf-8', 'replace') | |
# print(data) | |
''' | |
<error> | |
<ret>0</ret> | |
<message>OK</message> | |
<skey>xxx</skey> | |
<wxsid>xxx</wxsid> | |
<wxuin>xxx</wxuin> | |
<pass_ticket>xxx</pass_ticket> | |
<isgrayscale>1</isgrayscale> | |
</error> | |
''' | |
doc = xml.dom.minidom.parseString(data) | |
root = doc.documentElement | |
for node in root.childNodes: | |
if node.nodeName == 'skey': | |
skey = node.childNodes[0].data | |
elif node.nodeName == 'wxsid': | |
wxsid = node.childNodes[0].data | |
elif node.nodeName == 'wxuin': | |
wxuin = node.childNodes[0].data | |
elif node.nodeName == 'pass_ticket': | |
pass_ticket = node.childNodes[0].data | |
# print('skey: %s, wxsid: %s, wxuin: %s, pass_ticket: %s' %\ | |
# (skey, wxsid, wxuin, pass_ticket)) | |
# if skey == '' or wxsid == '' or wxuin == '' or pass_ticket == '': | |
# return False | |
if not all((skey, wxsid, wxuin, pass_ticket)): | |
return False | |
BaseRequest = { | |
'Uin': int(wxuin), | |
'Sid': wxsid, | |
'Skey': skey, | |
'DeviceID': device_id, | |
} | |
# print BaseRequest | |
return True | |
def webwx_init(): | |
url = base_uri + '/webwxinit?pass_ticket=%s&skey=%s&r=%s' %\ | |
(pass_ticket, skey, int(time.time())) | |
params = { | |
'BaseRequest': BaseRequest | |
} | |
request = get_request(url=url, data=json.dumps(params)) | |
request.add_header('ContentType', 'application/json; charset=UTF-8') | |
response = urllib2.urlopen(request) | |
data = response.read().decode('utf-8', 'replace') | |
if DEBUG: | |
f = open(os.path.join(os.getcwd(), 'webwxinit.json'), 'wb') | |
f.write(data) | |
f.close() | |
# print(data) | |
global ContactList, My, SyncKey | |
dic = json.loads(data) | |
ContactList = dic['ContactList'] | |
My = dic['User'] | |
SyncKey = dic['SyncKey'] | |
# print 'SyncKey', SyncKey | |
ErrMsg = dic['BaseResponse']['ErrMsg'] | |
if len(ErrMsg) > 0: | |
print(ErrMsg) | |
Ret = dic['BaseResponse']['Ret'] | |
if Ret != 0: | |
return False | |
return True | |
def sync_check(): | |
url = base_uri + '/synccheck?' | |
params = { | |
'skey': BaseRequest['Skey'], | |
'sid': BaseRequest['Sid'], | |
'uin': BaseRequest['Uin'], | |
'deviceId': BaseRequest['DeviceID'], | |
'synckey': gen_synckey(), | |
'r': int(time.time()*1000), | |
'_': _ + 1, | |
} | |
request = get_request(url=url + urlencode(params)) | |
response = urllib2.urlopen(request) | |
data = response.read().decode('utf-8', 'replace') | |
print data | |
# window.synccheck={retcode:"0",selector:"2"} | |
regx = 'window.synccheck={retcode:"(\d+)",selector:"(\d+)"}' | |
pm = re.search(regx, data) | |
code = pm.group(2) | |
if code != 0: | |
return int(code) | |
else: | |
return False | |
def gen_synckey(): | |
ret = [] | |
for sk in SyncKey['List']: | |
ret.append('%s_%s' % (sk['Key'], sk['Val'])) | |
return '|'.join(ret) | |
def statusNotify(): | |
url = base_uri + '/webwxstatusnotify?pass_ticket=%s' % pass_ticket | |
params = { | |
'BaseRequest': BaseRequest, | |
'ClientMsgId': int(time.time()*1000), | |
'Code': 3, | |
'FromUserName': My['UserName'], | |
'ToUserName': My['UserName'], | |
} | |
request = get_request(url=url, data=json.dumps(params)) | |
request.add_header('ContentType', 'application/json; charset=UTF-8') | |
response = urllib2.urlopen(request) | |
data = response.read().decode('utf-8', 'replace') | |
# print data | |
def webwx_sync(): | |
global SyncKey, ContactList | |
url = base_uri + '/webwxsync?sid=%s&skey%s&pass_ticket=%s' %\ | |
(wxsid, skey, pass_ticket) | |
params = { | |
'BaseRequest': BaseRequest, | |
'SyncKey': SyncKey, | |
'rr': int(time.time()), | |
} | |
request = get_request(url=url, data=json.dumps(params)) | |
request.add_header('ContentType', 'application/json; charset=UTF-8') | |
response = urllib2.urlopen(request) | |
data = response.read().decode('utf-8', 'replace') | |
dic = json.loads(data) | |
SyncKey = dic['SyncKey'] | |
for msg in dic['AddMsgList']: | |
if int(msg['StatusNotifyCode']) == 4: | |
is_modify = False | |
status_notify_user_name = msg['StatusNotifyUserName'].split(',') | |
for username in status_notify_user_name: | |
if username not in \ | |
[contact['UserName'] for contact in ContactList]: | |
ContactList.append({'UserName': username}) | |
is_modify = True | |
if is_modify: | |
webwxbatchgetcontact() | |
content = msg['Content'].split('<br/>') | |
if not content: | |
continue | |
try: | |
if content[1].startswith('twr'): | |
webwx_send_msg(content[1][3:], msg['FromUserName']) | |
except Exception, e: | |
if content[0].startswith('twr'): | |
if msg['FromUserName'] == My['UserName']: | |
webwx_send_msg(content[0][3:].strip(), msg['ToUserName']) | |
else: | |
webwx_send_msg(content[0][3:].strip(), msg['FromUserName']) | |
if not msg['FromUserName'].startswith('@@'): | |
continue | |
nickname, groupname = '', '' | |
for batch in BatchContactList: | |
if batch['UserName'] == msg['FromUserName']: | |
groupname = batch['NickName'] | |
for member in batch['MemberList']: | |
if member['UserName'] == content[0][:-1]: | |
nickname = member['NickName'] | |
# 文本信息 | |
if int(msg['MsgType']) == 1: | |
logger.info('[%s][%s][%s]%s' % | |
(nickname, | |
groupname, | |
time.strftime('%Y-%m-%d %H-%M-%S', | |
time.localtime(int(msg['CreateTime']))), | |
content[1])) | |
# 图片信息 | |
elif int(msg['MsgType']) == 3: | |
logger.info('[%s][%s][%s]./images/%s.jpg' % | |
(nickname, | |
groupname, | |
time.strftime('%Y-%m-%d %H-%M-%S', | |
time.localtime(int(msg['CreateTime']))), | |
msg['MsgId'])) | |
webwxgetmsgimg(msg['MsgId']) | |
if DEBUG: | |
f = open(os.path.join(os.getcwd(), 'webwxsync.json'), 'wb') | |
f.write(data) | |
f.close() | |
def webwx_send_msg(content, ToUserName): | |
url = base_uri + '/webwxsendmsg?pass_ticket=%s' % pass_ticket | |
params = { | |
'BaseRequest': BaseRequest, | |
'Msg': {'Type': 1, | |
'Content': simsimi(content), | |
'ClientMsgId': int(time.time()*1000), | |
'LocalID': int(time.time()*1000), | |
'FromUserName': My['UserName'], | |
'ToUserName': ToUserName, | |
} | |
} | |
request = get_request(url=url, data=json.dumps(params, ensure_ascii=False)) | |
request.add_header('ContentType', 'application/json; charset=UTF-8') | |
response = urllib2.urlopen(request) | |
data = response.read().decode('utf-8', 'replace') | |
def simsimi(msg): | |
url = 'http://simsimi.com/requestChat?lc=ch&ft=1.0&' | |
params = { | |
'req': msg, | |
} | |
print 'simsimi' | |
request = get_request(url=url+urlencode(params)) | |
try: | |
response = urllib2.urlopen(request) | |
data = response.read().decode('utf-8', 'replace') | |
print data | |
dic = json.loads(data) | |
if dic['res']['msg'].startswith('I HAVE NO RESPONSE'): | |
return '...' | |
if int(dic['status']) == 200: | |
return dic['res']['msg'] | |
else: | |
pass | |
except Exception, e: | |
print e | |
return '...' | |
def webwxgetcontact(): | |
url = base_uri + \ | |
'/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s&seq=0' % ( | |
pass_ticket, skey, int(time.time())) | |
request = get_request(url=url) | |
request.add_header('ContentType', 'application/json; charset=UTF-8') | |
response = urllib2.urlopen(request) | |
data = response.read() | |
if DEBUG: | |
f = open(os.path.join(os.getcwd(), 'webwxgetcontact.json'), 'wb') | |
f.write(data) | |
f.close() | |
# print(data) | |
data = data.decode('utf-8', 'replace') | |
dic = json.loads(data) | |
global MemberList | |
MemberList = dic['MemberList'] | |
def webwxbatchgetcontact(): | |
url = base_uri + '/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s' % \ | |
(int(time.time())*1000, pass_ticket) | |
chat_rooms = [] | |
for contact in ContactList: | |
if contact['UserName'].startswith('@@'): | |
chat_rooms.append({'UserName': contact['UserName']}) | |
params = { | |
'BaseRequest': BaseRequest, | |
'List': chat_rooms, | |
'Count': len(chat_rooms), | |
} | |
request = get_request(url=url, data=json.dumps(params)) | |
request.add_header('ContentType', 'application/json; charset=UTF-8') | |
response = urllib2.urlopen(request) | |
data = response.read().decode('utf-8', 'replace') | |
if DEBUG: | |
f = open(os.path.join(os.getcwd(), 'webwxbatchgetcontact.json'), 'wb') | |
f.write(data) | |
f.close() | |
global BatchContactList | |
dic = json.loads(data) | |
BatchContactList = dic['ContactList'] | |
def webwxgetmsgimg(MsgID): | |
url = base_uri + '/webwxgetmsgimg?&MsgID=%s&skey=%s' %\ | |
(MsgID, skey) | |
request = get_request(url=url) | |
response = urllib2.urlopen(request) | |
with open('./images/%s.jpg' % MsgID, 'wb') as f: | |
f.write(response.read()) | |
def main(): | |
try: | |
# ssl._create_default_https_context = ssl._create_unverified_context | |
opener = urllib2.build_opener( | |
urllib2.HTTPCookieProcessor(CookieJar())) | |
urllib2.install_opener(opener) | |
except Exception, e: | |
print e | |
return | |
uuid = get_uuid() | |
show_QRImage() | |
while wait_for_login() != '200': | |
pass | |
os.remove(QRImagePath) | |
if not login(): | |
print u'登录失败' | |
return | |
print u'登录成功' | |
if not webwx_init(): | |
print u'初始化失败' | |
return | |
statusNotify() | |
webwxgetcontact() | |
webwxbatchgetcontact() | |
while True: | |
if sync_check(): | |
webwx_sync() | |
time.sleep(1) | |
if __name__ == '__main__': | |
if not os.path.exists('images'): | |
os.mkdir('images') | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment