Last active
May 6, 2020 23:20
-
-
Save Sg4Dylan/b4b6b67d3d2cc50b3a0135c5822e5a96 to your computer and use it in GitHub Desktop.
uTorrent 自动屏蔽迅雷
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import os | |
import json | |
import time | |
import base64 | |
import requests | |
# Path of the ipfilter.dat file this script rewrites.
ipfilter_path = r'D:\Program Files (x86)\uTorrent\ipfilter.dat'
# Base address of the uTorrent WebUI.
remote_url = 'http://127.0.0.1:23333/gui/'
# WebUI user name (bytes, because it is base64-encoded below).
username = b'admin'
# WebUI password (bytes, for the same reason).
password = b'password'
# A peer is blocked when its client name contains any of these keywords.
block_keywords_include = ['XL0012', 'Xunlei', 'Xfplay', 'QQDownload']
# A peer is blocked when its client name starts with any of these prefixes.
block_keywords_startswith = ['7.']

# HTTP Basic credentials: base64 of "username:password".
_basic_auth = base64.b64encode(b':'.join((username, password))).decode()
headers = {'Authorization': f'Basic {_basic_auth}'}

# One shared session is reused for every WebUI request made by this script.
session = requests.Session()
def get_token():
    """Fetch the request token published by the WebUI at ``token.html``.

    The page is expected to look like
    ``<html><div id='token' style='display:none;'>TOKEN</div></html>``.

    Returns:
        str: the text inside the ``token`` div, with surrounding
        whitespace removed.

    Raises:
        requests.HTTPError: if the WebUI answers with an error status
            (for example 401 when the credentials are wrong).
        ValueError: if the response body contains no ``token`` div.
    """
    token_url = f'{remote_url}token.html'
    response = session.get(token_url, headers=headers)
    # Fail loudly instead of treating an error page as if it were the token.
    response.raise_for_status()
    found = re.search(
        r"<div[^>]*\bid=['\"]token['\"][^>]*>(.*?)</div>",
        response.text,
        re.DOTALL,
    )
    if found is None:
        raise ValueError(f'no token element found in response from {token_url}')
    return found.group(1).strip()
def get_download_torrents(token):
    """List the torrents known to the WebUI.

    Args:
        token: WebUI request token, sent as the ``token`` query parameter.

    Returns:
        list: the first field of every entry under ``torrents`` in the
        JSON reply (the caller passes these on as torrent hashes); empty
        when the reply has no ``torrents`` key.
    """
    query = {'token': token, 'list': 1}
    response = session.get(remote_url, headers=headers, params=query)
    payload = json.loads(response.content)
    hashes = []
    # A missing key falls back to '', which simply yields nothing to iterate.
    for entry in payload.get('torrents', ''):
        hashes.append(entry[0])
    return hashes
def get_black_peer_list(torrent_hash_list):
    """Collect the peers whose client name matches a block rule.

    For every hash a ``getpeers`` request is sent to the WebUI (using the
    module-level ``token``), and each returned peer row is checked against
    ``block_keywords_include`` and ``block_keywords_startswith``.

    Args:
        torrent_hash_list: iterable of torrent hashes to inspect.

    Returns:
        dict: maps field 1 of each matching peer row (used by the caller
        as the address to block) to field 5 (the client name).
    """
    def check_client(client):
        """Return True when ``client`` matches any configured block rule."""
        if any(keyword in client for keyword in block_keywords_include):
            return True
        # str.startswith accepts a tuple and tests every prefix in one call.
        return client.startswith(tuple(block_keywords_startswith))

    peer_list = {}
    for torrent_hash in torrent_hash_list:
        params = {
            'token': token,
            'action': "getpeers",
            'hash': torrent_hash,
            't': int(time.time()),
        }
        info_raw = session.get(remote_url, params=params, headers=headers)
        info_json = json.loads(info_raw.content)
        peers = info_json.get('peers') or []
        # The peer rows are read from index 1 of 'peers'. A reply without
        # that entry is skipped instead of aborting the whole scan.
        if len(peers) < 2:
            continue
        for peer in peers[1]:
            if check_client(peer[5]):
                peer_list[peer[1]] = peer[5]
    return peer_list
def reset_ipfilter():
    """Empty the file at ``ipfilter_path`` and print a confirmation."""
    # Opening in 'w' mode already truncates (or creates) the file, so
    # there is nothing further to write.
    with open(ipfilter_path, 'w'):
        pass
    print('ipfilter 重置完成')
def update_ipfilter(peer_list):
    """Append newly seen addresses to the ipfilter file.

    Args:
        peer_list: dict mapping an address to the client name that caused
            it to be flagged (the name is only used in the log line).

    Returns:
        bool: True when at least one new address was written, False when
        every address was already present (the file is then left untouched).
    """
    try:
        # The context manager closes the handle as soon as the file is read.
        with open(ipfilter_path, 'rb') as rp:
            raw = rp.read().decode('UTF-8')
    except FileNotFoundError:
        # A missing filter file is treated as an empty one.
        raw = ''

    # splitlines() copes with both LF and CRLF endings. dict.fromkeys
    # removes duplicates while keeping the existing order and gives
    # constant-time membership tests.
    stripped = (line.strip() for line in raw.splitlines())
    entries = dict.fromkeys(line for line in stripped if line)

    updated = False
    for ip, client in peer_list.items():
        if ip not in entries:
            print(f'新目标发现:{ip},使用的客户端:{client}')
            entries[ip] = None
            updated = True
    if not updated:
        return False

    with open(ipfilter_path, 'wb') as wp:
        wp.write('\n'.join(entries).encode('UTF-8'))
    print('ipfilter 更新完毕')
    return True
def reload_ipfilter():
    """Send ``setsetting ipfilter.enable=1`` to the WebUI and report the result.

    Uses the module-level ``token``. Success is judged only by an HTTP 200
    status. Presumably this setting makes uTorrent re-read ipfilter.dat
    (verify against the uTorrent documentation).
    """
    query = {
        'token': token,
        'action': "setsetting",
        's': 'ipfilter.enable',
        'v': 1,
    }
    response = session.get(remote_url, params=query, headers=headers)
    succeeded = response.status_code == 200
    print('ipfilter 重载成功' if succeeded else 'ipfilter 重载失败')
# Start from an empty filter file, then obtain the WebUI token once.
# `token` must stay a module-level name: get_black_peer_list() and
# reload_ipfilter() read it as a global.
reset_ipfilter()
token = get_token()
print(f'获取 Token:{token}')

# Poll forever: scan all torrents, extend the filter file, and ask
# uTorrent to reload it only when something new was written.
while True:
    torrent_hashes = get_download_torrents(token)
    peer_list = get_black_peer_list(torrent_hashes)
    filter_changed = update_ipfilter(peer_list)
    if filter_changed:
        reload_ipfilter()
    time.sleep(30)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment