Skip to content

Instantly share code, notes, and snippets.

@MisakaMikoto-35c5
Created December 8, 2019 13:03
Show Gist options
  • Save MisakaMikoto-35c5/89f19bdb0717e753e9f4ccfc6903e68a to your computer and use it in GitHub Desktop.
Save MisakaMikoto-35c5/89f19bdb0717e753e9f4ccfc6903e68a to your computer and use it in GitHub Desktop.
筛选运营商
#!/usr/bin/python3
# Data Source: https://iptoasn.com/
# Format (one record per line): range_start range_end AS_number country_code AS_description
# Field separator: tab (\t)
def data_reader(filename):
    """Load a tab-separated IP-to-ASN table into a list of record dicts.

    Every non-empty line is split on tabs and its first five fields are
    stored, as strings, under the keys 'range_start', 'range_end', 'asn',
    'country' and 'as_description'. Empty lines are skipped.

    Args:
        filename: Path of the UTF-8 encoded TSV file.

    Returns:
        A list of dicts, one per non-empty line, in file order.

    Raises:
        OSError: If the file cannot be opened or read.
        IndexError: If a non-empty line has fewer than five fields.
    """
    result = []
    # The context manager closes the file even when reading or parsing fails,
    # and iterating the handle avoids holding a second full copy of the data.
    with open(filename, 'r', encoding='utf-8') as source:
        for raw_line in source:
            line = raw_line.rstrip('\n')
            if line == '':
                continue
            fields = line.split('\t')
            result.append({
                'range_start': fields[0],
                'range_end': fields[1],
                'asn': fields[2],
                'country': fields[3],
                'as_description': fields[4],
            })
    return result
def country_filter(country, data):
    """Return the records of *data* whose 'country' value equals *country*.

    Args:
        country: Country code to keep (compared with ``==``).
        data: Iterable of record dicts that each have a 'country' key.

    Returns:
        A new list with the matching records, in their original order.
    """
    return [record for record in data if record['country'] == country]
def china_isp_filter(data):
    """Group the AS numbers that appear in *data* by Chinese carrier.

    Each AS number is classified once, on its first occurrence, by looking
    for carrier-specific keywords in the lower-cased AS description. The
    checks run in a fixed order (mobile, unicom, telecom, education/science
    network, Dr. Peng group) and the first match wins; anything unmatched
    goes to 'others'. A handful of AS numbers are pre-assigned below and are
    never re-classified from their description.

    Args:
        data: Iterable of record dicts with 'asn' and 'as_description' keys.

    Returns:
        A dict of AS-number lists keyed by 'china_telecom', 'china_unicom',
        'china_mobile', 'cernet', 'drpeng' and 'others'. The 'drpeng' list
        was previously collected but left out of the returned dict.
    """
    china_telecom = [
        '4847',
        '139018',
        '137702',
        '139203',
    ]
    china_unicom = [
        '139007',
    ]
    china_mobile = []
    cernet = [
        '9401',  # National Library of China (education network)
    ]
    drpeng = []
    others = [
        '4611',  # CNNIC Member
        '4859',  # China National Financial Network
        '9306',  # China National Financial Network
        '7640',  # CITIC Group
        '7641',  # Broadcasting & TV network
        '9298',  # Guanghuan Xuntong
        '9308',  # 21Vianet (21viamail)
        '9802',  # 21Vianet (21viamail)
        '9391',  # Guangdong Broadcasting & TV network
        '9814',  # FibrLINK
        '9535',  # INTERNET-SOLUTION -HK
        '9801',  # 263
        '9803',  # 263
        '9807',  # National Bureau of Statistics
        '9809',  # Shenzhen Nanling
        '9810',  # Beijing Shidai Wangxing Technology Co., Ltd.
        '9811',  # Guoyan Technology (DRCNET)
        '9812',  # Oriental Cable Network (broadcasting & TV network)
        '9939',  # Anlai / DYXnet
        '9805',  # Siemens China
    ]

    # Every pre-assigned AS number counts as already classified. A set keeps
    # the per-record "seen before?" check O(1) instead of scanning a list.
    seen = set(china_telecom + china_unicom + cernet + others)

    for record in data:
        asn = record['asn']
        description = record['as_description'].lower()
        if asn in seen:
            continue
        seen.add(asn)

        mentions_china = 'china' in description

        # China Mobile (including the former China Tietong / railway network)
        if (
            'cmnet' in description or
            'crnet' in description or
            'lingtong' in description or
            (mentions_china and (
                'mobile' in description or
                'tietong' in description or
                'railway' in description
            ))
        ):
            china_mobile.append(asn)
            continue

        # China Unicom
        if (
            'china169' in description or
            'shenzhou greatwall' in description or
            (mentions_china and 'unicom' in description)
        ):
            china_unicom.append(asn)
            continue

        # China Telecom
        if (
            (mentions_china and 'telecom' in description) or
            'chinanet' in description or
            'ct-' in description
        ):
            china_telecom.append(asn)
            continue

        # Education network (CERNET) / science and technology network (CSTNET)
        if (
            'cernet' in description or
            'cstnet' in description or
            'stn-' in description or
            'test platform' in description or
            'education' in description
        ):
            cernet.append(asn)
            continue

        # Dr. Peng / Great Wall Broadband / Dian-Xin-Tong
        if (
            'dxt' in description or
            'gwbn' in description or
            'peng' in description or
            'guoxin' in description
        ):
            drpeng.append(asn)
            continue

        others.append(asn)

    return {
        'china_telecom': china_telecom,
        'china_unicom': china_unicom,
        'china_mobile': china_mobile,
        'cernet': cernet,
        'drpeng': drpeng,
        'others': others,
    }
def get_16bit_empty_array():
    """Return a list of 16 distinct empty lists (one bucket per 4-bit value)."""
    return [[] for _ in range(16)]
def build_route_command(asn_list, ip_range, command_template):
    """Render one routing command per merged prefix owned by *asn_list*.

    The records of *ip_range* that belong to *asn_list* are selected with
    get_route_by_asn, merged with merge_network_prefix, and each resulting
    block is substituted into *command_template*. The record counts before
    and after merging are printed to stdout.

    Args:
        asn_list: AS numbers whose prefixes should be routed.
        ip_range: Record dicts as produced by data_reader.
        command_template: Text in which '$(dst)' is replaced by the block's
            start address and '$(prefix)' by its prefix length.

    Returns:
        All rendered commands concatenated into a single string.
    """
    matched = get_route_by_asn(asn_list, ip_range)
    print('Prefixes: ' + str(len(matched)))
    merged = merge_network_prefix(matched)
    print('Minified prefixes: '+ str(len(merged)))
    return ''.join(
        command_template
        .replace('$(dst)', block['range_start'])
        .replace('$(prefix)', str(block['length']))
        for block in merged
    )
def get_ip_number(ip):
    """Convert a dotted IPv4 string into its integer value.

    Only the first four dot-separated fields are read; each is parsed with
    int() and the four values are packed most-significant first, 8 bits apart.

    Args:
        ip: Address text such as '10.0.0.1'.

    Returns:
        The address as an int.

    Raises:
        IndexError: If *ip* has fewer than four dot-separated fields.
        ValueError: If a field is not an integer literal.
    """
    octets = ip.split('.')
    number = 0
    for position in range(4):
        number = (number << 8) + int(octets[position])
    return number
def merge_network_prefix(route_prefixes):
    """Merge adjacent address ranges and re-express them as CIDR blocks.

    The records are ordered with sort_ip_address and walked once. Consecutive
    records form a "run" while each record starts exactly one address after
    the end of the run's latest record (or of the run's first record). When a
    record does not continue the run, the run is converted to CIDR blocks
    with calc_cidr and a new run starts at that record.

    The run still open when the input ends is now converted as well; before,
    it was silently dropped, so the last merged range (or the only one, for a
    single-record input) never reached the result.

    Args:
        route_prefixes: Record dicts with 'range_start' and 'range_end'
            dotted IPv4 strings.

    Returns:
        A list of the block dicts produced by calc_cidr, in run order.
        An empty input yields an empty list.
    """
    result = []
    run_first = None        # first record of the run being built
    run_first_end = 0       # numeric 'range_end' of run_first
    run_last = None         # latest record merged into the run, if any
    run_last_end = 0        # numeric 'range_end' of run_last (0 when unset)

    for record in sort_ip_address(route_prefixes):
        start_number = get_ip_number(record['range_start'])

        if run_first is None:
            run_first = record
            run_first_end = get_ip_number(record['range_end'])
            continue

        # Same adjacency rule as before: the record continues the run when it
        # begins right after the run's latest record or right after its first.
        if start_number in (run_last_end + 1, run_first_end + 1):
            run_last = record
            run_last_end = get_ip_number(record['range_end'])
            continue

        # Gap found: close the current run and start a new one here.
        closing = run_first if run_last is None else run_last
        result += calc_cidr(run_first['range_start'], closing['range_end'])
        run_first = record
        run_first_end = get_ip_number(record['range_end'])
        run_last = None
        run_last_end = 0

    # Flush the run that was still open when the input ended.
    if run_first is not None:
        closing = run_first if run_last is None else run_last
        result += calc_cidr(run_first['range_start'], closing['range_end'])

    return result
def calc_cidr(first_ip, last_ip):
    """Split the inclusive range first_ip..last_ip into aligned CIDR blocks.

    The range is covered greedily from the low end: each block is the largest
    power-of-two-sized, correctly aligned block that starts at the current
    address and does not run past *last_ip*. This is the same split the
    previous hand-written bit scan produced, now delegated to the standard
    library, which removes the 30-iteration cap that rejected valid ranges
    needing 30 or more blocks (an IPv4 range can need up to 62).

    Args:
        first_ip: First address of the range, dotted IPv4 text.
        last_ip: Last address of the range (inclusive), dotted IPv4 text.

    Returns:
        A list of dicts with 'range_start' and 'range_end' (dotted strings)
        and 'length' (prefix length as int), in ascending address order.

    Raises:
        ValueError: If an address is malformed or *first_ip* is greater than
            *last_ip* (previously reported as a generic "Too many subnets"
            Exception after looping).
        Exception: 'IP Address is out of range.' when a block ends in
            240.0.0.0 or above, as before.
    """
    # Imported here because the file has no module-level import section.
    import ipaddress

    first = ipaddress.IPv4Address(first_ip)
    last = ipaddress.IPv4Address(last_ip)

    result = []
    # summarize_address_range yields the minimal ascending list of networks
    # and raises ValueError itself when first > last.
    for network in ipaddress.summarize_address_range(first, last):
        block_end = str(network.broadcast_address)
        result.append({
            'range_start': str(network.network_address),
            'range_end': block_end,
            'length': network.prefixlen,
        })
        if int(block_end.split('.')[0]) >= 240:
            raise Exception('IP Address is out of range.')
    return result
def convert_number_to_ip(ip_number):
    """Format an integer as dotted IPv4 text.

    The three low octets are masked to 8 bits; the leading field is the
    value shifted right by 24 without a mask, so an input of 2**32 or more
    produces a first field above 255.

    Args:
        ip_number: Address value as an int.

    Returns:
        The dotted string, e.g. 16909060 -> '1.2.3.4'.
    """
    fields = (
        ip_number >> 24,
        (ip_number >> 16) & 255,
        (ip_number >> 8) & 255,
        ip_number & 255,
    )
    return '.'.join(str(field) for field in fields)
def calc_last_ip(ip, cidr):
    """Return the address reached by adding a block's host mask to *ip*.

    A mask of (32 - cidr) one-bits is added to the numeric value of *ip*;
    a prefix length of 32 or more adds nothing. Because the mask is added
    rather than OR-ed, the result is the block's last address only when
    *ip* is the block's first address.

    Args:
        ip: Dotted IPv4 text of the block start.
        cidr: Prefix length as an int.

    Returns:
        The resulting address as dotted IPv4 text.
    """
    host_bits = max(32 - cidr, 0)
    host_mask = (1 << host_bits) - 1
    return convert_number_to_ip(get_ip_number(ip) + host_mask)
def count_of_one(n):
    """Count the one-bits in the low 32 bits of *n*."""
    return bin(n & 0xffffffff).count('1')
def count_of_bits(n):
    """Return how many bits are needed to hold the low 32 bits of *n*.

    This is the position of the highest set bit plus one, or 0 when the
    masked value is zero.
    """
    return (n & 0xffffffff).bit_length()
def count_of_zero(n):
    """Count the zero-bits below the highest set bit of *n*'s low 32 bits.

    Leading zeros above the highest set bit are not counted, so a masked
    value of zero yields 0.
    """
    masked = n & 0xffffffff
    return masked.bit_length() - bin(masked).count('1')
def get_route_by_asn(asn_list, ip_range):
    """Select the records of *ip_range* whose AS number is in *asn_list*.

    Args:
        asn_list: Iterable of hashable AS numbers (strings, as read from
            the TSV) to keep.
        ip_range: Iterable of record dicts that each have an 'asn' key.

    Returns:
        A new list with the matching records, in their original order.
    """
    # Build the lookup set once: testing each row against a list would scan
    # the whole AS list for every record of the table.
    wanted = set(asn_list)
    return [record for record in ip_range if record['asn'] in wanted]
def sort_ip_address(ip_range):
    """Return the records ordered by ascending numeric 'range_start'.

    The hand-written radix sort ran one pass per octet, but every pass was
    fed the original *ip_range* instead of the previous pass's output, so
    only the final pass (on the first octet) took effect. The records are
    now ordered on all four octets with the built-in stable sort.

    Args:
        ip_range: Iterable of record dicts with a dotted IPv4 'range_start'.

    Returns:
        A new list; records with equal start addresses keep their input
        order and the input itself is not modified.
    """
    def start_key(record):
        # Compare octets as integers so that '9.x' sorts before '10.x'.
        return tuple(
            int(octet) for octet in record['range_start'].split('.')[:4]
        )

    return sorted(ip_range, key=start_key)
if __name__ == '__main__':
    # Load the whole table, then work out which AS numbers belong to which
    # Chinese carrier from the rows whose country code is CN.
    asn_data = data_reader('ip2asn-v4.tsv')
    china_ip_range = country_filter('CN', asn_data)
    china_asn_list = china_isp_filter(china_ip_range)
    # Prefixes are taken from the full table (asn_data), not only the CN
    # rows, for every AS number classified as China Telecom.
    route_cmd = build_route_command(
        china_asn_list['china_telecom'],
        asn_data,
        'ip route add $(dst)/$(prefix) dev pppoe-ctcn2 metric 10\n'
    )
    # The context manager closes the output file even if the write fails.
    with open('./route_cmd.txt', 'w') as route_cmd_file_pipe:
        route_cmd_file_pipe.write(route_cmd)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment