Created
January 14, 2020 21:15
-
-
Save abelardojarab/a55dc4d237e9927a51fd502c02fb66d7 to your computer and use it in GitHub Desktop.
This Python script extracts data from an SQLite DB and passes it into Elasticsearch. The ZVJS DB is available at https://morph.io/Pytlicek/SK_Prison_and_Court_Guard and is part of an OpenData project for scraping data from Government sites.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import sqlite3 | |
import json | |
import unicodedata | |
from elasticsearch import Elasticsearch | |
# Connection settings (values unchanged from the original script).
SQLITE_PATH = 'SK_Prison_and_Court_Guard.sqlite'
ES_HOST = 'elk.myserver.sk'
ES_PORT = 9200
ES_AUTH = ('auth_user', 'auth_pass')
ES_INDEX = 'testpy'
ES_DOC_TYPE = 'testpy'


def clean_row(row):
    """Return *row* as a plain dict with placeholder values normalised.

    *row* is any mapping exposing ``keys()`` and item access (a
    ``sqlite3.Row`` or a dict).  Two placeholders are rewritten:

    * the text ``'null'`` becomes ``None`` (JSON ``null``);
    * the value ``999999.99`` becomes ``0.0`` (number or text alike).

    The replacement is done per value, not on the serialised JSON text, so
    brackets inside strings and numbers that merely contain ``999999.99``
    (for example ``1999999.99``) are no longer altered.

    json.dumps escapes non-ASCII characters by default, so the earlier
    NFKD/ASCII folding step only ever saw ASCII text and changed nothing;
    text values are therefore passed through as they are.
    """
    document = {}
    for key in row.keys():
        value = row[key]
        if value == 'null':
            value = None
        elif value == '999999.99':
            value = '0.0'
        elif isinstance(value, float) and value == 999999.99:
            value = 0.0
        document[key] = value
    return document


def fetch_last_sqlite_id(conn):
    """Return the highest ``invoice_id`` in the ``data`` table (0 if empty).

    MAX() is used instead of counting rows: it does not load the whole table
    into memory and stays correct when the ids have gaps.
    """
    highest = conn.execute('SELECT MAX(invoice_id) FROM data').fetchone()[0]
    return highest if highest is not None else 0


def fetch_last_elastic_id(es):
    """Return the highest ``invoice_id`` already indexed (0 if none).

    Returning 0 for an empty index makes the first run start at id 1, which
    is what the original manual workaround comment asked for.
    """
    response = es.search(
        index=ES_INDEX,
        body={
            "query": {
                "bool": {
                    "must": [{"match": {"_type": "testpy"}}]
                }
            },
            # The aggregation result is not read by this script; the request
            # is kept as it was.
            "aggs": {
                "max_price": {"max": {"field": "invoice_id"}}},
            "sort": {"invoice_id": {"order": "desc"}},
            "size": 1
        }
    )
    hits = response['hits']['hits']
    if not hits:
        return 0
    return hits[0]['_source']['invoice_id']


def iter_new_documents(conn, last_indexed_id):
    """Yield ``(invoice_id, document)`` for rows newer than *last_indexed_id*.

    *conn* must use ``sqlite3.Row`` as its row factory so that columns can be
    read by name.  Rows are produced in ascending ``invoice_id`` order; ids
    missing from the table are simply skipped, and the row with the highest
    id is included.
    """
    # Parameterised query instead of concatenating the id into the SQL text.
    cursor = conn.execute(
        'SELECT * FROM data WHERE invoice_id > ? ORDER BY invoice_id',
        (last_indexed_id,)
    )
    for row in cursor:
        yield row['invoice_id'], clean_row(row)


def main():
    """Push every SQLite row not yet present in Elasticsearch."""
    conn = sqlite3.connect(SQLITE_PATH)
    conn.row_factory = sqlite3.Row
    try:
        last_sqlite_id = fetch_last_sqlite_id(conn)
        print("Last SQLite ID from DB:  %s" % last_sqlite_id)

        # One client for the whole run instead of one per document.
        es = Elasticsearch(host=ES_HOST, port=ES_PORT, http_auth=ES_AUTH)
        last_elastic_id = fetch_last_elastic_id(es)
        print("Last Elasticsearch ID is:  %s" % last_elastic_id)

        for invoice_id, document in iter_new_documents(conn, last_elastic_id):
            print("Next try to push Document to ELK with ID:  %s" % invoice_id)
            try:
                res = es.index(index=ES_INDEX, doc_type=ES_DOC_TYPE,
                               body=document)
            except Exception as exc:
                # Stop at the first failure: the next run resumes from the
                # highest id in the index, so continuing past a failed
                # document would leave it permanently un-indexed.
                print("Failed to push Document with ID %s: %s"
                      % (invoice_id, exc))
                break
            # .get() so a response without a 'created' key does not abort
            # the run.  NOTE(review): newer Elasticsearch versions are
            # believed to report 'result' instead - confirm.
            print("Result - Created :  %s OK\n" % res.get('created'))
        print("Finish")
    finally:
        conn.close()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment