import psycopg2
from psycopg2.extensions import connection, cursor
import sys
import datetime
from typing import List, Optional
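
# Copy selected OHDM tables from a source PostgreSQL database (schema "ohdm")
# into a destination database (schema "public") in batches of cache_size rows.
# Run without arguments to copy every table listed in CopyOhdmDB.tables, or
# pass a single table name as the first argument to copy only that table.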

class DestDB:
    dbname = "gis"
    user = "docker"
    host = "localhost"
    password = "Dyx8lXMKIGggiQXTzSrAuZ3UsDt8YmLy53WEIAga6EkkVc2GK9lmiRfJxzx7Oahw"


class SrcDB:
    dbname = "ohdm"
    user = "user"
    host = "localhost"
    password = "pass"


class CopyOhdmDB:
    # Tables to copy and the columns to insert; commented-out entries are skipped.
    tables = {
        "classification": ["id", "class", "subclassname"],
        # "content": ["id", "name", "value", "mimetype", "source_user_id"],
        # "external_systems": ["id", "name", "description"],
        # "external_users": ["id", "userid", "username", "external_system_id"],
        "geoobject": ["id", "name", "source_user_id"],
        # "geoobject_content": ["id", "valid_since", "valid_until", "valid_since_offset", "valid_until_offset", "geoobject_id", "content_id"],
        "geoobject_geometry": ["id", "id_target", "type_target", "id_geoobject_source", "role", "classification_id", "tags", "valid_since", "valid_until", "valid_since_offset", "valid_until_offset", "source_user_id"],
        # "geoobject_url": ["id", "geoobject_id", "url_id", "valid_since", "valid_until", "valid_since_offset", "valid_until_offset"],
        # "import_updates": ["id", "externalsystemid", "initial", "lastupdate"],
        "lines": ["id", "line", "source_user_id"],
        "points": ["id", "point", "source_user_id"],
        "polygons": ["id", "polygon", "source_user_id"],
        # "subsequent_geom_user": ["id", "target_id", "point_id", "line_id", "polygon_id"],
        # "url": ["id", "url", "source_user_id"]
    }

    def __init__(self):
        print("Connect to database...")
        try:
            self.con_source: connection = psycopg2.connect("dbname='{0}' user='{1}' host='{2}' password='{3}'".format(
                SrcDB.dbname,
                SrcDB.user,
                SrcDB.host,
                SrcDB.password,
            ))
            self.con_dest: connection = psycopg2.connect("dbname='{0}' user='{1}' host='{2}' password='{3}'".format(
                DestDB.dbname,
                DestDB.user,
                DestDB.host,
                DestDB.password,
            ))
        except psycopg2.OperationalError:
            print("I am unable to connect to the database")
            exit(1)

        self.cur_source: cursor = self.con_source.cursor()
        self.cur_dest: cursor = self.con_dest.cursor()

        self.schema_source: str = "ohdm"
        self.schema_dest: str = "public"
        self.cache_size: int = 100000

    def count_rows(self, schema: str, table: str, cur: cursor) -> int:
        print("Count rows for {}".format(table))
        cur.execute("SELECT COUNT(*) FROM {0}.{1};".format(schema, table))
        row = cur.fetchone()
        if row:
            return int(row[0])
        return 0

    def column_2_str(self, column: List[str]) -> str:
        return "{} ".format(", ".join(column))

    def row_2_insert(self, row) -> str:
        # Build the VALUES list for one row. Note: non-integer values are
        # wrapped in single quotes without escaping, so values that contain
        # quotes will break the generated statement.
        value_query: str = ""
        for value in row:
            if isinstance(value, int):
                value_query += "{}, ".format(value)
            else:
                value_query += "'{}', ".format(value)
        return "{} ".format(value_query[:-2])

    def copy_table(self, table: str, offset_value: Optional[int], target_rows_amount: int) -> Optional[int]:
        insert_query: str = ""

        # On the first call, resume from the number of rows already present
        # in the destination table.
        if not offset_value:
            offset_value = self.count_rows(schema=self.schema_dest, table=table, cur=self.cur_dest)

        # Format progress figures in millions or thousands.
        if offset_value >= 1000000:
            offset_str: str = "{0:.2f}m".format(offset_value / 1000000)
        else:
            offset_str: str = "{0:.2f}k".format(offset_value / 1000)
        if target_rows_amount >= 1000000:
            target_rows: str = "{0:.2f}m".format(target_rows_amount / 1000000)
        else:
            target_rows: str = "{0:.2f}k".format(target_rows_amount / 1000)
        done_percent: float = offset_value / target_rows_amount * 100
        print("Import {0} from {1} of {2} ({3:.2f}%) @ {4}".format(table, offset_str, target_rows, done_percent, datetime.datetime.now()))

        self.cur_source.execute("SELECT * FROM {0}.{1} ORDER BY id ASC LIMIT {2} OFFSET {3};".format(
            self.schema_source,
            table,
            self.cache_size,
            offset_value
        ))
        rows = self.cur_source.fetchall()
        for row in rows:
            insert_query += "INSERT INTO {2}.{3}({0}) VALUES ({1}); \n".format(
                self.column_2_str(CopyOhdmDB.tables[table]),
                self.row_2_insert(row=row),
                self.schema_dest,
                table
            )

        # No more rows to copy -> signal the caller to stop.
        if insert_query == "":
            return None

        self.cur_dest.execute(insert_query)
        self.con_dest.commit()
        return offset_value + self.cache_size
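
    # Optional sketch, not part of the original script and not wired into the
    # copy loop: a batch variant that lets psycopg2 build the INSERT via
    # psycopg2.extras.execute_values instead of concatenating one quoted
    # INSERT statement per row. It assumes the column lists in
    # CopyOhdmDB.tables match the source tables and is untested against the
    # real OHDM schema.
    def copy_table_batch(self, table: str, offset_value: int) -> Optional[int]:
        from psycopg2.extras import execute_values

        # Select only the columns that will be inserted, in the same order.
        self.cur_source.execute("SELECT {0} FROM {1}.{2} ORDER BY id ASC LIMIT {3} OFFSET {4};".format(
            self.column_2_str(CopyOhdmDB.tables[table]),
            self.schema_source,
            table,
            self.cache_size,
            offset_value,
        ))
        rows = self.cur_source.fetchall()
        if not rows:
            return None

        # execute_values expands the single %s placeholder into one VALUES
        # tuple per row and handles quoting/escaping of the values.
        execute_values(
            self.cur_dest,
            "INSERT INTO {0}.{1}({2}) VALUES %s".format(
                self.schema_dest, table, self.column_2_str(CopyOhdmDB.tables[table])
            ),
            rows,
        )
        self.con_dest.commit()
        return offset_value + self.cache_size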

    def copy_all(self):
        for table in CopyOhdmDB.tables:
            self.copy_table_loop(table=table)

    def copy_table_loop(self, table: str):
        offset_value: Optional[int] = None
        print("start copy {}".format(table))
        target_rows_amount: int = self.count_rows(schema=self.schema_source, table=table, cur=self.cur_source)
        while True:
            offset_value = self.copy_table(table=table, offset_value=offset_value, target_rows_amount=target_rows_amount)
            if not offset_value:
                break


# start copy
copy_ohdm_db: CopyOhdmDB = CopyOhdmDB()

if len(sys.argv) > 1:
    # Copy only the table named on the command line, retrying until it succeeds.
    while True:
        try:
            copy_ohdm_db.copy_table_loop(table=str(sys.argv[1]))
            break
        except psycopg2.errors.InFailedSqlTransaction:
            copy_ohdm_db.con_dest.commit()
        except psycopg2.errors.UniqueViolation as e:
            print(e)
        except psycopg2.DatabaseError:
            print("Database connection error!")
            exit(1)
else:
    # Copy every table in CopyOhdmDB.tables, retrying until the full run succeeds.
    while True:
        try:
            copy_ohdm_db.copy_all()
            break
        except psycopg2.errors.InFailedSqlTransaction:
            copy_ohdm_db.con_dest.commit()
        except psycopg2.errors.UniqueViolation as e:
            print(e)
        except psycopg2.DatabaseError:
            print("Database connection error!")
            exit(1)