Skip to content

Instantly share code, notes, and snippets.

@ad-m
Last active July 12, 2017 11:22
Show Gist options
  • Select an option

  • Save ad-m/454e40538504f9a8a73d74dc1a7c9876 to your computer and use it in GitHub Desktop.

Select an option

Save ad-m/454e40538504f9a8a73d74dc1a7c9876 to your computer and use it in GitHub Desktop.
# coding=utf-8
from __future__ import print_function
import sys
from os.path import isfile
import dataset
from lxml import etree
from tqdm import tqdm
from itertools import imap, ifilter
# The input file can be downloaded from https://epuap.gov.pl/ by following the menu path
# "Strefa urzędnika" -> "Dla integratorów" -> "Książka adresowa ESP" -> XML
# (see also https://s.jawne.info.pl/ksiazka-esp ).
class Downloader(object):
    """Convert an ESP address-book XML file into three CSV exports.

    The exports are: every record as-is, the records de-duplicated after
    their ``uri`` has been replaced by an entity name, and only the records
    whose ``uri`` contains ``skrytkaesp`` (case-insensitive).
    """

    def __init__(self, argv):
        """Store the command-line argument list (``argv[0]`` is the program name)."""
        self.argv = argv

    @staticmethod
    def delete_id(data):
        """Return the record without ``uri`` but with a ``nazwa_podmiotu`` key.

        ``nazwa_podmiotu`` is the lower-cased second ``/``-separated segment
        of ``data['uri']`` (presumably the URI looks like ``/<entity>/<box>``
        -- TODO confirm against the input file).

        The input mapping is left untouched; the result is a tuple of
        ``(key, value)`` pairs so that it is hashable and can be put in a set.

        Raises:
            KeyError: if ``data`` has no ``uri`` key.
            IndexError: if the URI contains no ``/``.
        """
        entity_name = data['uri'].split('/')[1].lower()
        # Work on a copy so the caller's record is not modified.
        cleaned = {key: value for key, value in data.items() if key != 'uri'}
        cleaned['nazwa_podmiotu'] = entity_name
        return tuple(cleaned.items())

    def generate_data(self, input_filename):
        """Yield one dict per ``esp`` element found directly under the XML root.

        Each dict maps the tag of every child element to its stripped text
        (an empty string when the element has no text) and gets an extra
        ``location`` key built from ``adres``, ``miejscowosc`` and
        ``kod_pocztowy``.

        Raises:
            KeyError: if one of the three address fields is missing.
        """
        tree = etree.parse(input_filename, etree.XMLParser(remove_blank_text=True))
        for esp in tree.iterfind('esp'):
            # el.text is None for empty elements, so fall back to u'' before stripping.
            data = {el.tag: (el.text or u'').strip() for el in esp}
            data['location'] = u"{} {} {}".format(data['adres'], data['miejscowosc'], data['kod_pocztowy'])
            yield data

    def _unique_rows(self, data):
        """Yield the distinct records of ``data`` after ``delete_id`` was applied."""
        unique_items = {self.delete_id(row) for row in data}
        return (dict(items) for items in unique_items)

    def _esp_rows(self, data):
        """Yield ``delete_id``-processed records whose URI mentions ``skrytkaesp``."""
        return (
            dict(self.delete_id(row))
            for row in data
            if 'skrytkaesp' in row['uri'].lower()
        )

    def main(self):
        """Validate the arguments and write the three CSV exports.

        Prints a usage line and exits with status 2 unless exactly four
        arguments follow the program name.
        """
        if len(self.argv) != 5:
            print("{} [input_xml] [output_csv] [output_csv_unique] [output_ESP]".format(self.argv[0]))
            sys.exit(2)
        input_filename = self.argv[1]
        output_filename = self.argv[2]
        output_uniq_filename = self.argv[3]
        output_esp_filename = self.argv[4]
        self.extract_data(lambda data: data, input_filename, output_filename)
        self.extract_data(self._unique_rows, input_filename, output_uniq_filename)
        self.extract_data(self._esp_rows, input_filename, output_esp_filename)

    def extract_data(self, filter, input_filename, output_filename):
        """Load the XML records through ``filter`` and dump them to a CSV file.

        Args:
            filter: callable that receives the iterable of record dicts and
                returns the iterable of dicts to store. The name shadows the
                builtin but is kept for backward compatibility.
            input_filename: path of the XML file to read.
            output_filename: path of the CSV file to write.
        """
        # A fresh in-memory SQLite database is used as the staging table for each export.
        table = dataset.connect('sqlite:///:memory:')['esp']
        rows = filter(self.generate_data(input_filename))
        table.insert_many(tqdm(rows, desc="Load for {}".format(output_filename)))
        row_count = table.count()
        dataset.freeze(
            tqdm(table.all(), total=row_count, desc="Save to {}".format(output_filename)),
            format='csv',
            filename=output_filename,
        )
        print("Saved {} items in {}".format(row_count, output_filename))
if __name__ == '__main__':
    # Script entry point: hand the raw command line to the Downloader and run it.
    downloader = Downloader(sys.argv)
    downloader.main()
dataset==0.8.0
lxml==3.8.0
tqdm==4.14.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment