Skip to content

Instantly share code, notes, and snippets.

@Sg4Dylan
Last active May 6, 2020 23:20
Show Gist options
  • Save Sg4Dylan/b4b6b67d3d2cc50b3a0135c5822e5a96 to your computer and use it in GitHub Desktop.
Save Sg4Dylan/b4b6b67d3d2cc50b3a0135c5822e5a96 to your computer and use it in GitHub Desktop.
uTorrent 自动屏蔽迅雷
import re
import os
import json
import time
import base64
import requests
# Location of the ipfilter.dat file used by uTorrent.
ipfilter_path = r'D:\Program Files (x86)\uTorrent\ipfilter.dat'

# Address of the uTorrent WebUI.
remote_url = 'http://127.0.0.1:23333/gui/'

# WebUI user name.
username = b'admin'

# WebUI password.
password = b'password'

# A peer is blocked when its client name contains any of these keywords.
block_keywords_include = ['XL0012', 'Xunlei', 'Xfplay', 'QQDownload']

# A peer is blocked when its client name starts with any of these prefixes.
block_keywords_startswith = ['7.']

# HTTP Basic credentials ("user:password", base64-encoded); the resulting
# header dict is passed along with every WebUI request made by this script.
_basic_credentials = base64.b64encode(username + b":" + password).decode()
headers = {
    'Authorization': f'Basic {_basic_credentials}',
}

# One shared HTTP session is reused for all WebUI calls.
session = requests.Session()
def get_token():
    """Fetch the request token from the WebUI's ``token.html`` page.

    The page is requested through the shared ``session`` with the module's
    Basic-auth ``headers``; the token is the text content of the
    ``<div id='token'>`` element in the response.

    Returns:
        str: the token text.

    Raises:
        requests.HTTPError: if the WebUI answers with an HTTP error status
            (for example when the credentials are rejected).
        RuntimeError: if the response body contains no token element.
    """
    token_url = f'{remote_url}token.html'
    token_rsp = session.get(token_url, headers=headers)
    # Fail fast on an error response instead of handing the body of an
    # error page back to the caller as if it were a token.
    token_rsp.raise_for_status()
    # Extract the element content explicitly; stripping fixed prefix/suffix
    # strings silently returns the whole page when the markup differs.
    found = re.search(
        r'''<div[^>]*\bid=['"]token['"][^>]*>([^<]*)</div>''',
        token_rsp.text,
    )
    if found is None:
        raise RuntimeError(f'no token element found in response from {token_url}')
    return found.group(1).strip()
def get_download_torrents(token):
    """Return the first field of every entry in the WebUI torrent list.

    Sends a ``list=1`` request with the given ``token`` and reads the
    ``torrents`` array of the JSON reply. The first element of each entry
    is what the rest of this script passes on as the torrent ``hash``.

    Args:
        token: WebUI token to send with the request.

    Returns:
        list: one item per torrent entry; empty when the reply has no
        ``torrents`` key.
    """
    query = {'token': token, 'list': 1}
    response = session.get(remote_url, headers=headers, params=query)
    payload = json.loads(response.content)
    hashes = []
    for entry in payload.get('torrents', ''):
        hashes.append(entry[0])
    return hashes
def get_black_peer_list(torrent_hash_list):
    """Collect the peers whose client name matches a block rule.

    For each hash in ``torrent_hash_list`` the WebUI ``getpeers`` action is
    queried. Every peer record's field 5 (the client name) is checked
    against ``block_keywords_include`` (substring match) and
    ``block_keywords_startswith`` (prefix match).

    Note: the request token is read from the module-level ``token``
    variable, not from an argument.

    Args:
        torrent_hash_list: iterable of torrent hashes to inspect.

    Returns:
        dict: maps field 1 of each matching peer record (presumably the
        peer's IP address; it is what gets written to ipfilter.dat) to its
        client name.
    """
    blocked_prefixes = tuple(block_keywords_startswith)

    def is_blocked(client_name):
        # Substring rules first, then prefix rules.
        if any(keyword in client_name for keyword in block_keywords_include):
            return True
        return client_name.startswith(blocked_prefixes)

    blocked = {}
    for torrent_hash in torrent_hash_list:
        query = {
            'token': token,
            'action': "getpeers",
            'hash': torrent_hash,
            't': int(time.time()),
        }
        response = session.get(remote_url, params=query, headers=headers)
        # The peer records are read from index 1 of the 'peers' array.
        peers = json.loads(response.content)['peers'][1]
        blocked.update(
            {peer[1]: peer[5] for peer in peers if is_blocked(peer[5])}
        )
    return blocked
def reset_ipfilter():
    """Empty the ipfilter file at ``ipfilter_path`` (creating it if absent)."""
    # Opening in 'w' mode truncates the file; nothing needs to be written.
    with open(ipfilter_path, mode='w'):
        pass
    print('ipfilter 重置完成')
def update_ipfilter(peer_list):
    """Add newly detected peer addresses to the ipfilter file.

    The file at ``ipfilter_path`` is read as UTF-8, one entry per line.
    Every key of ``peer_list`` that is not already present is appended and
    the file is rewritten. Existing entries keep their order, new ones are
    added at the end, and blank lines and duplicates are dropped.

    Args:
        peer_list: mapping of peer address -> client name, as produced by
            ``get_black_peer_list``.

    Returns:
        bool: True if the file was rewritten with at least one new entry,
        False if nothing changed (the file is then left untouched).
    """
    # Context manager so the read handle is closed before the rewrite.
    with open(ipfilter_path, 'rb') as rp:
        raw_lines = rp.read().decode('UTF-8').splitlines()

    # dict.fromkeys de-duplicates while preserving first-seen order and gives
    # O(1) membership tests. Stripping each line and skipping empties keeps
    # blank lines (e.g. from a freshly reset file) and stray '\r' characters
    # out of the entry set.
    stripped = (line.strip() for line in raw_lines)
    entries = dict.fromkeys(line for line in stripped if line)

    updated = False
    for k, v in peer_list.items():
        if k not in entries:
            print(f'新目标发现:{k},使用的客户端:{v}')
            entries[k] = None
            updated = True
    if not updated:
        return False

    with open(ipfilter_path, 'wb') as wp:
        wp.write('\n'.join(entries).encode('UTF-8'))
    print('ipfilter 更新完毕')
    return True
def reload_ipfilter():
    """Ask the WebUI to set ``ipfilter.enable`` to 1 and report the outcome.

    Sends a ``setsetting`` request using the module-level ``token`` and
    prints a success message when the HTTP status is 200, otherwise a
    failure message. Nothing is returned or raised on a non-200 status.
    """
    query = {
        'token': token,
        'action': "setsetting",
        's': 'ipfilter.enable',
        'v': 1,
    }
    response = session.get(remote_url, params=query, headers=headers)
    succeeded = response.status_code == 200
    print('ipfilter 重载成功' if succeeded else 'ipfilter 重载失败')
# Start from an empty filter file, then obtain the WebUI token once.
# `token` must stay a module-level name: get_black_peer_list() and
# reload_ipfilter() read it as a global.
reset_ipfilter()
token = get_token()
print(f'获取 Token:{token}')

# Poll forever: list torrents, find peers matching the block rules, and
# ask uTorrent to reload the filter only when the file actually changed.
while True:
    torrent_hashes = get_download_torrents(token)
    peer_list = get_black_peer_list(torrent_hashes)
    filter_changed = update_ipfilter(peer_list)
    if filter_changed:
        reload_ipfilter()
    # Wait 30 seconds between polls.
    time.sleep(30)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment