Skip to content

Instantly share code, notes, and snippets.

@qiuyuzhou
Last active December 30, 2018 15:22
Show Gist options
  • Save qiuyuzhou/7843678b8f6178418f5f3c12b38fedef to your computer and use it in GitHub Desktop.
Save qiuyuzhou/7843678b8f6178418f5f3c12b38fedef to your computer and use it in GitHub Desktop.
Download the latest gfwlist.txt, then convert it to a Potatso rules config file. Python 3.
# -*- coding: utf-8 -*-
__author__ = 'Charlie Qiu <[email protected]>'
import logging
import urllib.parse
import base64
import os
import requests
import yaml
from django.conf import settings
from django.core.management.base import BaseCommand
# URL of the upstream gfwlist.txt fetched by Command.download_gfwlist().
# The payload may be base64-encoded; decode_gfwlist() handles both forms.
gfwlist_url = "https://raw.githubusercontent.com/gfwlist/gfwlist/master/gfwlist.txt"
def get_data_from_file(file_path):
    """Return the whole content of the text file at *file_path*.

    The file is always decoded as UTF-8 so that the result does not depend
    on the platform's default locale encoding (which would break on
    non-ASCII entries such as internationalised TLDs).

    :param file_path: path of the file to read.
    :return: the file content as a single string.
    :raises OSError: if the file cannot be opened or read.
    :raises UnicodeDecodeError: if the file is not valid UTF-8.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()
def decode_gfwlist(content):
    """Return *content* as plain-text rules, base64-decoding it when needed.

    The base64 alphabet never contains ``'.'``, so any content that has a
    dot is assumed to be plain text already and is returned untouched.
    Content that fails to decode (bad padding, or bytes that are not valid
    UTF-8) is also returned unchanged, as a best-effort fallback.

    :param content: the downloaded list, base64-encoded or plain text.
    :return: the decoded rule text, or *content* itself when it is not
        decodable base64.
    """
    # Non-string input and text containing a dot cannot be base64 text.
    if not isinstance(content, str) or '.' in content:
        return content
    try:
        return base64.b64decode(content.encode('utf-8')).decode('utf-8')
    except ValueError:
        # binascii.Error and UnicodeDecodeError/UnicodeEncodeError are all
        # ValueError subclasses; anything else (e.g. KeyboardInterrupt)
        # must propagate instead of being swallowed.
        return content
def combine_lists(content, user_rule=None):
    """Split the rule text into lines, appending any user-supplied rules.

    :param content: rule text, one rule per line.
    :param user_rule: optional extra rule text, one rule per line; ignored
        when ``None`` or empty.
    :return: a list of rule lines without line terminators, with the lines
        of *content* first and the lines of *user_rule* after them.
    """
    rule_lines = content.splitlines(False)
    if not user_rule:
        return rule_lines
    rule_lines += user_rule.splitlines(False)
    return rule_lines
def get_hostname(something):
    """Extract the host name from a URL or a scheme-less URL fragment.

    Input that does not already start with an ``http:`` or ``https:``
    scheme gets ``http://`` prepended so that ``urlparse`` treats the
    leading part as the network location.

    :param something: a URL such as ``https://example.com/x`` or a bare
        fragment such as ``example.com/x``.
    :return: the lower-cased host name, or ``None`` when no host can be
        determined or the input cannot be parsed.
    """
    try:
        # Recognise https as well as http: prefixing an https URL with
        # 'http://' would make urlparse report the hostname as 'https'.
        if not something.startswith(('http:', 'https:')):
            something = 'http://' + something
        return urllib.parse.urlparse(something).hostname
    except (ValueError, AttributeError, TypeError) as e:
        # ValueError: malformed URL (e.g. unbalanced IPv6 brackets);
        # AttributeError/TypeError: non-string input. Stay best-effort.
        logging.error(e)
        return None
def add_domain_to_set(s, something):
    """Add the host name found in *something* to the set *s*.

    :param s: the set to update in place.
    :param something: URL or URL fragment handed to ``get_hostname``.
    :return: ``None``; *s* is left untouched when no host name is found.
    """
    host = get_hostname(something)
    if host is None:
        return
    s.add(host)
def parse_gfwlist(gfwlist):
    """Collect host names from gfwlist rule lines.

    :param gfwlist: iterable of rule lines (strings).
    :return: a ``(domains, domains_suffix)`` tuple of sets; hosts from
        rules that start with ``||`` go into ``domains_suffix``, all other
        hosts go into ``domains``.
    """
    exact_hosts = set()
    suffix_hosts = set()
    for rule in gfwlist:
        # Rules containing '.*' are skipped outright.
        if '.*' in rule:
            continue
        # Any remaining wildcard is turned into a path separator.
        rule = rule.replace('*', '/')

        is_suffix_rule = rule.startswith('||')
        # Drop every leading '|' (for '|' and '||' rules) or every leading
        # '.', depending on which character the rule starts with.
        if rule[:1] in ('|', '.'):
            rule = rule.lstrip(rule[0])

        # '!' lines, '[' lines and '@' (white list) lines are ignored.
        if rule.startswith(('!', '[', '@')):
            continue

        target = suffix_hosts if is_suffix_rule else exact_hosts
        add_domain_to_set(target, rule)
    return exact_hosts, suffix_hosts
def reduce_domains(domains):
    """Shorten each host name to its root domain and drop invalid ones.

    For example ``www.google.com`` becomes ``google.com``. The set of known
    suffixes is read from ``tld.txt`` under ``settings.PROJECT_ROOT``, one
    entry per line. A host whose last label is not a known suffix is
    dropped.

    :param domains: iterable of host names.
    :return: a new set holding the reduced domains.
    """
    suffix_text = get_data_from_file(settings.PROJECT_ROOT + "/tld.txt")
    known_suffixes = set(suffix_text.splitlines(False))

    reduced = set()
    for domain in domains:
        labels = domain.split('.')
        last_index = len(labels) - 1
        candidate = None
        # Grow the tail one label at a time, starting from the last label.
        for start in range(last_index, -1, -1):
            tail = '.'.join(labels[start:])
            if tail in known_suffixes:
                # Still a known suffix: remember it and keep extending.
                candidate = tail
                continue
            if start != last_index:
                # First tail that is not a known suffix is the root domain.
                candidate = tail
            # Otherwise the last label is not a valid tld: discard the host.
            break
        if candidate is not None:
            reduced.add(candidate)
    return reduced
class Command(BaseCommand):
    """Rebuild the Potatso rules file from the latest upstream gfwlist."""

    help = 'Download the latest gfwlist and convert it to a Potatso rules config file.'

    def add_arguments(self, parser):
        """Register command-line arguments; this command takes none."""

    def handle(self, *args, **options):
        """Download, parse and convert the gfwlist, then write the rules file.

        The YAML output is written to ``potatso_rules.yaml`` under
        ``settings.PROJECT_ROOT``. It is first written to a ``.tmp`` sibling
        and then moved into place, so readers never see a partial file.

        :raises Exception: if the gfwlist download does not return HTTP 200.
        :raises OSError: if the rules file cannot be written or replaced.
        """
        content = decode_gfwlist(self.download_gfwlist())
        gfwlist = combine_lists(content)
        domains, domains_suffix = parse_gfwlist(gfwlist)
        domains = reduce_domains(domains)
        domains_suffix = reduce_domains(domains_suffix)

        # Sort so that the generated file is identical between runs; set
        # iteration order is otherwise arbitrary.
        rules = ["DOMAIN, {}, PROXY".format(domain) for domain in sorted(domains)]
        rules.extend(
            "DOMAIN-SUFFIX, {}, PROXY".format(domain)
            for domain in sorted(domains_suffix)
        )

        cn_whitelist = [
            'GEOIP, CN, DIRECT',
            'URL, .cn, DIRECT',
        ]
        local_whitelist = [
            'IP-CIDR, 127.0.0.0/8, DIRECT',
            'IP-CIDR, 172.16.0.0/12, DIRECT',
            'IP-CIDR, 10.0.0.0/8, DIRECT',
            'IP-CIDR, 192.168.0.0/16, DIRECT',
        ]
        result = {
            'ruleSets': [
                {'name': 'GFW List', 'rules': rules},
                {'name': 'CN White List', 'rules': cn_whitelist},
                {'name': 'Local White List', 'rules': local_whitelist},
            ]
        }
        out_content = yaml.dump(result)

        rules_file_path = settings.PROJECT_ROOT + "/potatso_rules.yaml"
        tmp_file_path = rules_file_path + ".tmp"
        # The context manager closes the handle even if a write fails.
        with open(tmp_file_path, 'w', encoding='utf-8') as f:
            f.write(out_content)
            f.flush()
            # Force the data to disk before the file is moved into place.
            os.fsync(f.fileno())
        # os.replace overwrites an existing target on every platform,
        # whereas os.rename fails on Windows when the target exists.
        os.replace(tmp_file_path, rules_file_path)

    @staticmethod
    def download_gfwlist(timeout=30):
        """Fetch the raw gfwlist text from ``gfwlist_url``.

        :param timeout: seconds to wait for the server before giving up;
            without it a stalled connection would block forever.
        :return: the response body as text.
        :raises Exception: if the HTTP status code is not 200.
        """
        r = requests.get(gfwlist_url, timeout=timeout)
        if r.status_code != 200:
            raise Exception('Download gfw list failed. {}'.format(r.status_code))
        return r.text
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment