Skip to content

Instantly share code, notes, and snippets.

@exzhawk
Last active November 11, 2023 10:01
Show Gist options
  • Save exzhawk/5c30b532f01fd7058f32ac976503f0c1 to your computer and use it in GitHub Desktop.
Save exzhawk/5c30b532f01fd7058f32ac976503f0c1 to your computer and use it in GitHub Desktop.
# -*- encoding: utf-8 -*-
# Author: Epix
#
# Ban Xunlei IP for deluge
# This script checks the peers connected to deluge every 30 seconds.
# When a peer whose client string (UA) matches one of `BAN_PATTERNS` is found, its IP is appended to `BLOCK_LIST_FILE`.
# However, if that peer is uploading or reports 100% progress, it is not banned; a "WTF" error is logged instead.
import logging
import re
from random import randint
from time import sleep
import coloredlogs
from requests import session
# Module-level logger; coloredlogs is configured for it at DEBUG level.
logger = logging.getLogger(__name__)
coloredlogs.install(level='DEBUG', logger=logger)
# Path of the blocklist file that banned IP ranges are appended to.
# Serve this file over HTTP and add its URL to the deluge blocklist preference.
BLOCK_LIST_FILE = '/mnt/tmp/scripts/blocklist.txt'
# URL that every JSON-RPC request is POSTed to.
DELUGE_API_URL = 'http://nas.exz.me:8112/json'
# Password setting for the Deluge Web UI (empty by default).
DELUGE_PASSWORD = ''
# Client-string (UA) patterns to ban.
# They are applied with `Pattern.match`, so each one is anchored at the START
# of the client string (but not at its end).
BAN_PATTERNS = [
    re.compile(r'Xunlei .*'),
    re.compile(r'7\.10\.\d+\.\d+'),
    re.compile(r'7\.9\.\d+\.\d+'),
    re.compile(r'-XL0012-.*'),
]
# One shared requests session used for every RPC call.
# NOTE(review): presumably this is what carries the login cookie from
# `auth.login` over to the later calls - confirm against the Deluge Web API.
s = session()
def call(method: str, params: list):
    """Send one JSON-RPC request to ``DELUGE_API_URL`` and return its result.

    Args:
        method: Name of the remote method, e.g. ``'web.update_ui'``.
        params: Positional parameters passed to the remote method.

    Returns:
        The value of the ``result`` field of the server's reply.

    Raises:
        RuntimeError: If the reply carries a different ``id`` than the request
            or a non-null ``error`` field.

    Any exception raised by the HTTP request itself or by decoding the reply
    as JSON propagates unchanged.
    """
    _id = randint(1, 10000)
    r = s.post(DELUGE_API_URL, json={'id': _id, 'method': method, 'params': params})
    j = r.json()
    # Explicit checks instead of `assert`: asserts are removed under
    # `python -O`, and a bare AssertionError has an empty message, which makes
    # the failure invisible when the caller only logs `str(exception)`.
    if j.get('id') != _id:
        raise RuntimeError(
            'Reply id {got!r} does not match request id {want!r} for {method}'.format(
                got=j.get('id'), want=_id, method=method))
    error = j.get('error')
    if error is not None:
        raise RuntimeError('{method} failed: {error!r}'.format(method=method, error=error))
    return j['result']
def login():
    """Log in to the web API and make sure it is connected to a host.

    Authenticates with ``DELUGE_PASSWORD``. If ``web.connected`` reports that
    no host is connected yet, the first host returned by ``web.get_hosts`` is
    connected, provided its status is ``'Online'``.

    Raises:
        RuntimeError: If the login is rejected, no host is listed, or the
            first host is not reported as ``'Online'``.
    """
    # Use the configured password; the previous hard-coded '' made the
    # DELUGE_PASSWORD setting ineffective.
    if not call('auth.login', [DELUGE_PASSWORD]):
        raise RuntimeError('Login rejected, check DELUGE_PASSWORD')
    if call('web.connected', []):
        return
    hosts = call('web.get_hosts', [])
    if not hosts:
        raise RuntimeError('No host detected')
    # The first field of a host entry is used as its id.
    host_id = hosts[0][0]
    host_status = call('web.get_host_status', [host_id])
    # The second field of the status reply is compared against 'Online'.
    if host_status[1] != 'Online':
        raise RuntimeError('Host not online')
    call('web.connect', [host_id])
def _peer_host(endpoint):
    """Return the host part of a ``host:port`` peer endpoint string."""
    if endpoint.startswith('['):
        # Bracketed IPv6 literal, e.g. "[2001:db8::1]:6881".
        return endpoint[1:].split(']', 1)[0]
    # Split on the LAST colon so the colons inside an IPv6 address survive.
    host, sep, _port = endpoint.rpartition(':')
    return host if sep else endpoint


def check():
    """Scan the peers of all active torrents and block the banned clients.

    For every torrent returned by ``web.update_ui`` (filtered on state
    ``Active``) the peer list is fetched. A peer whose client string matches
    one of ``BAN_PATTERNS`` is either

    * reported with an error log entry and left alone, when it has a positive
      ``down_speed`` or a ``progress`` of 100, or
    * recorded as a bad IP otherwise.

    All bad IPs found in this pass are appended to ``BLOCK_LIST_FILE`` as
    ``ip-ip`` lines, after which ``blocklist.check_import`` is called.

    Returns:
        None.
    """
    ui = call('web.update_ui', [['name'], {'state': 'Active'}])
    torrents = ui['torrents']
    if torrents is None:
        logger.error('No torrents found!')
        return

    bad_ips = set()
    for torrent in torrents:
        status = call('web.get_torrent_status', [torrent, ['peers']])
        # Tolerate a reply without a peer list instead of raising KeyError.
        for peer in (status or {}).get('peers') or ():
            # Sanitise once per peer (not once per pattern): drop non-ASCII
            # characters and line breaks before matching and logging.
            client = peer['client'].encode('ascii', 'ignore').decode('ascii')
            client = client.replace('\n', '').replace('\r', '')
            if not any(pattern.match(client) for pattern in BAN_PATTERNS):
                continue

            down_speed = peer['down_speed']
            # NOTE(review): if the API reports progress as a 0..1 fraction
            # rather than a percentage, `== 100` never fires - confirm.
            if down_speed > 0 or peer['progress'] == 100:
                logger.error('WTF client {client} is uploading at {speed} Bytes/s'.format(client=client,
                                                                                          speed=down_speed))
                continue

            # NOTE(review): IPv6 hosts are now kept intact rather than cut at
            # the first colon; confirm the blocklist consumer accepts them.
            bad_ip = _peer_host(peer['ip'])
            bad_ips.add(bad_ip)
            logger.warning(
                'Found bad client {client} from IP {bad_ip}'.format(client=client, bad_ip=bad_ip))

    if not bad_ips:
        return

    logger.warning('Found {count} bad ip'.format(count=len(bad_ips)))
    # Leaving the `with` block closes (and therefore flushes) the file before
    # the re-import is triggered.
    with open(BLOCK_LIST_FILE, 'a') as black_list:
        black_list.writelines('{ip}-{ip}\n'.format(ip=bad_ip) for bad_ip in bad_ips)
    call('blocklist.check_import', [])
def main():
    """Run the ban loop forever: log in, then check peers every 30 seconds.

    A failure in ``login()`` or ``check()`` is logged and does not stop the
    loop. After any failure the next cycle logs in again before checking,
    since the failure may have been caused by a lost login.
    ``KeyboardInterrupt`` and ``SystemExit`` are not caught, so the script can
    still be stopped normally.
    """
    logged_in = False
    while True:
        try:
            if not logged_in:
                login()
                logged_in = True
            check()
        except Exception:
            # Top-level boundary of a long-running daemon: record the full
            # traceback and retry instead of dying on a transient error.
            logger.exception('Cycle failed, will log in again and retry')
            logged_in = False
        sleep(30)
# Start the ban loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment