Skip to content

Instantly share code, notes, and snippets.

@Poorvak
Created July 20, 2017 09:22
Show Gist options
  • Save Poorvak/39b04b0892ff7f8fb9787a1d4089c7c9 to your computer and use it in GitHub Desktop.
Save Poorvak/39b04b0892ff7f8fb9787a1d4089c7c9 to your computer and use it in GitHub Desktop.
Index data from Postgres to ElasticSearch using the arguments.
#!/usr/bin/python
import math
import argparse
import traceback
import psycopg2
import psycopg2.extras
import elasticsearch
def ResultIter(cursor, arraysize=100):
# An iterator that uses fetchmany to keep memory usage down
while True:
results = cursor.fetchmany(arraysize)
if not results:
break
for result in results:
yield result
def PGDatabaseConn():
try:
conn = psycopg2.connect(database=args.db_name,
user=args.db_username,
password=args.db_password,
host=args.db_host)
except Exception:
print "Unable to connect to postgres - host: %s" % args.db_host
traceback.print_exc()
exit(1)
return conn.cursor(name='mycursor',
cursor_factory=psycopg2.extras.RealDictCursor)
def ESConn():
print "Conecting to elasticsearch"
try:
es = elasticsearch.Elasticsearch(hosts=[args.es_host],
auth=(args.es_user, args.es_password),
port=args.es_port)
except Exception:
print "Unable to connect to elasticsearch - host: ", args.es_host
traceback.print_exc()
exit(1)
return es
def ESInsert(es, row):
try:
es.index(index=args.es_index, doc_type=args.es_indextype, body=row,
id=row.get('author_id'))
except Exception as exc:
raise exc
parser = argparse.ArgumentParser(description='Convert postgres to es')
parser.add_argument('-q', '--query',
required=True, help='Query to retrieve data')
parser.add_argument('--db_name',
required=True, help='Postgres database name')
parser.add_argument('--db_username',
required=True, help='Postgres database username')
parser.add_argument('--db_password',
required=True, help='Postgres database password')
parser.add_argument('--db_host',
required=True, help='Postgres host')
parser.add_argument('--es_host',
required=True, help='Elastisearch host:port')
parser.add_argument('--es_port',
required=False, help='Elasticsearch Port')
parser.add_argument('--es_index',
required=True, help='Elastisearch index')
parser.add_argument('--es_indextype',
required=True, help='Elastisearch index type')
parser.add_argument('--es_user',
required=False, help='Elasticsearch User')
parser.add_argument('--es_password',
required=False, help='Elasticsearch User and Password')
args = parser.parse_args()
# Conn to PG and open a cursor
cur = PGDatabaseConn()
# Execute the query
print "Executing query -> %s" % args.query
cur.execute(args.query)
# Connect to elasticsearch
es = ESConn()
# Insert results
print "Inserting data into elasticsearch"
error = 0
row_replace = dict()
for row in ResultIter(cursor=cur, arraysize=250):
try:
ESInsert(es=es, row=row)
except Exception:
try:
# Try replace float nan values to None
row_replace.clear()
row_replace.update(row)
for key, value in row.items():
if isinstance(value, float) and math.isnan(value):
row_replace[key] = None
print row_replace
ESInsert(es, row_replace)
except Exception:
print "Error inserting row"
print row
error = 1
traceback.print_exc()
continue
if error == 0:
print "Done!!! :D "
exit(0)
else:
print "Done with errors :( "
exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment