Created
June 7, 2012 19:46
-
-
Save leighmacdonald/2891135 to your computer and use it in GitHub Desktop.
MaxMind normalized Import Script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python | |
| from __future__ import print_function | |
| from sys import exit | |
| from logging import getLogger, DEBUG, INFO, basicConfig | |
| from urllib2 import urlopen, HTTPError | |
| from urllib import urlretrieve | |
| from gzip import open as gzopen | |
| from datetime import datetime | |
| from collections import defaultdict | |
| from zipfile import ZipFile, is_zipfile | |
| import socket | |
| import struct | |
| import MySQLdb as dbapi | |
def ip2int(val):
    """ Convert a dotted-quad IPv4 string to its unsigned integer representation

    :param val: Dotted quad (e.g. 192.168.0.1)
    :type val: str
    :return: Decimal version of IP
    :rtype: int
    :raises ValueError: If val is not a valid dotted-quad IPv4 address
    """
    try:
        packed = socket.inet_pton(socket.AF_INET, val)
    except socket.error:
        # Callers expect ValueError for bad input; keep the offending value in
        # the message so a failing import line can be identified.
        raise ValueError('Invalid IPv4 address: {0!r}'.format(val))
    # '!I' = network byte order (big-endian) unsigned 32-bit integer.
    return struct.unpack('!I', packed)[0]
class CountryLoader(object):
    """ Import MaxMind world-city data plus country/region/IP reference data into MySQL

    Columns of the world cities file (see ``_url``), in order:
    Country Code, ASCII City Name, City Name, Region, Population, Latitude, Longitude

    TODO: http://ip-to-country.webhosting.info/downloads/ip-to-country.csv.zip
    TODO: http://www.maxmind.com/app/country_continent
    """
    _url = 'http://www.maxmind.com/download/worldcities/worldcitiespop.txt.gz'
    _region_url = 'http://www.maxmind.com/fips10-4.csv'
    _geoip_url = 'http://pkgs.fedoraproject.org/repo/pkgs/ntop/GeoIPASNum.dat.gz/32997e6d3b1df64cbbb3ecc0adfb9ad8/GeoIPASNum.dat.gz'
    _asn_urls = [
        ('arin', 'ftp://ftp.arin.net/pub/stats/arin/delegated-arin-latest'),
        ('ripe', 'ftp://ftp.ripe.net/ripe/stats/delegated-ripencc-latest'),
        ('afrinic', 'ftp://ftp.afrinic.net/pub/stats/afrinic/delegated-afrinic-latest'),
        ('apnic', 'ftp://ftp.apnic.net/pub/stats/apnic/delegated-apnic-latest'),
        ('lacnic', 'ftp://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-latest'),
    ]
    # Sample rows of the ip-to-country CSV (start ip, end ip, 2-letter code,
    # 3-letter code, country name):
    #   "16777216","17367039","AU","AUS","AUSTRALIA"
    #   "17367040","17432575","MY","MYS","MALAYSIA"
    _ip2country = 'http://ip-to-country.webhosting.info/downloads/ip-to-country.csv.zip'
    _country_codes_raw = None
    _region_names_raw = None

    def __init__(self, db, url=None, country_file=None, region_file=None, schema_file=None):
        """ Setup the default import parameters

        :param db: Database connection
        :type db: dbapi.Connection
        :param url: URL for the world cities population file from maxmind
        :type url: str
        :param country_file: Country CSV path (default ./countries.csv)
        :type country_file: str
        :param region_file: Region CSV path (default ./regions.csv)
        :type region_file: str
        :param schema_file: Schema SQL path (default ./schema.sql)
        :type schema_file: str
        """
        self._db = db
        if url:
            self._url = url
        self.log = getLogger(self.__class__.__name__)
        # Per-instance caches: as class attributes these dicts were shared by
        # every loader, leaking database ids between different connections.
        self._country_cache = {}
        self._region_cache = {}
        self._asn_id_cache = {}
        self._region_id_cache = defaultdict(dict)
        self._insert_count = 0
        with open(country_file or './countries.csv') as handle:
            self._country_codes_raw = handle.read()
        with open(region_file or './regions.csv') as handle:
            self._region_names_raw = handle.readlines()
        self._initDb(schema_file or './schema.sql')

    def _initDb(self, schema):
        """ Create the database schema if the ``country`` table does not exist yet

        :param schema: Path to the schema SQL file
        :type schema: str
        """
        try:
            probe = self._db.cursor()
            # Existence probe only; avoids fetching the whole table to the client.
            probe.execute('SELECT 1 FROM country LIMIT 1')
        except dbapi.ProgrammingError:
            self.log.warning('Failed to find schema, creating.')
            with open(schema) as handle:
                sql = handle.read()
            cur = self._db.cursor()
            try:
                cur.execute(sql)
                # The schema file may contain several statements: consume every
                # result set so errors in later statements surface here and the
                # connection is not left out of sync for the next query.
                while cur.nextset():
                    pass
            except Exception as err:
                self.log.exception(err)
                raise

    def importData(self):
        """ Main importing function: countries, regions, cities, then IP ranges

        ``loadASNum`` is not part of this sequence and must be called separately.
        """
        start = datetime.now()
        # Previously passed an undefined name (NameError); loadCountries parses
        # the country file itself when no list is given.
        self.loadCountries()
        self.loadRegions()
        self.loadCities()
        self.loadIP2Country()
        self.log.info('Elapsed time: {0}'.format(datetime.now() - start))

    def loadCities(self):
        """ Download the world cities file and insert every row as a city """
        # Plain loop: a list comprehension here built a throwaway list of
        # millions of row ids.
        for city in self.readCSV(self.fetchCSV()):
            self.addCity(*city)

    def loadRegions(self):
        """ Load region names into ``_region_id_cache[country_code][region_code]`` for lookups """
        self.log.info('Loading {0} regions'.format(len(self._region_names_raw)))
        for data in self._region_names_raw:
            if not data.strip():
                # Blank lines (e.g. a trailing newline) carry no region.
                continue
            code, reg_id, name = data.split(',', 2)
            self._region_id_cache[code][reg_id] = name.replace('"', '').strip()

    def loadIP2Country(self):
        """ Download the ip-to-country zip, extract it and insert each IP range into the db """
        ip2cdb = self.fetchCSV(self._ip2country)
        ipdata = self._getZippedData(ip2cdb)
        query = 'INSERT IGNORE INTO ip (start_ip, end_ip, country_id) VALUES(%s, %s, %s)'
        with self._db as cur:
            for ip_data in ipdata.splitlines():
                if not ip_data.strip():
                    continue
                start, end, country, _ = ip_data.split(',', 3)
                start, end, country = start.strip('"'), end.strip('"'), country.strip('"')
                country_id = self.getCountryId(country)
                cur.execute(query, (start, end, country_id,))
                self.log.debug('{0} {1} {2} {3}'.format(start, end, country, country_id))

    def _getZippedData(self, filename):
        """ Read and return the first file of the provided zip file path

        :param filename: Zip file path
        :type filename: str
        :return: zip file content
        :rtype: str
        :raises IOError: If the file is not a zip archive or the archive is empty
        """
        if not is_zipfile(filename):
            raise IOError('Not a zipfile! {0}'.format(filename))
        # Context manager so the archive handle is always released.
        with ZipFile(filename) as archive:
            members = archive.infolist()
            if not members:
                raise IOError('Empty zipfile! {0}'.format(filename))
            return archive.read(members[0])

    def loadASNum(self):
        """ Import IPv4 delegations from each registry's delegated stats file

        Non-comment lines are pipe separated. The fields used are
        1 (country code), 2 (record type), 3 (start address) and 6 (status).
        Only ``ipv4`` records with status ``assigned`` or ``allocated`` and a
        concrete country code are stored, keyed by the registry name. Only the
        start address of each block is recorded.
        """
        query = 'INSERT IGNORE INTO asn_ip (asn_id, country_id, ip_block) VALUES(%s, %s, %s)'
        for asn, url in self._asn_urls:
            self.log.info('Starting import of {0} AS data'.format(asn))
            path = self.fetchCSV(url)
            with open(path) as handle:
                with self._db as cur:
                    for raw in handle:
                        if raw.startswith('#'):
                            continue
                        # Strip the line ending before splitting so the status
                        # matches even with CRLF endings, trailing extra fields
                        # or a final line without a newline.
                        fields = raw.rstrip('\r\n').split('|')
                        if len(fields) < 7:
                            # Too short to carry a status field.
                            continue
                        if fields[1] == '*' or fields[2] != 'ipv4' or fields[6] not in ('assigned', 'allocated'):
                            continue
                        asn_id = self.getAsnId(asn)
                        country_id = self.getCountryId(fields[1])
                        cur.execute(query, (asn_id, country_id, ip2int(fields[3])))

    def getAsnId(self, asn_name):
        """ Return the asn_id for asn_name from cache or db, inserting a new asn row if missing

        Note: loadASNum passes registry names (e.g. 'arin') here.

        :param asn_name: Value stored in the ``asn`` column
        :type asn_name: str
        :return: asn_id
        :rtype: int
        """
        if asn_name not in self._asn_id_cache:
            with self._db as cur:
                cur.execute('SELECT asn_id FROM asn WHERE asn = %s', (asn_name,))
                res = cur.fetchone()
                if res:
                    self._asn_id_cache[asn_name] = res[0]
                else:
                    cur.execute('INSERT INTO asn (asn) VALUES(%s)', (asn_name,))
                    self._asn_id_cache[asn_name] = cur.lastrowid
                    self.log.info('Added new ASN: {0}'.format(asn_name))
        return self._asn_id_cache[asn_name]

    def addCity(self, country, city, accentCity, region, population, lat, long):
        """ Insert city data into the database

        :param country: 2 Letter Country code
        :type country: str
        :param city: Simplified City Name
        :type city: str
        :param accentCity: Localized City Name (not stored)
        :type accentCity: str
        :param region: 2 Letter Region Code
        :type region: str
        :param population: City population (not stored)
        :type population: str
        :param lat: Latitude, as read from the CSV
        :type lat: str
        :param long: Longitude, as read from the CSV
        :type long: str
        :return: City ID, or None for the CSV header row
        :rtype: int
        """
        if region == 'Region':
            # Header row of the cities file; nothing to insert.
            return None
        country_id = self.getCountryId(country)
        region_id = self.getRegionId(country_id, region, country)
        # WKT separates coordinates with whitespace (the old "POINT(a, b)" was
        # invalid WKT). The text is bound as a parameter so values from the
        # downloaded file never become part of the SQL string. The original
        # (lat, long) ordering is kept, i.e. X = latitude.
        location = 'POINT({0} {1})'.format(lat, long)
        query = ('INSERT IGNORE INTO city (country_id, region_id, city, `location`, `lat`, `long`) '
                 'VALUES(%s, %s, %s, PointFromText(%s), %s, %s)')
        with self._db as cur:
            cur.execute(query, (country_id, region_id, city, location, lat, long))
        self._insert_count += 1
        if self._insert_count % 10000 == 0:
            self.log.info('Cities added: {0}'.format(self._insert_count))
        return cur.lastrowid

    def _parse_country_codes(self, raw_data):
        """ Parse "CODE,Name" lines into a list of (code, name) tuples

        Double quotes are removed from names; lines without a comma (e.g. blank
        lines) are skipped.

        :param raw_data: Raw country code CSV data
        :type raw_data: str
        :return: Parsed country codes
        :rtype: list
        """
        data = []
        for cc_line in raw_data.splitlines():
            # The previous fallback unpacked split(',', 1) into three names,
            # which can never succeed, so a blank line crashed the import.
            cc, sep, country = cc_line.partition(',')
            if not sep:
                continue
            data.append((cc, country.replace('"', '')))
        return data

    def sortCountries(self, countries):
        """ Sort the countries by their Code

        :param countries: Country list of (code, name) tuples
        :type countries: list
        :return: Sorted country list
        :rtype: list
        """
        return sorted(countries, key=lambda t: t[0])

    def loadCountries(self, countries=None):
        """ Sort and insert the countries into the database

        :param countries: (code, name) tuples; parsed from the country file when omitted
        :type countries: list
        """
        self.log.info('Parsing countries')
        if countries is None:
            countries = self._parse_country_codes(self._country_codes_raw)
        for code, name in self.sortCountries(countries):
            self._insertCountry(code, name)

    def _insertCountry(self, code, name):
        """ Insert a country, or update it when the row already exists

        :param code: 2 Letter country code
        :type code: str
        :param name: Name of the country
        :type name: str
        :return: cursor.lastrowid of the statement (country_id for a new row)
        :rtype: int
        """
        query = ('INSERT INTO country (code, country) VALUES(%s, %s) '
                 'ON DUPLICATE KEY UPDATE code=%s, country = %s')
        with self._db as cur:
            cur.execute(query, (code, name, code, name,))
            self.log.info('Added country: ({0}) {1}'.format(code, name))
            return cur.lastrowid

    def getCountryId(self, code):
        """ Lookup a country id from the cache, then the db, inserting an 'UNKNOWN' country if absent

        :param code: 2 Letter country code
        :type code: str
        :return: country_id
        :rtype: int
        """
        if code not in self._country_cache:
            with self._db as cur:
                cur.execute('SELECT country_id FROM country WHERE code = %s', (code,))
                res = cur.fetchone()
            if res:
                country_id = res[0]
            else:
                country_id = self._insertCountry(code, 'UNKNOWN')
            self._country_cache[code] = country_id
        return self._country_cache[code]

    def getRegionId(self, country_id, region, country):
        """ Lookup the region_id from cache then from the db, creating the region if not found

        :param country_id: country_id db key
        :type country_id: int
        :param region: Region Code
        :type region: str
        :param country: 2 Letter country code (used to name a newly created region)
        :type country: str
        :return: region_id
        :rtype: int
        :raises Exception: If no usable region_id could be obtained
        """
        key = (region, country_id)
        if key not in self._region_cache:
            cur = self._db.cursor()
            try:
                cur.execute('SELECT region_id FROM region WHERE country_id = %s AND region = %s',
                            (country_id, region))
                res = cur.fetchone()
            finally:
                cur.close()
            region_id = res[0] if res else self.createRegion(country_id, region, country)
            if not region_id:
                raise Exception('Cannot create region: {0}'.format(region))
            self._region_cache[key] = region_id
        return self._region_cache[key]

    def createRegion(self, country_id, region, country, default_name='Unknown'):
        """ Insert a new region and return its region_id

        The region name comes from the regions file loaded by loadRegions,
        falling back to default_name when the code is empty or unknown.

        :param country_id: country_id db key
        :type country_id: int
        :param region: Region code
        :type region: str
        :param country: 2 Letter Country Code
        :type country: str
        :param default_name: Default name for invalid region data
        :type default_name: str
        :return: region_id
        :rtype: int
        """
        region_name = default_name
        if region:
            region_name = self._region_id_cache.get(country.upper(), {}).get(region, default_name)
        query = 'INSERT INTO region (country_id, region, region_name) VALUES(%s, %s, %s)'
        with self._db as cur:
            cur.execute(query, (str(country_id), region, region_name,))
            return cur.lastrowid

    def fetchCSV(self, url=None):
        """ Download url to a temporary local file and return its path

        :param url: URL for the data (defaults to the world cities file)
        :type url: str
        :return: Local file path
        :rtype: str
        """
        if not url:
            url = self._url
        self.log.debug('Downloading URL: {0}'.format(url))
        return urlretrieve(url)[0]

    def readCSV(self, file_name, sep=','):
        """ Lazily read a gzipped CSV file, yielding each non-blank line as a list of fields

        The file is streamed rather than decompressed fully into memory, and
        it is closed once iteration finishes.

        :param file_name: Gzipped CSV file location
        :type file_name: str
        :param sep: Field separator
        :type sep: str
        :return: Generator of field lists
        :rtype: generator
        """
        with gzopen(file_name, 'rb') as handle:
            for line in handle:
                line = line.rstrip('\r\n')
                if line:
                    yield line.split(sep)
if __name__ == "__main__":
    # Command-line entry point: parse MySQL/file options, run the full import,
    # and exit non-zero if anything fails.
    from optparse import OptionParser
    from sys import stderr
    parser = OptionParser(description='Tool to import MaxMind data into a normalized (3NF) SQL format')
    parser.add_option('-H', '--host', help="MySQL host to connect to", dest='host', default='localhost')
    # type='int' so a port given on the command line reaches MySQLdb as an int, not a str.
    parser.add_option('-P', '--port', help="MySQL port to connect to", dest='port', type='int', default=3306)
    parser.add_option('-u', '--user', help="MySQL user name", dest='user', default=None)
    parser.add_option('-p', '--password', help='MySQL password', dest='password', default=None)
    parser.add_option('-d', '--db', help="Database to use", dest='db', default='geo')
    parser.add_option('-r', '--regions', help='Specify an alternate region source file', dest='region_file', default=None)
    parser.add_option('-c', '--countries', help='Specify an alternate country source file', dest='country_file', default=None)
    parser.add_option('-s', '--schema', help='Use an alternate schema file for database table creation.', dest='schema_file', default=None)
    parser.add_option('-U', '--url', help="Specify an alternate URL to the CSV data.", dest="url", default=None)
    opts, args = parser.parse_args()
    # The country/region files are read by CountryLoader (honouring -c/-r); the
    # unconditional reads of ./countries.csv and ./regions.csv that used to be
    # here ignored those options and crashed outside the error handling.
    basicConfig(level=DEBUG)
    status = 0
    db = None
    try:
        db = dbapi.connect(host=opts.host, port=opts.port, user=opts.user, passwd=opts.password, db=opts.db)
        loader = CountryLoader(db, url=opts.url, region_file=opts.region_file, country_file=opts.country_file, schema_file=opts.schema_file)
        loader.importData()
    except dbapi.OperationalError as err:
        print(err, file=stderr)
        print('Cannot connect to MySQL server, please check your credentials', file=stderr)
        status = 1
    except Exception as err:
        print(err, file=stderr)
        status = 1
    finally:
        if db is not None:
            db.close()
    # Previously `finally: exit()` reported success (status 0) on every failure.
    exit(status)
else:
    basicConfig(level=DEBUG)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment