Created
July 5, 2013 13:07
-
-
Save draganHR/5934417 to your computer and use it in GitHub Desktop.
Extracting city data from GeoIP city edition binary database.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import math | |
import pygeoip | |
class GeoCityReader(pygeoip.GeoIP):
    """
    Extract city data from a GeoIP City edition binary database.

    Instead of looking up a single address, the reader walks every entry
    of the database's lookup table and yields the city records that the
    entries point to.

    Example:
        reader = GeoCityReader('/path/to/GeoLiteCity.dat', pygeoip.MEMORY_CACHE)
        for record in reader.get_cities():
            print(record)
    """

    @staticmethod
    def _to_text(buf):
        """Decode `buf` with `pygeoip.ENCODING` when running on Python 3 with bytes."""
        if pygeoip.PY3 and isinstance(buf, bytes):
            return buf.decode(pygeoip.ENCODING)
        return buf

    @staticmethod
    def _read_uint_le(buf, pos, size):
        """Read `size` characters of `buf` at `pos` as a little-endian unsigned int."""
        value = 0
        for j in range(size):
            value += ord(buf[pos + j]) << (j * 8)
        return value

    @staticmethod
    def _read_cstring(buf, pos):
        """
        Read a NUL-terminated string starting at `pos`.

        Returns a tuple `(end, text)` where `end` is the index of the
        terminator (or `len(buf)` when no terminator is present) and
        `text` is the string without the terminator.
        """
        end = buf.find('\0', pos)
        if end == -1:
            end = len(buf)
        return end, buf[pos:end]

    def get_chunk(self, pos, size):
        """
        Return `size` units of raw database content starting at offset `pos`.

        With the MEMORY_CACHE flag the in-memory buffer is sliced directly;
        otherwise the file handle is read while holding the instance lock.
        The result may be shorter than `size` near the end of the data.
        """
        if self._flags & pygeoip.const.MEMORY_CACHE:
            return self._memoryBuffer[pos:pos + size]
        # `with` guarantees the lock is released even if seek/read raises.
        with self._lock:
            self._filehandle.seek(pos, os.SEEK_SET)
            return self._filehandle.read(size)

    def get_record(self, seek_country):
        """
        Parse the city record referenced by the pointer `seek_country`.

        Returns a dict with country, region, city, postal code, coordinates,
        DMA/area/metro codes and time zone, or an empty dict when the record
        is truncated or its first three bytes after the country byte are
        all zero.
        """
        read_length = (2 * self._recordLength - 1) * self._databaseSegments
        buf = self._to_text(self.get_chunk(seek_country + read_length,
                                           pygeoip.const.FULL_RECORD_LENGTH))

        # A short read cannot hold a record; treat it like an empty one
        # instead of failing with IndexError on the check below.
        if len(buf) < 4 or all(ord(c) == 0 for c in buf[1:4]):
            return {}

        record = {
            'dma_code': 0,
            'area_code': 0,
            'metro_code': '',
            'postal_code': ''
        }

        # The first byte indexes the country tables.
        country = ord(buf[0])
        record['country_code'] = pygeoip.const.COUNTRY_CODES[country]
        record['country_code3'] = pygeoip.const.COUNTRY_CODES3[country]
        record['country_name'] = pygeoip.const.COUNTRY_NAMES[country]
        record['continent'] = pygeoip.const.CONTINENT_NAMES[country]

        # Three consecutive NUL-terminated strings follow the country byte.
        end, record['region_name'] = self._read_cstring(buf, 1)
        end, record['city'] = self._read_cstring(buf, end + 1)
        end, record['postal_code'] = self._read_cstring(buf, end + 1)
        buf_pos = end + 1

        # Coordinates are 3-byte integers scaled by 10000 and biased by 180.
        latitude = self._read_uint_le(buf, buf_pos, 3)
        longitude = self._read_uint_le(buf, buf_pos + 3, 3)
        buf_pos += 6
        record['latitude'] = (latitude / 10000.0) - 180.0
        record['longitude'] = (longitude / 10000.0) - 180.0

        if self._databaseType == pygeoip.const.CITY_EDITION_REV1:
            dmaarea_combo = 0
            if record['country_code'] == 'US':
                dmaarea_combo = self._read_uint_le(buf, buf_pos, 3)
            # The combined value packs the DMA code (thousands) and the
            # area code (remainder); it stays 0 for non-US records.
            record['dma_code'] = dmaarea_combo // 1000
            record['area_code'] = dmaarea_combo % 1000

        record['metro_code'] = pygeoip.const.DMA_MAP.get(record['dma_code'])
        params = (record['country_code'], record['region_name'])
        record['time_zone'] = pygeoip.time_zone_by_country_and_region(*params)
        return record

    def get_cities(self):
        """
        Generate city records found by scanning the lookup table.

        Each table entry holds two pointers of `_recordLength` bytes. A
        pointer greater than or equal to `_databaseSegments` is resolved
        with `get_record`; records without a city name are skipped. At most
        one record is yielded per entry, and the same city may be yielded
        more than once when several entries point to it.
        """
        record_length = self._recordLength
        segments = self._databaseSegments
        node_size = 2 * record_length

        offset = 0
        while offset < segments:
            buf = self.get_chunk(node_size * offset, node_size)
            # Stop on an empty or partial entry (e.g. a truncated file).
            if len(buf) < node_size:
                break
            buf = self._to_text(buf)
            offset += 1

            first = self._read_uint_le(buf, 0, record_length)
            second = self._read_uint_le(buf, record_length, record_length)

            # NOTE(review): the first pointer is only consulted when the
            # second one yields no city, so a city referenced solely by a
            # first pointer next to a valid second pointer is never
            # yielded. Kept as-is; confirm whether this is intended.
            record = None
            if second >= segments:
                record = self.get_record(second)
                if not record.get('city'):
                    record = None
            if record is None and first >= segments:
                record = self.get_record(first)
                if not record.get('city'):
                    record = None
            if record:
                yield record
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment