-
-
Save MitsuhaMiyamizu/f1e408ef12ab5d2e4f2710616e242bba to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- encoding: utf-8 -*- | |
# Author: Epix | |
# | |
# Ban Xunlei IP for deluge | |
# This script will check deluge connected peers per 30 seconds. | |
# When an IP with UA in `BAN_PATTERNS` is found, the IP will be added to `BLOCK_LIST_FILE`. | |
# However if the IP is uploading or reporting 100% progress, it will say "WTF?" | |
import logging | |
import re | |
from random import randint | |
from time import sleep | |
import coloredlogs | |
from requests import session | |
logger = logging.getLogger(__name__) | |
coloredlogs.install(level='DEBUG', logger=logger) | |
# blocklist file, serve it in HTTP way and add its URL to deluge preference | |
BLOCK_LIST_FILE = '/mnt/tmp/scripts/blocklist.txt' | |
DELUGE_API_URL = 'http://nas.exz.me:8112/json' | |
DELUGE_PASSWORD = '' | |
# UA patterns to ban | |
BAN_PATTERNS = [ | |
re.compile(r'Xunlei .*'), | |
re.compile(r'7\.10\.\d+\.\d+'), | |
re.compile(r'7\.9\.\d+\.\d+'), | |
re.compile(r'-XL0012-.*'), | |
] | |
s = session() | |
def call(method: str, params: list): | |
_id = randint(1, 10000) | |
r = s.post(DELUGE_API_URL, json={'id': _id, 'method': method, 'params': params}) | |
j = r.json() | |
assert j['id'] == _id | |
assert j['error'] is None | |
return j['result'] | |
def login(): | |
login_r = call('auth.login', ['']) | |
assert login_r | |
connected = call('web.connected', []) | |
if not connected: | |
hosts = call('web.get_hosts', []) | |
assert len(hosts) > 0, 'No host detected' | |
host_id = hosts[0][0] | |
host_status = call('web.get_host_status', [host_id]) | |
assert host_status[3] == 'Online', 'Host not online' | |
call('web.connect', [host_id]) | |
def check(): | |
ui = call('web.update_ui', [['name'], {'state': 'Active'}]) | |
torrents = ui['torrents'] | |
if torrents is None: | |
logger.error('No torrents found!') | |
return | |
bad_ips = set() | |
for torrent in torrents.keys(): | |
peers = call('web.get_torrent_status', [torrent, ['peers']])['peers'] | |
for peer in peers: | |
for pattern in BAN_PATTERNS: | |
client = peer['client'] | |
client = client.encode('ascii', 'ignore').decode('ascii').replace('\n', '').replace('\r', '') | |
if pattern.match(client): | |
down_speed = peer['down_speed'] | |
progress = peer['progress'] | |
if down_speed > 0 or progress == 100: | |
logger.error('WTF client {client} is uploading at {speed} Bytes/s'.format(client=client, | |
speed=down_speed)) | |
else: | |
bad_ip = peer['ip'].split(':')[0] | |
bad_ips.add(bad_ip) | |
logger.warning( | |
'Found bad client {client} from IP {bad_ip}'.format(client=client, bad_ip=bad_ip)) | |
break | |
if bad_ips: | |
logger.warning('Found {count} bad ip'.format(count=len(bad_ips))) | |
with open(BLOCK_LIST_FILE, 'a') as black_list: | |
for bad_ip in bad_ips: | |
black_list.write('{ip}-{ip}\n'.format(ip=bad_ip)) | |
black_list.flush() | |
call('blocklist.check_import', []) | |
else: | |
pass | |
# logger.info('Everyone is good') | |
def main(): | |
while True: | |
try: | |
login() | |
break | |
except Exception as e: | |
logger.error(e) | |
sleep(30) | |
while True: | |
check() | |
sleep(30) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment