Skip to content

Instantly share code, notes, and snippets.

@TransparentLC
Last active August 20, 2024 06:52
Show Gist options
  • Save TransparentLC/b6b07a8db34c4a6006179a6960e15e69 to your computer and use it in GitHub Desktop.
Save TransparentLC/b6b07a8db34c4a6006179a6960e15e69 to your computer and use it in GitHub Desktop.
import asyncio
import httpx
import ipaddress
import orjson
import re
import sys
import urllib.parse
from datetime import datetime, timezone, timedelta
async def hookRaiseForStatus(r: httpx.Response):
r.raise_for_status()
s = httpx.AsyncClient(
http2=True,
timeout=30,
follow_redirects=True,
event_hooks={
'response': [
hookRaiseForStatus,
],
},
)
def cleanText(content: str):
content = orjson.loads(content)
content = content['commands'][0]['mutations'][0]['s']
content = urllib.parse.unquote(content)
content = re.sub(r'%u([\dA-F]{1,4})', lambda m: chr(int(m[1], 16)), content)
content = re.sub(r'\\x([\dA-F]{1,2})', lambda m: chr(int(m[1], 16)), content)
content = content.replace('\r', '\n')
content = content.replace('\x07', '')
content = content.replace('\x06', '')
return content
async def getGeneralList():
result = '#\n# Source: https://docs.qq.com/doc/DQnJBTGJjSFZBR2JW\n'
r = await s.get(
'https://docs.qq.com/dop-api/opendoc',
params={'id': 'DQnJBTGJjSFZBR2JW'},
headers={'Referer': 'https://docs.qq.com/'},
)
content = cleanText(urllib.parse.unquote(max(r.text.splitlines(), key=len)))
if m := re.search(r'规则更新于(\d+)年(\d+)月(\d+)日(\d+)时', content):
d = datetime(int(m.group(1)), int(m.group(2)), int(m.group(3)), int(m.group(4)), tzinfo=timezone(timedelta(hours=8))).astimezone(timezone.utc)
result += f'# Update: {d.isoformat().replace("+00:00", "Z")}\n#\n'
if m := re.search(r'\x1e\x1c([\da-f:\.\-\n]+?)\x1d', content):
result += m.group(1)
return result
async def getTransactionList():
result = '#\n# Source: https://docs.qq.com/doc/DQmpBZmNMRFpiZUhZ\n'
r = await s.get(
'https://docs.qq.com/dop-api/opendoc',
params={'id': 'DQmpBZmNMRFpiZUhZ'},
headers={'Referer': 'https://docs.qq.com/'},
)
content = cleanText(urllib.parse.unquote(max(r.text.splitlines(), key=len)))
if m := re.search(r'规则更新于(\d+)年(\d+)月(\d+)日(\d+)时', content):
d = datetime(int(m.group(1)), int(m.group(2)), int(m.group(3)), int(m.group(4)), tzinfo=timezone(timedelta(hours=8))).astimezone(timezone.utc)
result += f'# Update: {d.isoformat().replace("+00:00", "Z")}\n#\n'
if m := re.search(r'\x1e\x1c([\da-f:\.\-\n]+?)\x1d', content):
result += m.group(1)
return result
async def getBTNList():
result = '#\n# Source: https://raw.githubusercontent.com/PBH-BTN/BTN-Collected-Rules/main/combine/all.txt\n'
r = await s.get('https://api.github.com/repos/PBH-BTN/BTN-Collected-Rules/commits', params={'path': 'combine/all.txt'})
r = await s.get(r.json()[0]['url'])
result += f'# Update: {r.json()["commit"]["author"]["date"]}\n# Note: IPv6 single addresses are transformed into /64 CIDRs.\n#\n'
r = await s.get('https://raw.githubusercontent.com/PBH-BTN/BTN-Collected-Rules/main/combine/all.txt')
t = set()
for line in r.text.splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
try:
address = ipaddress.ip_address(line)
if isinstance(address, ipaddress.IPv6Address):
network = ipaddress.IPv6Network(address).supernet(new_prefix=64)
rule = f'{network[0]}-{network[-1]}\n'
else:
rule = f'{address}-{address}\n'
except ValueError:
try:
network = ipaddress.ip_network(line)
rule = f'{network[0]}-{network[-1]}\n'
except ValueError:
continue
if rule in t:
continue
t.add(rule)
result += rule
return result
async def main():
blacklist = ''.join(await asyncio.gather(
getGeneralList(),
getTransactionList(),
getBTNList(),
))
blacklistCIDR = ''
for line in blacklist.splitlines():
line = line.strip()
if not line or line.startswith('#'):
blacklistCIDR += line + '\n'
continue
if m := re.match(r'([\da-f:\.]+)-([\da-f:\.]+)', line):
ipRangeStart = ipaddress.ip_address(m.group(1))
ipRangeEnd = ipaddress.ip_address(m.group(2))
else:
continue
if ipRangeStart == ipRangeEnd:
blacklistCIDR += str(ipRangeStart) + '\n'
else:
for cidr in ipaddress.summarize_address_range(ipRangeStart, ipRangeEnd):
blacklistCIDR += str(cidr) + '\n'
with open(sys.argv[1], 'w', encoding='utf-8') as f:
f.write('# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n')
f.write(f'# 爬取时间:{datetime.now().replace(microsecond=0).astimezone(timezone.utc).isoformat().replace("+00:00", "Z")}\n')
f.write('# 爬取用源代码:https://gist.github.com/TransparentLC/b6b07a8db34c4a6006179a6960e15e69\n')
f.write('# CIDR版:https://i.akarin.dev/bittorrent-ip-blacklist-cidr.txt\n')
f.write('#\n')
f.write('# qBittorrent用户可以使用以下命令自动下载和加载IP黑名单(需要开启Web UI):\n')
f.write('# curl --http3 --compressed --output /path/to/your/ip-filter.dat https://i.akarin.dev/bittorrent-ip-blacklist.txt\n')
f.write('# curl --http3 --compressed --data "json={\\"ip_filter_enabled\\":false}" http://your.web.ui/api/v2/app/setPreferences\n')
f.write('# curl --http3 --compressed --data "json={\\"ip_filter_enabled\\":true}" http://your.web.ui/api/v2/app/setPreferences\n')
f.write('#\n')
f.write('# 请在这里反馈你遇到的无限下载IP地址(由Bilibili@Damn☆You整理和维护):\n')
f.write('# https://t.bilibili.com/886509991839662137\n')
f.write('# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n')
f.write(blacklist)
with open(sys.argv[2], 'w', encoding='utf-8') as f:
f.write('# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n')
f.write(f'# 爬取时间:{datetime.now().replace(microsecond=0).astimezone(timezone.utc).isoformat().replace("+00:00", "Z")}\n')
f.write('# 爬取用源代码:https://gist.github.com/TransparentLC/b6b07a8db34c4a6006179a6960e15e69\n')
f.write('# IP范围版:https://i.akarin.dev/bittorrent-ip-blacklist.txt\n')
f.write('#\n')
f.write('# 本规则可以配合qBittorrent-ClientBlocker的ipBlockListURL选项使用。\n')
f.write('#\n')
f.write('# 请在这里反馈你遇到的无限下载IP地址(由Bilibili@Damn☆You整理和维护):\n')
f.write('# https://t.bilibili.com/886509991839662137\n')
f.write('# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n')
f.write(blacklistCIDR)
if __name__ == '__main__':
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment