Skip to content

Instantly share code, notes, and snippets.

@maltoze
Forked from apelif/wxtool.py
Created May 29, 2018 06:26
Show Gist options
  • Save maltoze/1749830d21a8ea0a960a9f60a1858c2d to your computer and use it in GitHub Desktop.
Save maltoze/1749830d21a8ea0a960a9f60a1858c2d to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""
两个功能:
1:保存微信群聊天信息(文本和图片信息)
2: 对话以'twr'开始时自动调用聊天机器人小黄鸡
致谢:
https://github.com/0x5e/wechat-deleted-friends
http://www.tanhao.me/talk/1466.html
"""
import os
import urllib2
import time
import re
import subprocess
import sys
import xml.dom.minidom
import json
import ssl
import logging
from urllib import urlencode, quote
from cookielib import CookieJar
from logging.handlers import RotatingFileHandler
# Python 2 idiom: reload ``sys`` so that ``setdefaultencoding`` is callable,
# then make UTF-8 the default codec for implicit str/unicode conversions.
reload(sys)
sys.setdefaultencoding('utf-8')
# When true, the request helpers dump raw server responses to *.json files
# in the current working directory.
DEBUG = False
# Sent as the ``tip`` query parameter by wait_for_login(); show_QRImage()
# sets it to 1.
tip = 0
# QR-login session id, filled in by get_uuid().
uuid = ''
# Both are filled in by wait_for_login() once the server reports code 200.
base_uri = ''
redirect_uri = ''
# Session credentials parsed from the login XML by login().
skey = ''
wxsid = ''
wxuin = ''
pass_ticket = ''
# Sent as ``DeviceID`` inside BaseRequest.
device_id = 'e498011921532452'
# Latest sync key, taken from the webwxinit / webwxsync responses.
SyncKey = {}
# Sent as the ``_`` query parameter (wait_for_login uses ``_``, sync_check
# uses ``_ + 1``); nothing in this file reassigns it.
_ = 0
# Common request envelope (Uin / Sid / Skey / DeviceID), built by login().
BaseRequest = {}
# ``ContactList`` from webwxinit; webwx_sync() appends newly seen user names.
ContactList = []
# The ``User`` entry of the webwxinit response (the logged-in account).
My = {}
# ``MemberList`` returned by webwxgetcontact().
MemberList = []
# ``ContactList`` returned by webwxbatchgetcontact() (chat-room details).
BatchContactList = []
# Where the login QR code image is saved before being shown.
QRImagePath = os.path.join(os.getcwd(), 'qrcode.jpg')
# Chat log: rotate ./msg.log once it exceeds 20 MB, keeping at most 100
# backup files.
rtf_handler = RotatingFileHandler('./msg.log',
                                  maxBytes=20*1024*1024,
                                  backupCount=100,
                                  encoding='utf-8')
# Only the message text is written; webwx_sync() formats the
# nickname / group / timestamp prefix itself.
formatter = logging.Formatter('%(message)s')
rtf_handler.setFormatter(formatter)
# getLogger() without a name returns the root logger.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(rtf_handler)
def get_request(url, data=None):
    """Build a ``urllib2.Request`` for ``url``.

    Args:
        url: Target URL.
        data: Optional request body. Text is encoded to UTF-8; ``None``
            (the default) yields a request without a body.

    Returns:
        The prepared ``urllib2.Request``; nothing is sent here.
    """
    if data is not None:
        try:
            data = data.encode('utf-8')
        except (AttributeError, UnicodeError):
            # No ``encode`` method or not encodable (e.g. already raw
            # bytes): pass the payload through unchanged.
            pass
    return urllib2.Request(url=url, data=data)
def get_uuid():
    """Request a new QR-login session id and store it in the global ``uuid``.

    Returns:
        The uuid string when the server answers with code ``200``;
        ``False`` otherwise, including when the response cannot be parsed.
    """
    global uuid
    url = 'https://login.weixin.qq.com/jslogin'
    params = {
        'appid': 'wx782c26e4c19acffb',
        'fun': 'new',
        'lang': 'zh_CN',
        '_': int(time.time()),
    }
    request = get_request(url=url, data=urlencode(params))
    response = urllib2.urlopen(request)
    data = response.read().decode('utf-8', 'replace')
    regx = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"'
    pm = re.search(regx, data)
    if pm is None:
        # Unexpected body: report failure instead of crashing on ``None``.
        return False
    code = pm.group(1)
    uuid = pm.group(2)
    if code == '200':
        return uuid
    return False
def show_QRImage():
    """Download the login QR code and open it with the platform's viewer.

    Uses the global ``uuid`` to build the image URL, saves the picture to
    ``QRImagePath`` and sets the global ``tip`` to 1.
    """
    global tip
    url = 'https://login.weixin.qq.com/qrcode/' + uuid
    params = {
        't': 'webwx',
        '_': int(time.time()),
    }
    request = get_request(url=url, data=urlencode(params))
    response = urllib2.urlopen(request)
    tip = 1
    # ``with`` closes the file even if the download fails midway.
    with open(QRImagePath, 'wb') as f:
        f.write(response.read())
    if 'darwin' in sys.platform:
        subprocess.call(['open', QRImagePath])
    elif 'linux' in sys.platform:
        subprocess.call(['xdg-open', QRImagePath])
    else:
        os.startfile(QRImagePath)
    print(u'请使用微信扫描二维码登录')
def wait_for_login():
    """Poll the QR-login endpoint once and report the login state.

    On code ``'200'`` the globals ``redirect_uri`` and ``base_uri`` are set
    from the response.

    Returns:
        The status code as a string. ``'201'`` (scanned), ``'200'``
        (logged in) and ``'408'`` (timeout) are the codes handled here.

    Raises:
        ValueError: If the response lacks the expected status code or, on
            code ``'200'``, the redirect URI.
    """
    global base_uri, redirect_uri
    url = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?'\
        'tip=%s&uuid=%s&_=%s' % (tip, uuid, _)
    request = get_request(url=url)
    response = urllib2.urlopen(request)
    data = response.read().decode('utf-8', 'replace')
    pm = re.search(r'window.code=(\d+);', data)
    if pm is None:
        raise ValueError('unexpected login response: %r' % data[:200])
    code = pm.group(1)
    if code == '201':
        # Scanned; waiting for confirmation on the phone.
        print(u'成功扫描,请在手机上点击确认以登录')
    elif code == '200':
        # Logged in: remember where to fetch the session credentials.
        print(u'正在登录...')
        pm = re.search(r'window.redirect_uri="(\S+?)";', data)
        if pm is None:
            raise ValueError('login response has no redirect_uri: %r'
                             % data[:200])
        redirect_uri = pm.group(1) + '&fun=new'
        base_uri = redirect_uri[:redirect_uri.rfind('/')]
    # '408' (timeout) and any other code: nothing to do, the caller decides
    # whether to poll again.
    return code
def login():
    """Fetch the session credentials from ``redirect_uri``.

    The response is expected to be XML of this shape::

        <error>
            <ret>0</ret>
            <message>OK</message>
            <skey>xxx</skey>
            <wxsid>xxx</wxsid>
            <wxuin>xxx</wxuin>
            <pass_ticket>xxx</pass_ticket>
            <isgrayscale>1</isgrayscale>
        </error>

    The globals ``skey``, ``wxsid``, ``wxuin`` and ``pass_ticket`` are
    updated from it and, on success, ``BaseRequest`` is rebuilt.

    Returns:
        ``True`` when all four credentials are non-empty, else ``False``.
    """
    global skey, wxsid, wxuin, pass_ticket, BaseRequest
    request = get_request(url=redirect_uri)
    response = urllib2.urlopen(request)
    data = response.read().decode('utf-8', 'replace')
    doc = xml.dom.minidom.parseString(data)
    root = doc.documentElement
    for node in root.childNodes:
        # Whitespace text nodes and empty elements such as <skey></skey>
        # have no child to read a value from.
        if not node.childNodes:
            continue
        value = node.childNodes[0].data
        if node.nodeName == 'skey':
            skey = value
        elif node.nodeName == 'wxsid':
            wxsid = value
        elif node.nodeName == 'wxuin':
            wxuin = value
        elif node.nodeName == 'pass_ticket':
            pass_ticket = value
    if not all((skey, wxsid, wxuin, pass_ticket)):
        return False
    BaseRequest = {
        'Uin': int(wxuin),
        'Sid': wxsid,
        'Skey': skey,
        'DeviceID': device_id,
    }
    return True
def webwx_init():
    """Initialise the web session and load the initial account state.

    On success the globals ``ContactList``, ``My`` and ``SyncKey`` are set
    from the ``webwxinit`` response. With ``DEBUG`` enabled the response is
    also written to ``webwxinit.json`` in the working directory.

    Returns:
        ``True`` when ``BaseResponse.Ret`` is 0, otherwise ``False``.
    """
    global ContactList, My, SyncKey
    url = base_uri + '/webwxinit?pass_ticket=%s&skey=%s&r=%s' % (
        pass_ticket, skey, int(time.time()))
    params = {
        'BaseRequest': BaseRequest,
    }
    request = get_request(url=url, data=json.dumps(params))
    # The header name must be 'Content-Type'; 'ContentType' is not a
    # standard header and leaves the body labelled as form data.
    request.add_header('Content-Type', 'application/json; charset=UTF-8')
    response = urllib2.urlopen(request)
    data = response.read().decode('utf-8', 'replace')
    if DEBUG:
        with open(os.path.join(os.getcwd(), 'webwxinit.json'), 'wb') as f:
            f.write(data.encode('utf-8'))
    dic = json.loads(data)
    # Check the status first: a failed call need not carry the other keys.
    base_response = dic['BaseResponse']
    if base_response['ErrMsg']:
        print(base_response['ErrMsg'])
    if base_response['Ret'] != 0:
        return False
    ContactList = dic['ContactList']
    My = dic['User']
    SyncKey = dic['SyncKey']
    return True
def sync_check():
    """Ask the server whether there is anything new to fetch.

    The response is expected to look like
    ``window.synccheck={retcode:"0",selector:"2"}``.

    Returns:
        The ``selector`` value as an int (``0``, which is falsy, when it is
        zero), or ``False`` when the response cannot be parsed.
    """
    url = base_uri + '/synccheck?'
    params = {
        'skey': BaseRequest['Skey'],
        'sid': BaseRequest['Sid'],
        'uin': BaseRequest['Uin'],
        'deviceId': BaseRequest['DeviceID'],
        'synckey': gen_synckey(),
        'r': int(time.time()*1000),
        '_': _ + 1,
    }
    request = get_request(url=url + urlencode(params))
    response = urllib2.urlopen(request)
    data = response.read().decode('utf-8', 'replace')
    print(data)
    regx = r'window.synccheck={retcode:"(\d+)",selector:"(\d+)"}'
    pm = re.search(regx, data)
    if pm is None:
        return False
    # NOTE(review): retcode (group 1) is ignored, as before; confirm whether
    # a non-zero retcode should stop the polling loop.
    return int(pm.group(2))
def gen_synckey():
    """Serialise the global ``SyncKey`` for the ``synccheck`` query string.

    Returns:
        The ``Key_Val`` pairs of ``SyncKey['List']`` joined with ``|``; an
        empty string when ``SyncKey`` has no ``List`` entry yet.
    """
    return '|'.join(
        '%s_%s' % (sk['Key'], sk['Val']) for sk in SyncKey.get('List', []))
def statusNotify():
    """Post a status notification (code 3) from the account to itself."""
    url = base_uri + '/webwxstatusnotify?pass_ticket=%s' % pass_ticket
    params = {
        'BaseRequest': BaseRequest,
        'ClientMsgId': int(time.time()*1000),
        'Code': 3,
        'FromUserName': My['UserName'],
        'ToUserName': My['UserName'],
    }
    request = get_request(url=url, data=json.dumps(params))
    # The header name must be 'Content-Type'; 'ContentType' is not a
    # standard header and leaves the body labelled as form data.
    request.add_header('Content-Type', 'application/json; charset=UTF-8')
    response = urllib2.urlopen(request)
    # The reply is read so the request completes, but its content is unused.
    response.read()
def _reply_if_triggered(msg, content):
    """Send a chat-bot reply when the message text starts with 'twr'."""
    if len(content) > 1:
        # Several '<br/>'-separated parts: the second one is inspected and
        # the reply goes back to the sender of the message.
        if content[1].startswith('twr'):
            try:
                webwx_send_msg(content[1][3:], msg['FromUserName'])
            except Exception as e:
                # Best effort: a failed reply must not keep the message
                # from being logged by the caller.
                print(e)
        return
    text = content[0]
    if not text.startswith('twr'):
        return
    # A message sent by this account is answered in the conversation it was
    # sent to; anything else is answered to its sender.
    if msg['FromUserName'] == My['UserName']:
        target = msg['ToUserName']
    else:
        target = msg['FromUserName']
    webwx_send_msg(text[3:].strip(), target)


def _group_names(group_username, member_username):
    """Return ``(member nickname, group nickname)`` from BatchContactList.

    Either value is ``''`` when no matching entry is found.
    """
    nickname, groupname = '', ''
    for batch in BatchContactList:
        if batch['UserName'] == group_username:
            groupname = batch['NickName']
            for member in batch['MemberList']:
                if member['UserName'] == member_username:
                    nickname = member['NickName']
    return nickname, groupname


def webwx_sync():
    """Fetch pending messages, auto-reply and log chat-room traffic.

    For every entry of ``AddMsgList`` in the ``webwxsync`` response:

    * a status notification with code 4 adds unknown user names to
      ``ContactList`` and refreshes the chat-room details;
    * text starting with ``'twr'`` is answered through the chat bot;
    * messages whose sender name starts with ``'@@'`` are written to the
      chat log (type 1: text, type 3: image, which is also downloaded).

    The global ``SyncKey`` is replaced with the one from the response. With
    ``DEBUG`` enabled the response is written to ``webwxsync.json``.
    """
    global SyncKey, ContactList
    # 'skey=%s': the '=' was missing, which sent a malformed parameter.
    url = base_uri + '/webwxsync?sid=%s&skey=%s&pass_ticket=%s' % (
        wxsid, skey, pass_ticket)
    params = {
        'BaseRequest': BaseRequest,
        'SyncKey': SyncKey,
        'rr': int(time.time()),
    }
    request = get_request(url=url, data=json.dumps(params))
    # The header name must be 'Content-Type'; 'ContentType' is not a
    # standard header and leaves the body labelled as form data.
    request.add_header('Content-Type', 'application/json; charset=UTF-8')
    response = urllib2.urlopen(request)
    data = response.read().decode('utf-8', 'replace')
    dic = json.loads(data)
    SyncKey = dic['SyncKey']
    for msg in dic['AddMsgList']:
        if int(msg['StatusNotifyCode']) == 4:
            known = set(contact['UserName'] for contact in ContactList)
            is_modify = False
            for username in msg['StatusNotifyUserName'].split(','):
                if username not in known:
                    ContactList.append({'UserName': username})
                    known.add(username)
                    is_modify = True
            if is_modify:
                webwxbatchgetcontact()
        content = msg['Content'].split('<br/>')
        _reply_if_triggered(msg, content)
        if not msg['FromUserName'].startswith('@@'):
            continue
        msg_type = int(msg['MsgType'])
        if msg_type not in (1, 3):
            continue
        # content[0] minus its last character is matched against the member
        # user names (presumably "<username>:" -- TODO confirm).
        nickname, groupname = _group_names(msg['FromUserName'],
                                           content[0][:-1])
        stamp = time.strftime('%Y-%m-%d %H-%M-%S',
                              time.localtime(int(msg['CreateTime'])))
        if msg_type == 1:
            # Text message; fall back to the only part when there is no
            # '<br/>' separator instead of failing on content[1].
            text = content[1] if len(content) > 1 else content[0]
            logger.info('[%s][%s][%s]%s', nickname, groupname, stamp, text)
        else:
            # Image message: log where the picture is stored, then fetch it.
            logger.info('[%s][%s][%s]./images/%s.jpg',
                        nickname, groupname, stamp, msg['MsgId'])
            webwxgetmsgimg(msg['MsgId'])
    if DEBUG:
        with open(os.path.join(os.getcwd(), 'webwxsync.json'), 'wb') as f:
            f.write(data.encode('utf-8'))
def webwx_send_msg(content, ToUserName):
    """Send the chat bot's answer to ``content`` as a text message.

    Args:
        content: Text handed to ``simsimi()``; its reply is what gets sent.
        ToUserName: User name of the recipient.
    """
    url = base_uri + '/webwxsendmsg?pass_ticket=%s' % pass_ticket
    params = {
        'BaseRequest': BaseRequest,
        'Msg': {
            'Type': 1,
            'Content': simsimi(content),
            'ClientMsgId': int(time.time()*1000),
            'LocalID': int(time.time()*1000),
            'FromUserName': My['UserName'],
            'ToUserName': ToUserName,
        },
    }
    # ensure_ascii=False keeps non-ASCII text literal in the JSON body;
    # get_request() encodes it to UTF-8.
    request = get_request(url=url,
                          data=json.dumps(params, ensure_ascii=False))
    # The header name must be 'Content-Type'; 'ContentType' is not a
    # standard header and leaves the body labelled as form data.
    request.add_header('Content-Type', 'application/json; charset=UTF-8')
    response = urllib2.urlopen(request)
    # The reply is read so the request completes, but its content is unused.
    response.read()
def simsimi(msg):
    """Ask the SimSimi chat bot for a reply to ``msg``.

    Returns:
        The bot's message when the response status is 200; ``'...'`` when
        the bot has no answer, the status differs, or anything fails while
        fetching or parsing the response.
    """
    endpoint = 'http://simsimi.com/requestChat?lc=ch&ft=1.0&'
    query = {'req': msg}
    print('simsimi')
    request = get_request(url=endpoint + urlencode(query))
    try:
        raw = urllib2.urlopen(request).read()
        data = raw.decode('utf-8', 'replace')
        print(data)
        dic = json.loads(data)
        reply = dic['res']['msg']
        if reply.startswith('I HAVE NO RESPONSE'):
            return '...'
        if int(dic['status']) == 200:
            return reply
    except Exception as e:
        # Best effort: any failure falls through to the placeholder reply.
        print(e)
    return '...'
def webwxgetcontact():
    """Download the contact list into the global ``MemberList``.

    With ``DEBUG`` enabled the raw response is also written to
    ``webwxgetcontact.json`` in the working directory.
    """
    global MemberList
    url = base_uri + \
        '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s&seq=0' % (
            pass_ticket, skey, int(time.time()))
    request = get_request(url=url)
    # The header name must be 'Content-Type'; 'ContentType' is not a
    # standard header.
    request.add_header('Content-Type', 'application/json; charset=UTF-8')
    response = urllib2.urlopen(request)
    data = response.read()
    if DEBUG:
        with open(os.path.join(os.getcwd(), 'webwxgetcontact.json'),
                  'wb') as f:
            f.write(data)
    dic = json.loads(data.decode('utf-8', 'replace'))
    MemberList = dic['MemberList']
def webwxbatchgetcontact():
    """Fetch details for every chat room found in ``ContactList``.

    Entries whose ``UserName`` starts with ``'@@'`` are requested in one
    batch; the ``ContactList`` field of the response replaces the global
    ``BatchContactList``. With ``DEBUG`` enabled the response is also
    written to ``webwxbatchgetcontact.json``.
    """
    global BatchContactList
    url = base_uri + '/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s' % (
        int(time.time())*1000, pass_ticket)
    chat_rooms = [{'UserName': contact['UserName']}
                  for contact in ContactList
                  if contact['UserName'].startswith('@@')]
    params = {
        'BaseRequest': BaseRequest,
        'List': chat_rooms,
        'Count': len(chat_rooms),
    }
    request = get_request(url=url, data=json.dumps(params))
    # The header name must be 'Content-Type'; 'ContentType' is not a
    # standard header and leaves the body labelled as form data.
    request.add_header('Content-Type', 'application/json; charset=UTF-8')
    response = urllib2.urlopen(request)
    data = response.read().decode('utf-8', 'replace')
    if DEBUG:
        with open(os.path.join(os.getcwd(), 'webwxbatchgetcontact.json'),
                  'wb') as f:
            f.write(data.encode('utf-8'))
    dic = json.loads(data)
    BatchContactList = dic['ContactList']
def webwxgetmsgimg(MsgID):
    """Download the image of message ``MsgID`` to ``./images/<MsgID>.jpg``."""
    query = '/webwxgetmsgimg?&MsgID=%s&skey=%s' % (MsgID, skey)
    response = urllib2.urlopen(get_request(url=base_uri + query))
    target = './images/%s.jpg' % MsgID
    with open(target, 'wb') as out:
        out.write(response.read())
def main():
    """Log in through the QR code, then poll for messages forever.

    Steps: install a cookie-aware opener, obtain the login uuid, show the QR
    code, wait until the login is confirmed, initialise the session, load
    the contacts, then call ``sync_check`` / ``webwx_sync`` once a second.
    Returns early, after printing a message, when a setup step fails.
    """
    try:
        # To skip HTTPS certificate verification (insecure), uncomment:
        # ssl._create_default_https_context = ssl._create_unverified_context
        opener = urllib2.build_opener(
            urllib2.HTTPCookieProcessor(CookieJar()))
        urllib2.install_opener(opener)
    except Exception as e:
        print(e)
        return
    # get_uuid() stores the id in the module global itself; only its
    # success matters here. Without a uuid the QR code URL would be invalid.
    if not get_uuid():
        print(u'获取uuid失败')
        return
    show_QRImage()
    while wait_for_login() != '200':
        pass
    os.remove(QRImagePath)
    if not login():
        print(u'登录失败')
        return
    print(u'登录成功')
    if not webwx_init():
        print(u'初始化失败')
        return
    statusNotify()
    webwxgetcontact()
    webwxbatchgetcontact()
    while True:
        if sync_check():
            webwx_sync()
        time.sleep(1)
if __name__ == '__main__':
    # Script entry point: downloaded pictures are stored under ./images, so
    # make sure that directory exists before the client starts.
    if not os.path.exists('images'):
        os.mkdir('images')
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment