Created March 27, 2013 19:08
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# For the people of Smubworld!
import urllib2
import os
import time
import getopt
import sys
from math import floor, log
import sqlite3
import hashlib
from xml.dom import minidom
# Module metadata.
__program__ = 'blockfinder'
__url__ = ''
# Spelled with two leading underscores so the conventional ``__author__``
# attribute is actually defined.
__author__ = 'Jacob Appelbaum <[email protected]>, dave b. <[email protected]>'
__copyright__ = 'Copyright (c) 2010'
__license__ = 'See LICENSE for licensing information'
__version__ = '3.1415'
# Optional dependency: when the import fails GeoIP is set to None and the
# rest of the file tests ``if GeoIP:`` before using it.  gzip is imported in
# the same guarded block.
try:
    import GeoIP
    import gzip
except ImportError:
    GeoIP = None

# NOTE(review): looks like an easter egg; `antigravity` is not referenced
# anywhere else in the visible code.
try:
    from future import antigravity
except ImportError:
    antigravity = None
def update_progress_bar(percent_done, caption=""):
    """Write a one-line progress bar to stdout.

    percent_done -- completed fraction; clamped to the range 0.0 .. 1.0
    caption      -- text shown to the right of the bar

    The bar width is taken from `stty size`.  When that yields nothing
    usable (for example stdin is not a terminal) 80 columns are assumed.
    The line ends with ESC[G so the next call overwrites it.
    """
    pipe = os.popen('stty size', 'r')
    try:
        size = pipe.read().split()
    finally:
        pipe.close()
    try:
        _rows, columns = map(int, size)
    except ValueError:
        # No tty (or unexpected stty output): fall back to a sane default.
        columns = 80
    # 4 columns are taken by "[", ">", "]" and the space before the caption.
    width = max(columns - 4 - len(caption), 0)
    fraction = min(max(percent_done, 0.0), 1.0)
    filled = int(fraction * width)
    sys.stdout.write("[%s>%s] %s\x1b[G" % (
        "=" * filled,
        "." * (width - filled),
        caption))
    # Without a flush the bar only appears once the buffer fills up.
    sys.stdout.flush()
# XXX TODO: allow the use of a proxy
# Set up a proper Request object, set the user agent and if desired, a proxy
def fetch(url, useragent):
    """Fetch a url (with progress meter) and return its contents.

    url       -- address to download
    useragent -- value sent in the User-agent request header

    Returns the response body as a string.
    Raises Exception when the reply has no content-length header or when
    the number of bytes received differs from that header.
    """
    req = urllib2.Request(url)
    req.add_header('User-agent', useragent)
    #req.set_proxy(host, type)
    fetcher = urllib2.urlopen(req)
    try:
        length_header = fetcher.headers.get("content-length")
        if length_header is None:
            raise Exception("Missing content-length header in reply from server.")
        length = int(length_header)
        # Divide as floats so sizes below 1 KiB do not print as 0.0.
        print("Fetching  %s  kilobytes" % round(length / 1024.0, 2))
        chunks = []
        received = 0
        t_start = time.time()
        while True:
            t_delta = time.time() - t_start
            # The first iteration can run within the clock resolution.
            if t_delta > 0:
                rate = received / 1024.0 / t_delta
            else:
                rate = 0.0
            if length:
                fraction = float(received) / length
            else:
                fraction = 1.0
            update_progress_bar(fraction, "%.2f K/s" % rate)
            tmp = fetcher.read(1024)
            if len(tmp) == 0:
                if received != length:
                    raise Exception("Expected %s bytes, only received %s" % (
                        length, received))
                print("")
                # Join once at the end instead of growing a string per chunk.
                return "".join(chunks)
            chunks.append(tmp)
            received += len(tmp)
    finally:
        fetcher.close()
def cache_delegation(cache_dir, delegation_url, useragent):
    """Attempt to cache the contents of a delegation url in our cache dir.

    cache_dir      -- cache directory path, including the trailing separator
    delegation_url -- url to download; its last path component becomes the
                      file name inside cache_dir
    useragent      -- User-agent header passed on to fetch()

    The cache directory is created when it does not exist yet.
    Returns True when the file was written, False when writing failed.
    Errors raised while downloading propagate to the caller.
    """
    try:
        os.stat(cache_dir)
    except OSError as e:
        # errno 2 is ENOENT: the directory is simply not there yet.
        if e.errno != 2:
            raise
        if verbose:
            print("Initializing the cache directory...")
        os.mkdir(cache_dir)
    print("Fetching " + delegation_url)
    delegation = fetch(delegation_url, useragent)
    delegation_file = str(cache_dir) + str(delegation_url.split('/')[-1])
    try:
        # Binary mode: this function is also used for gzip'ed downloads.
        with open(delegation_file, 'wb') as f:
            f.write(delegation)
        return True
    except Exception as e:
        print(repr(e))
        return False
def cache_is_dated(cache_dir, cached_files):
    """Return True if any of the cached files is older than 24 hours.

    cache_dir    -- cache directory path, including the trailing separator
    cached_files -- file names (relative to cache_dir) to check

    Raises OSError when cache_dir or one of the files cannot be stat'ed;
    a hint is printed first when the directory itself is missing.
    """
    try:
        os.stat(cache_dir)
    except OSError:
        print("\nDid you initialize the cache directory?\n")
        raise
    now = time.time()
    for name in cached_files:
        # 86400 seconds == 24 hours.
        if now - os.stat(cache_dir + name).st_mtime > 86400:
            return True
    return False
def create_sql_database(cache_dir):
    """Create a new, empty sqlite database in the cache directory.

    cache_dir -- cache directory path, including the trailing separator

    If there is a previous sqlite database it is deleted first.  The new
    database gets three identically shaped tables: asn, ipv4 and ipv6.
    """
    import errno

    db_path = cache_dir + "sqlitedb"
    try:
        os.remove(db_path)
    except OSError as e:
        # A missing database is expected on the first run.
        if e.errno != errno.ENOENT:
            raise
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        for table in ("asn", "ipv4", "ipv6"):
            cursor.execute(
                "create table " + table +
                "(registry text, cc text, start text, value INTEGER, "
                "date text, status text)")
        conn.commit()
    finally:
        conn.close()
def insert_into_sql_database(delegations, cache_dir):
    """Insert delegation information into the sqlite database.

    delegations -- list of delegation files, each a list of dicts with the
                   keys registry, cc, type, start, value, date and status
    cache_dir   -- cache directory path, including the trailing separator

    Entries whose registry field is all digits or whose cc is "*" are
    skipped, as are entries whose type is not ipv4, ipv6 or asn.  All
    inserts are committed together at the end.
    """
    known_tables = ("ipv4", "ipv6", "asn")
    conn = sqlite3.connect(cache_dir + "sqlitedb")
    try:
        cursor = conn.cursor()
        for delegation in delegations:
            # A delegation file that failed to load may be None.
            for entry in delegation or ():
                registry = str(entry['registry'])
                if registry.isdigit() or str(entry['cc']) == "*":
                    continue
                table = entry['type']
                if table not in known_tables:
                    # Never reuse the table of the previous entry.
                    continue
                text = ("INSERT INTO " + table +
                        " ( registry, cc, start, value, date,status)"
                        " VALUES (?,?,?,?,?,?)")
                data = [entry['registry'], entry['cc'], entry['start'],
                        entry['value'], entry['date'], entry['status']]
                cursor.execute(text, data)
        # Without an explicit commit the rows are rolled back on close.
        conn.commit()
    finally:
        conn.close()
def get_total_delegations_from_db(cache_dir):
    """Return the total number of rows in the ipv4, ipv6 and asn tables.

    cache_dir -- cache directory path, including the trailing separator
    """
    conn = sqlite3.connect(cache_dir + "sqlitedb")
    try:
        cursor = conn.cursor()
        count = 0
        for table in ("ipv4", "ipv6", "asn"):
            cursor.execute("select count (*) from " + table)
            count += int(cursor.fetchone()[0])
        return count
    finally:
        conn.close()
def get_possible_match_entries(cc, cache_dir):
    """Get the count of 'possible' matching delegation entries.

    cc        -- country code, either as a plain string or already wrapped
                 in a one-element tuple/list of SQL parameters
    cache_dir -- cache directory path, including the trailing separator

    Returns the number of rows with that country code summed over the
    ipv4, ipv6 and asn tables.
    """
    # A bare string would be treated as a sequence of one-character
    # parameters by sqlite3, so wrap it.
    if isinstance(cc, (tuple, list)):
        params = tuple(cc)
    else:
        params = (cc,)
    conn = sqlite3.connect(cache_dir + "sqlitedb")
    try:
        cursor = conn.cursor()
        count = 0
        for table in ("ipv4", "ipv6", "asn"):
            cursor.execute("select count (*) from " + table + " where cc=?",
                           params)
            count += int(cursor.fetchone()[0])
        return count
    finally:
        conn.close()
def use_sql_database(request, cc, cache_dir):
    """Print the cached delegations of one kind for a country.

    request   -- table to query: "ipv4", "ipv6" or "asn"
    cc        -- country code to select
    cache_dir -- cache directory path, including the trailing separator

    ipv4 rows are printed as start/prefix-length (the prefix is derived
    from the stored address count), ipv6 rows as start/value and asn rows
    as the plain number.  In verbose mode entry counts are printed too.
    """
    conn = sqlite3.connect(cache_dir + "sqlitedb")
    try:
        cursor = conn.cursor()
        if verbose:
            print("We have %d entries in our delegation cache." %
                  get_total_delegations_from_db(cache_dir))
        text = "select start,value from " + request + " where cc=?"
        cc = (cc,)
        cursor.execute(text, cc)
        for row in cursor:
            if request == "ipv4":
                print(str(row[0]) + "/" +
                      str(calculate_ipv4_subnet(int(row[1]))))
            elif request == "ipv6":
                print(str(row[0]) + "/" + str(int(row[1])))
            else:
                print(str(int(row[0])))
        if verbose:
            print("We found %d possible entries in our delegation cache." %
                  get_possible_match_entries(cc, cache_dir))
            cursor.execute(
                "select count(*) from " + request + " where cc=?", cc)
            print("We found %d matching entries in our delegation cache." %
                  int(cursor.fetchone()[0]))
    finally:
        conn.close()
def get_md5_from_delegation_md5_file(cache_dir, delegation_file):
    """Return the md5sum stored in the delegation's .md5 file.

    cache_dir       -- cache directory path, including the trailing separator
    delegation_file -- name of the delegation file (without ".md5")

    Two layouts are understood: for "delegated-afrinic-latest" the checksum
    is the first field of the file; for every other file it is whatever
    follows the "=" sign.  If the .md5 file cannot be read the error is
    printed and an empty string is returned.
    """
    checksum = ""
    try:
        with open(cache_dir + delegation_file + ".md5", "r") as f:
            content = f.read()
    except (IOError, OSError) as e:
        print(repr(e))
        return checksum
    if delegation_file == "delegated-afrinic-latest":
        fields = content.split()
        if fields:
            checksum = fields[0]
    else:
        # strip() copes with files that lack the trailing newline.
        checksum = content.partition("=")[2].strip()
    return str(checksum)
def verify_delegation_file(cache_dir, delegation_file):
    """Compare the md5sum of a cached delegation file to the provided one.

    cache_dir       -- cache directory path, including the trailing separator
    delegation_file -- name of the delegation file inside cache_dir

    Returns True only when a provided checksum exists and equals the md5
    of the cached file; False otherwise (including when either file is
    missing, in which case the error is printed).
    """
    checksum_of_file = ""
    try:
        with open(cache_dir + delegation_file, "rb") as f:
            checksum_of_file = str(hashlib.md5(f.read()).hexdigest())
    except Exception as e:
        print(repr(e))
    checksum = get_md5_from_delegation_md5_file(cache_dir, delegation_file)
    # An empty provided checksum never counts as a match.
    return checksum != "" and checksum == checksum_of_file
def verify_cache(cache_dir, delegation_files):
    """Check the md5 checksum of every cached delegation file.

    cache_dir        -- cache directory path, including the trailing separator
    delegation_files -- names of the delegation files to check

    The result of each check is only reported in verbose mode; nothing is
    returned.
    """
    for name in delegation_files:
        if verbose:
            print("verifying " + name)
        if verify_delegation_file(cache_dir, name):
            if verbose:
                print("the md5 checksum of " + name +
                      " *matches* the provided checksum")
        else:
            if verbose:
                print("the md5 checksum of " + name +
                      " does *not* match the provided checksum")
def update_delegation_cache(cache_dir, delegation_urls, useragent):
    """Fetch multiple delegation urls and cache the contents.

    cache_dir       -- cache directory path, including the trailing separator
    delegation_urls -- whitespace separated list of delegation urls
    useragent       -- User-agent header used for the downloads

    For every url the ".md5" companion is downloaded first; the delegation
    file itself is only downloaded when the cached copy does not match that
    checksum.  Always returns True.
    """
    print("Updating delegation cache...")
    for url in delegation_urls.split():
        cache_delegation(cache_dir, url + ".md5", useragent)
        # A cached copy that already matches the fresh checksum is current.
        if not verify_delegation_file(cache_dir, url.rpartition('/')[-1]):
            cache_delegation(cache_dir, url, useragent)
    return True
def unpack_geoip_cache(cache_dir, geoip_url):
    """Unpack the fetched GeoIP file into the blockfinder cache.

    cache_dir -- cache directory path, including the trailing separator
    geoip_url -- url the gzip'ed file was fetched from; its last path
                 component names the cached archive, and that name without
                 its final extension names the unpacked file

    Returns True.  Errors while reading or writing propagate.
    """
    # This probably should unlink the gzip'ed file if we care about space...
    gzip_filename = geoip_url.rpartition('/')[-1]
    gunziped_filename = gzip_filename.rpartition('.')[0]
    if verbose:
        print("Unpacking GeoIP file " + gzip_filename +
              " into our cache as " + gunziped_filename)
    gzip_file = gzip.open(cache_dir + gzip_filename, 'rb')
    try:
        gunzipped_data = gzip_file.read()
    finally:
        gzip_file.close()
    # Binary mode: the unpacked database is not text.
    gunzipped_file = open(cache_dir + gunziped_filename, 'wb')
    try:
        gunzipped_file.write(gunzipped_data)
    finally:
        gunzipped_file.close()
    return True
def update_geoip_cache(cache_dir, geoip_url, useragent):
    """Fetch the country level GeoIP file from a url and unpack it.

    cache_dir -- cache directory path, including the trailing separator
    geoip_url -- url of the gzip'ed GeoIP file
    useragent -- User-agent header used for the download

    Returns False when the download could not be cached (nothing is
    unpacked in that case), otherwise the result of unpacking.
    """
    print("Updating GeoIP cache...")
    if not cache_delegation(cache_dir, geoip_url, useragent):
        # Do not unpack a missing or stale archive.
        return False
    return unpack_geoip_cache(cache_dir, geoip_url)
def load_delegation(delegation_file):
    """Load and parse a delegation file into a list of dicts.

    delegation_file -- path of the file to read

    Each non-comment, non-blank line is split on "|" and its fields are
    paired, in order, with the keys registry, cc, type, start, value, date
    and status (surplus fields are dropped, short lines get fewer keys).
    Returns an empty list, after printing the error, when the file cannot
    be read.
    """
    keys = "registry cc type start value date status".split()
    try:
        with open(delegation_file, "r") as f:
            return [dict(zip(keys, line.rstrip("\r\n").split("|")))
                    for line in f
                    if line.strip() and not line.startswith("#")]
    except (IOError, OSError) as e:
        # open() raises IOError on Python 2, OSError on Python 3.
        print(repr(e))
        return []
def load_all_delegations(cache_dir, delegation_urls):
    """Load all cached delegation files into memory.

    cache_dir       -- cache directory path, including the trailing separator
    delegation_urls -- whitespace separated list of delegation urls; the
                       last path component of each names the cached file

    Returns a list with one parsed delegation (a list of dicts) per url.
    """
    delegations = []
    for url in delegation_urls.split():
        filename = url.rpartition('/')[-1]
        if verbose:
            print("Attempting to load delegation file into memory: " +
                  filename)
        # Keep one list per file even if the loader returned nothing.
        delegations.append(load_delegation(cache_dir + filename) or [])
    return delegations
def calculate_ipv4_subnet(host_count):
    """Return the IPv4 prefix length for a block of host_count addresses.

    host_count -- number of addresses in the block (positive integer)

    The result is 32 - floor(log2(host_count)), e.g. 256 -> 24.
    Raises ValueError for counts below 1.
    """
    host_count = int(host_count)
    if host_count < 1:
        raise ValueError("host_count must be a positive integer, got %r"
                         % (host_count,))
    # bit_length() - 1 is floor(log2(n)) computed exactly on integers,
    # avoiding floating-point rounding in log().
    return 32 - (host_count.bit_length() - 1)
def download_country_code_file(cache_dir, useragent):
    """Download and save the latest opencountrycode XML file.

    cache_dir -- cache directory path, including the trailing separator
    useragent -- value sent in the User-agent request header

    The document is stored as "countrycodes.xml" in cache_dir.
    Returns True on success; on any error the exception is printed and
    False is returned.
    """
    # Google frontend will not return content-length for some reason...
    # NOTE(review): the url literal is empty in this copy of the file, so the
    # download cannot succeed until the real address is restored.
    url = ""
    try:
        req = urllib2.Request(url)
        req.add_header('User-agent', useragent)
        ul = urllib2.urlopen(req)
        try:
            xml = ul.read()
        finally:
            ul.close()
        with open(cache_dir + "countrycodes.xml", 'wb') as f:
            f.write(xml)
        return True
    except Exception as e:
        print(repr(e))
        return False
def _load_country_pairs(cache_dir):
    """Return (code, name) pairs for every <country> element in the cached
    countrycodes.xml file."""
    xml_file = str(cache_dir) + "countrycodes.xml"
    clist = minidom.parse(xml_file)
    return [(country.attributes["code"].value,
             country.attributes["name"].value)
            for country in clist.getElementsByTagName("country")]


def build_country_code_dictionary(cache_dir):
    """Return a dictionary mapping country name to the country code.

    cache_dir -- cache directory path, including the trailing separator;
                 "countrycodes.xml" is read from it
    """
    return dict((name, code) for code, name in _load_country_pairs(cache_dir))


def build_country_code_dictionary_rev(cache_dir):
    """Return a dictionary mapping country code to the country name.

    cache_dir -- cache directory path, including the trailing separator;
                 "countrycodes.xml" is read from it
    """
    return dict(_load_country_pairs(cache_dir))
def get_country_code_from_name(cache_dir, country_name):
    """Return the country code for a given country name.

    cache_dir    -- cache directory path, including the trailing separator
    country_name -- full name or leading part of it, case-insensitive

    An exact name match wins; otherwise the alphabetically first country
    whose name starts with country_name is used.  Returns None when
    nothing matches.
    """
    map_co = build_country_code_dictionary(cache_dir)
    wanted = country_name.upper()
    prefix_matches = []
    # Sorted so that ambiguous prefixes resolve the same way on every run.
    for name in sorted(map_co):
        upper_name = name.upper()
        if upper_name == wanted:
            return map_co[name]
        if upper_name.startswith(wanted):
            prefix_matches.append(map_co[name])
    if prefix_matches:
        return prefix_matches[0]
    return None
def ip_address_to_dec(ip_addr):
    """Convert a dotted-quad IPv4 address to its integer value.

    ip_addr -- address such as "192.168.1.1"

    Raises ValueError when the string is not four decimal octets in the
    range 0..255.
    """
    octets = ip_addr.split('.')
    if len(octets) != 4:
        raise ValueError("not a dotted-quad IPv4 address: %r" % (ip_addr,))
    decimal = 0
    for octet in octets:
        value = int(octet)
        if not 0 <= value <= 255:
            raise ValueError("octet out of range in %r" % (ip_addr,))
        # Shift in one byte per octet; no hex string padding needed.
        decimal = (decimal << 8) | value
    return decimal
def geoip_lookup(cache_dir, ip_addr):
    """Look an address up in the cached GeoIP database.

    cache_dir -- cache directory path, including the trailing separator;
                 "GeoIP.dat" is opened from it
    ip_addr   -- address to look up

    Returns a (country code, country name) tuple as reported by GeoIP.
    Requires the optional GeoIP module.
    """
    gi = GeoIP.open(cache_dir + "GeoIP.dat", GeoIP.GEOIP_STANDARD)
    cc = gi.country_code_by_addr(ip_addr)
    cc_name = gi.country_name_by_addr(ip_addr)
    return cc, cc_name
def lookup_ip_address(ip_addr, cache_dir):
    """Print the country code and name for a given ip address.

    ip_addr   -- dotted-quad IPv4 address
    cache_dir -- cache directory path, including the trailing separator

    GeoIP is consulted first when the module is available.  The cached
    RIR ipv4 delegations are then searched for a block containing the
    address, and a warning is printed when both sources disagree.
    """
    print("Reverse lookup for: " + ip_addr)
    geoip_cc = None
    if GeoIP:
        geoip_cc, geoip_cc_name = geoip_lookup(cache_dir, ip_addr)
        print("GeoIP country code: " + str(geoip_cc))
        print("GeoIP country name: " + str(geoip_cc_name))
    ipv4arr = ip_addr.split('.')
    if len(ipv4arr) < 4:
        print("""doesn't look like an ipv4 address..""")
        return
    target = ip_address_to_dec(ip_addr)
    cc_map = build_country_code_dictionary_rev(cache_dir)
    rir_cc = None
    conn = sqlite3.connect(cache_dir + "sqlitedb")
    try:
        cursor = conn.cursor()
        # Pre-filter on the first octet; the "." keeps "1" from also
        # selecting "10.x" or "100.x" blocks.
        cursor.execute('select * from ipv4 WHERE start LIKE ?',
                       (ipv4arr[0] + '.%',))
        for row in cursor:
            start = ip_address_to_dec(row[2])
            # row[3] is the address count, so start + count is the first
            # address past the block.
            if start <= target < start + row[3]:
                rir_cc = row[1]
                # Fall back to the bare code for countries missing from
                # the country code file.
                rir_cc_name = cc_map.get(rir_cc, rir_cc)
                print('RIR country code: ' + rir_cc)
                print('RIR country: ' + rir_cc_name)
    finally:
        conn.close()
    if GeoIP and rir_cc is not None:
        if geoip_cc != rir_cc:
            print("It appears that the RIR data conflicts with the GeoIP data")
            print("The GeoIP data is likely closer to being correct due to "
                  "sub-delegation issues with LIR databases")
def return_first_ip_and_number_in_inetnum(line):
    """Split an inetnum range into its first address and its size.

    line -- text of the form "first_ip - last_ip"

    The line and both addresses are echoed to stdout.
    Returns (first_ip, number_of_addresses); both ends of the range are
    counted, so "10.0.0.0 - 10.0.0.255" yields 256.
    """
    fields = line.split("-")
    start_ip = fields[0].strip()
    end_ip = fields[1].strip()
    print(line)
    print("%s %s" % (start_ip, end_ip))
    # +1 because the range is inclusive at both ends.
    num_ips = ip_address_to_dec(end_ip) - ip_address_to_dec(start_ip) + 1
    return start_ip, num_ips
def extract_info_from_lir_file(name):
    """Print the address block and country of every entry in an LIR file.

    name -- path of the file to read

    An entry starts at an "inetnum:" or "inet6num:" line and is reported
    when its "country:" line is reached; an empty line resets the state.
    IPv4 blocks are printed as [first_ip, prefix_length], IPv6 blocks as
    the address and prefix split on "/".
    """
    block = []
    country = ""
    entry = False
    with open(name, "r") as lir_file:
        for line in lir_file:
            line = line.replace("\n", "")
            if line == "":
                entry = False
                country, block = "", []
            elif "inetnum:" in line:
                entry = True
                line = line.replace("inetnum:", "").strip()
                start_ip, num_ips = return_first_ip_and_number_in_inetnum(line)
                # Guard against a zero count, which has no logarithm.
                if not num_ips:
                    num_ips = 1
                print("%s %s" % (start_ip, num_ips))
                block = [start_ip, calculate_ipv4_subnet(num_ips)]
            elif "inet6num:" in line:
                entry = True
                block = line.replace("inet6num:", "").strip().split("/")
            elif entry and "country:" in line:
                entry = False
                country = line.replace("country:", "").strip()
                print("%s %s" % (block, country))
def usage():
    """ Print usage information to stderr. """
    sys.stderr.write("""
blockfinder [-c DIR] -i
blockfinder [options] -t COUNTRY

The first form initializes the local cache. The second form queries it.

Understood options (not all of which are implemented yet):
  -h, --help                        Show this help and exit
  -v                                Be verbose
  -c, --cachedir DIR                Set the cache directory
  -u, --useragent
  -p, --progress
  -o, --output FILE
  -4, --ipv4                        Search IPv4 allocations
  -6, --ipv6                        Search IPv6 allocation
  -a, --asn                         Search ASN allocations
  -t, --nation-state CC             Set the country to search (given as a two-letter code)
  -n, --country-name "Costa Rica"   Set country to search (full name)
  -x, --hack-the-internet           Hack the internet
  -r, --reverse-lookup              Return the country name for the specified IP

At least one of -t or -i is required, and when in -t mode, at least one of -4,
-6, and -a is required in order to do anything sensible.
""")
def main():
""" Where the magic starts. """
# NOTE(review): this function has visibly lost lines as well as its
# indentation: `except getopt.GetoptError` has no matching `try:`, the
# getopt.getopt() call has no short-option string, several `elif` branches
# (-h, -4, -6, -a) have no body, the triple-quoted delegation_urls literal is
# never closed, and the update / reverse-lookup branches are empty.  Restore
# the missing statements from a pristine copy before changing any logic.
# NOTE(review): the long options "country-name" and "reverse-lookup" are
# declared without a trailing "=", yet their handlers read the option
# argument `a`; "initialize-delegation" is declared while the handler tests
# for "--initialize-delegations".  `progress` is assigned but not read in
# the visible code.  Confirm each of these against the intended behaviour.
opts, args = getopt.getopt(sys.argv[1:],
["hack-the-internet", "verbose", "help", "cachedir=", "useragent=", "progress",
"silent", "output=", "ipv4", "ipv6", "asn", "nation-state=",
"country-name", "initialize-delegation","reverse-lookup"])
except getopt.GetoptError, err:
print str(err)
# `verbose` is a module-level flag; the helper functions in this file read it.
global verbose
verbose = False
output = None
silent = True
# Default cache location is ~/.blockfinder/ (trailing slash included because
# paths are built by plain string concatenation).
cache_dir = str(os.path.expanduser('~')) + "/.blockfinder/"
update_delegations = False
delegation_urls = """
geoip_country_url = ""
delegation_files = []
for url in delegation_urls.split():
filename = url.rpartition('/')
update_delegations = False
requests = []
country = ""
useragent = "Mozilla/5.0"
ipaddress = ""
if not os.path.exists(cache_dir + "countrycodes.xml"):
for o, a in opts:
if o in ("-x", "--hack-the-internet"):
print "all your bases are belong to us!"
if o == "-v":
verbose = True
elif o in ("-h", "--help"):
elif o in ("-c", "--cachedir"):
cache_dir = a
elif o in ("-u", "--useragent"):
useragent = a
elif o in ("-p", "--progress"):
progress = True
elif o in ("-s", "--silent"):
silent = True
elif o in ("-o", "--output"):
output = a
elif o in ("-4", "--ipv4"):
elif o in ("-6", "--ipv6"):
elif o in ("-a", "--asn"):
# XXX TODO: This should be a positional argument as it's the only manditory one...
elif o in ("-r", "--reverse-lookup"):
ipaddress = a
elif o in ("-t", "--nation-state"):
country = a.upper()
elif o in ("-n", "--country-name"):
country = get_country_code_from_name(cache_dir, a)
elif o in ("-i", "--initialize-delegations"):
update_delegations = True
print "Unhandled option; Sorry!"
# Update and quit.
if update_delegations:
if GeoIP:
if verbose:
verify_cache(cache_dir, delegation_files)
delegations = load_all_delegations(cache_dir, delegation_urls)
insert_into_sql_database(delegations, cache_dir)
if not requests:
print "Nothing to do. Have you requested anything?"
print "Example usage: blockfinder -v --ipv4 -t mm"
if ipaddress:
if not country:
print "It appears your search did not match a country."
# Check our cache age and warn if it's aged
if cache_is_dated(cache_dir, delegation_files) and verbose:
print "Your delegation cache is older than 24 hours; you probably want to update it."
if verbose:
print "Using country code: %s" % country
for request in requests:
use_sql_database(request, country, cache_dir)
except IOError: sys.exit()
# Run main() only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
