Created
November 8, 2018 17:25
-
-
Save shellexy/8d3e8a49bcd46915da48b403580b74ca to your computer and use it in GitHub Desktop.
导出微信聊天记录为 txt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python2.7 | |
# -*- coding: UTF-8 -*- | |
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79: | |
'''导出微信聊天记录为 txt | |
@author: Shellexy Wang <[email protected]> | |
@license: LGPLv3+ | |
@see: | |
''' | |
import os | |
import sys | |
import re | |
import hashlib | |
import csv | |
import time | |
import locale | |
import getopt | |
# SQLCipher key override. While it is empty, get_key() derives the key from
# the attached device; main() stores the value of the -k option here.
WECHAT_DB_KEY = ''
def get_db_path():
    '''Return the on-device path of WeChat's EnMicroMsg.db, or '' if none.

    The path looks like
    /data/data/com.tencent.mm/MicroMsg/********************************/EnMicroMsg.db
    When several paths are listed, the last one is returned.
    '''
    os.popen('adb root').close()
    pipe = os.popen("adb shell su -c 'ls /data/data/com.tencent.mm/MicroMsg/*/EnMicroMsg.db' ")
    try:
        txt = pipe.read()
    finally:
        pipe.close()
    # Keep only lines that really name the database, so that an error
    # message printed by ls is never handed back as if it were a path.
    paths = [line.strip() for line in txt.splitlines()
             if line.strip().endswith('/EnMicroMsg.db')]
    return paths[-1] if paths else ''
def get_uin():
    '''Return the WeChat uin as a string, or 0 when it cannot be read.

    The value is taken from the "default_uin" entry of WeChat's
    system_config_prefs.xml on the attached device.
    '''
    os.popen('adb root').close()
    pipe = os.popen("adb shell su -c 'cat /data/data/com.tencent.mm/shared_prefs/system_config_prefs.xml' ")
    try:
        txt = pipe.read()
    finally:
        pipe.close()
    # NOTE(review): the uin is presumably stored as a signed integer, so a
    # leading minus sign is accepted and kept as part of the value.
    found = re.findall(r'name="default_uin" value="(-?[0-9]+)"', txt)
    return found[0] if found else 0
def get_imei():
    '''Return the device ID printed by `dumpsys iphonesubinfo`.

    The digits following "Device ID = " are returned as a string; 0 is
    returned when the output contains no such entry.
    '''
    dump = os.popen('adb shell dumpsys iphonesubinfo').read()
    match = re.search('Device ID = ([0-9]+)', dump)
    if match is None:
        return 0
    return match.group(1)
def get_key():
    '''Return the SQLCipher key for the WeChat database.

    An explicitly configured WECHAT_DB_KEY wins.  Otherwise the key is the
    first 7 hex digits of md5(imei + uin).  Returns '' when either value
    could not be read from the device.
    '''
    if WECHAT_DB_KEY:
        return WECHAT_DB_KEY
    uin = get_uin()
    imei = get_imei()
    if not (uin and imei):
        return ''
    # hashlib needs bytes; encoding explicitly keeps this working on
    # interpreters where str is text rather than bytes.
    digest = hashlib.md5((imei + uin).encode('utf-8')).hexdigest()
    return digest[0:7]
def messagecsv2chat(msgcsv):
    '''Convert the CSV export of the message table into chat rows.

    Expected CSV header:
    msgId,msgSvrId,type,status,isSend,isShowTimer,createTime,talker,content,imgPath,reserved,lvbuffer

    msgcsv is either the whole CSV as one string or an iterable of CSV
    lines.  The first row is treated as the header and skipped.

    Yields [talker, sendat, sender, content, imgPath] lists ordered by
    createTime (milliseconds since the epoch).  sender is 'me' for rows
    whose isSend is '1', otherwise the talker.  Rows with fewer than 12
    fields or a non-integer createTime are skipped.
    '''
    try:
        # Use the user's locale so that %a yields a localized weekday name.
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error:
        # Unsupported locale in the environment: keep the current one.
        pass
    if hasattr(msgcsv, 'splitlines'):
        msgcsv = [line + '\n' for line in msgcsv.splitlines()]
    reader = csv.reader(msgcsv)
    next(reader, None)  # skip the header; tolerate completely empty input
    rows = []
    for row in reader:
        if len(row) < 12:
            continue
        try:
            created = int(row[6])
        except ValueError:
            continue
        rows.append((created, row))
    # Re-sort by createTime because the ids stored in the database may be
    # out of order.  Compare numerically: comparing the raw strings would
    # misorder timestamps that differ in length.
    rows.sort(key=lambda item: item[0])
    for created, row in rows:
        isSend, talker, content, imgPath = row[4], row[7], row[8], row[9]
        sender = 'me' if (isSend == '1') else talker
        sendtime = time.localtime(created // 1000)
        sendat = time.strftime("%Y-%m-%d 周%a %H:%M:%S", sendtime)
        yield [talker, sendat, sender, content, imgPath]
def chat2talkers(chat):
    '''Return the distinct talkers found in chat.

    chat is an iterable of (talker, sendat, sender, content, imgPath) rows.
    The result is a list in order of first appearance, so it is
    deterministic and can be indexed or iterated repeatedly.
    '''
    seen = set()
    talkers = []
    for talker, _sendat, _sender, _content, _img_path in chat:
        if talker not in seen:
            seen.add(talker)
            talkers.append(talker)
    return talkers
def chat2txt(chat, name=''):
    '''Render chat rows as one text blob.

    chat is an iterable of (talker, sendat, sender, content, imgPath) rows.
    With an empty name every row is emitted, prefixed by its talker.
    Otherwise only rows whose talker equals name (case-insensitively) are
    emitted, without the prefix.  The result always ends with a newline.
    '''
    wanted = name.lower()
    out = []
    for talker, sendat, sender, content, imgPath in chat:
        talker_key = talker.lower()
        if wanted and talker_key != wanted:
            continue
        # In group chats, drop the extra newline that follows the leading
        # "name:" of content.  This is decided per row from the talker, so
        # it also applies when all talkers are exported at once.
        if '@chatroom' in talker_key:
            content = content.replace(':\n', ': ', 1)
        # Start every continuation line with a space.
        content = content.replace('\n', '\n ')
        attachment = ('\t' + imgPath) if imgPath else ''
        if wanted:
            out.append('%s %s: %s %s' % (sendat, sender, content, attachment))
        else:
            out.append('%s: %s %s: %s %s' % (talker, sendat, sender, content, attachment))
    return '\n'.join(out) + '\n'
def chat2txts(chat, names=None):
    '''Render chat rows as one text blob per requested talker.

    chat is an iterable of (talker, sendat, sender, content, imgPath) rows.
    names lists the talkers to export; matching is case-insensitive.

    Returns a dict mapping each lower-cased talker that has at least one
    message to its text, which ends with a newline.  Requested names
    without messages are absent from the result.
    '''
    # A set makes the per-message membership test constant time.
    wanted = set(name.lower() for name in (names or ()))
    grouped = {}
    for talker, sendat, sender, content, imgPath in chat:
        key = talker.lower()
        if key not in wanted:
            continue
        # In group chats, drop the extra newline that follows the leading
        # "name:" of content.
        if '@chatroom' in key:
            content = content.replace(':\n', ': ', 1)
        # Start every continuation line with a space.
        content = content.replace('\n', '\n ')
        attachment = ('\t' + imgPath) if imgPath else ''
        grouped.setdefault(key, []).append(
            '%s %s: %s %s' % (sendat, sender, content, attachment))
    return dict((key, '\n'.join(lines) + '\n') for key, lines in grouped.items())
def get_sqlc_tables(dbn, key=''):
    '''Return the table names of database file dbn.

    The `sqlcipher` command line shell is run on dbn.  When key is given
    it is applied with PRAGMA key followed by PRAGMA cipher_migrate before
    the tables are listed.
    '''
    import subprocess

    commands = []
    if key:
        # SQL string literals escape a single quote by doubling it.
        commands.append("PRAGMA key='%s';\n" % key.replace("'", "''"))
        commands.append('PRAGMA cipher_migrate;\n')
    commands.append('.tables\n')
    script = ''.join(commands)

    # Pipes carry bytes: encode/decode only where str is not already bytes.
    is_text = not isinstance(script, bytes)
    if is_text:
        script = script.encode('utf-8')
    proc = subprocess.Popen(['sqlcipher', dbn],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    output = proc.communicate(script)[0]
    if is_text:
        output = output.decode('utf-8', 'replace')

    names = output.split()
    # PRAGMA cipher_migrate is reported to put "cipher_migrate\r\n0\r\n" at
    # the start of the output, so those two leading tokens are dropped --
    # but only when the pragma was actually issued, otherwise two real
    # table names would be lost.
    return names[2:] if key else names
def sqlc2csv(dbn, key='', table='message'):
    '''Return the contents of one table of database file dbn as CSV text.

    The `sqlcipher` command line shell is run on dbn with headers on and
    CSV mode.  When key is given it is applied with PRAGMA key followed by
    PRAGMA cipher_migrate.  An empty table name falls back to 'message'.
    '''
    import subprocess

    table = table or 'message'
    commands = ['.header on\n', '.mode csv\n']
    if key:
        # SQL string literals escape a single quote by doubling it.
        commands.append("PRAGMA key='%s';\n" % key.replace("'", "''"))
        commands.append('PRAGMA cipher_migrate;\n')
    commands.append("select * from '%s';\n" % table.replace("'", "''"))
    script = ''.join(commands)

    # Pipes carry bytes: encode/decode only where str is not already bytes.
    is_text = not isinstance(script, bytes)
    if is_text:
        script = script.encode('utf-8')
    proc = subprocess.Popen(['sqlcipher', dbn],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    output = proc.communicate(script)[0]
    if is_text:
        output = output.decode('utf-8', 'replace')

    if key:
        # PRAGMA cipher_migrate puts "cipher_migrate\r\n0\r\n" at the start
        # of the output; drop those two lines.  Without a key the pragma is
        # not issued and the first two lines are the CSV header and the
        # first record, which must be kept.
        parts = output.split('\n', 2)
        output = parts[2] if len(parts) > 2 else ''
    return output
def wechat2csv(tables=None):
    '''Pull the WeChat database from the device and export tables to CSV.

    The database is copied to EnMicroMsg.db in the current directory, then
    every table in tables (all tables when none are given) is written to
    <table>.csv.  Tables whose export contains no newline are not written.
    '''
    ldbn = 'EnMicroMsg.db'
    # Wait for the device before the first command that needs it, and close
    # the handle so the wait has really finished.
    os.popen('adb wait-for-device').close()
    dbn = get_db_path()
    key = get_key()
    # Copy via the sdcard: stage the file there as root, pull it, clean up.
    os.popen('adb shell su -c "cp %s /storage/sdcard1/EnMicroMsg.db" ; adb pull /storage/sdcard1/EnMicroMsg.db %s && adb shell rm /storage/sdcard1/EnMicroMsg.db ' % (repr(dbn), repr(ldbn))).close()
    if not tables:
        tables = get_sqlc_tables(ldbn, key)
    for table in tables:
        csvtxt = sqlc2csv(ldbn, key, table)
        if csvtxt.find('\n') > 0:
            with open('%s.csv' % table, 'w') as out:
                out.write(csvtxt)
    return
def wechat2csvtxt(tables=None):
    '''Pull the WeChat database from the device and return tables as CSV.

    The database is copied to EnMicroMsg.db in the current directory, then
    every table in tables (all tables when none are given) is exported.

    Returns a dict mapping table name to CSV text.  Tables whose export
    contains no newline are left out.
    '''
    ldbn = 'EnMicroMsg.db'
    # Wait for the device before the first command that needs it, and close
    # the handle so the wait has really finished.
    os.popen('adb wait-for-device').close()
    dbn = get_db_path()
    key = get_key()
    # Copy via the sdcard: stage the file there as root, pull it, clean up.
    os.popen('adb shell su -c "cp %s /storage/sdcard1/EnMicroMsg.db" ; adb pull /storage/sdcard1/EnMicroMsg.db %s && adb shell rm /storage/sdcard1/EnMicroMsg.db ' % (repr(dbn), repr(ldbn))).close()
    if not tables:
        tables = get_sqlc_tables(ldbn, key)
    csvtxts = {}
    for table in tables:
        csvtxt = sqlc2csv(ldbn, key, table)
        if csvtxt.find('\n') > 0:
            csvtxts[table] = csvtxt
    return csvtxts
def wechat2txt(names=None):
    '''Export the chat log to message.<name>.txt files.

    The message table is also saved as message.csv.  names selects the
    talkers to export; when empty, every talker found in the log is
    exported.  A talker's file is only written when its text is longer
    than 4 characters.

    Returns 1 when the message table could not be exported or holds no
    messages, None otherwise.
    '''
    csvtxt = wechat2csvtxt(tables=['message']).get('message')
    if not csvtxt:
        # The message table is missing or empty: nothing to export.
        return 1
    with open('message.csv', 'w') as out:
        out.write(csvtxt)
    chat = list(messagecsv2chat(csvtxt))
    if not chat:
        return 1
    if not names:
        names = chat2talkers(chat)
    txts = chat2txts(chat, names)
    for name in names:
        # A requested name may have no messages at all.
        txt = txts.get(name.lower(), '')
        if len(txt) > 4:
            with open('message.%s.txt' % name, 'w') as out:
                out.write(txt)
# Help text printed by main() for -h and for invalid options.
USAGE = '''Usage: wechat2txt.py [OPTIONS] [NAME]...
OPTIONS:
-h display this help and exit
-t export csv of database tables
-k <KEY> set the wechat db sqlcipher key
'''
def main():
    '''Command line entry point; returns the process exit status.

    -h        print the usage text and exit successfully
    -t        export database tables to csv; the positional arguments name
              the tables, and all tables are exported when none are given
    -k <KEY>  use KEY as the sqlcipher key instead of deriving it

    Without -t the positional arguments are the talkers whose chat logs
    are exported to txt.  Returns 0 on success and 1 on a usage error or
    when no chat log could be exported.
    '''
    global WECHAT_DB_KEY
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'htk:')
    except getopt.error:
        print(USAGE)
        return 1
    export_tables = False
    for opt, arg in opts:
        if opt == '-h':
            print(USAGE)
            return 0
        elif opt == '-t':
            # Remember the mode itself, so that -t without table names
            # still exports tables instead of falling through to txt export.
            export_tables = True
        elif opt == '-k':
            WECHAT_DB_KEY = arg
    if export_tables:
        wechat2csv(args)
        return 0
    # wechat2txt() returns 1 on failure and None on success; pass that on
    # as the exit status.
    return wechat2txt(args) or 0
# Run the command line interface when executed as a script and hand the
# result of main() to the interpreter as the exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment