-
-
Save rendarz/33010f9ef4f1f20632a2da9ede3b9caa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SQLInserter:
    """Accumulate parameterised INSERT statements and their values.

    Each call to `storeDict` / `storeList` appends one or more
    (query, values) couples to the parallel lists `self.queries` and
    `self.values`; entry *i* of one list belongs to entry *i* of the other.
    Queries use `%s` placeholders for the values and, optionally, an
    `ON CONFLICT (...) DO UPDATE SET ...` clause (PostgreSQL-style upsert).

    NOTE: table, schema and column names are interpolated directly into the
    SQL text (identifiers cannot be sent as `%s` parameters), so they must
    come from trusted code, never from user input.
    """

    def __init__(self):
        # Parallel lists: queries[i] is executed with values[i].
        self.queries = []
        self.values = []

    @staticmethod
    def _normalise_keys(primary_keys):
        """Return `primary_keys` as a tuple; `None` becomes `()`.

        A bare string is treated as a single key so that `"id"` is not
        iterated character by character.
        """
        if not primary_keys:
            return ()
        if isinstance(primary_keys, str):
            return (primary_keys,)
        return tuple(primary_keys)

    @staticmethod
    def _build_query(colnames, tablename, schemaname, primary_keys,
                     defaults, update_on_conflict, returning_id):
        """Assemble the SQL text shared by `storeDict` and `storeList`.

        `primary_keys` must already be normalised to a (possibly empty)
        tuple, and must be non-empty when `update_on_conflict` is set.
        """
        # ADD the schema name to the name of the table.
        if schemaname:
            tablename_fmt = "{schemaname}.{tablename}".format(
                schemaname=schemaname, tablename=tablename
            )
        else:
            tablename_fmt = tablename

        query = "INSERT INTO {tablename_fmt} ({colnames_fmt}) VALUES ({placeholders}) ".format(
            tablename_fmt=tablename_fmt,
            colnames_fmt=", ".join(colnames),
            placeholders=", ".join(["%s"] * len(colnames)),
        )

        if update_on_conflict:
            # Inserted non-key columns take the value of the rejected row ...
            assignments = [
                "{0} = EXCLUDED.{0}".format(colname)
                for colname in colnames
                if colname not in primary_keys
            ]
            # ... and the `defaults` columns are reset to their table DEFAULT:
            # ['a', 'b'] becomes "a=DEFAULT, b=DEFAULT".
            if defaults:
                assignments.extend(
                    "{0}=DEFAULT".format(default)
                    for default in defaults
                    if default not in primary_keys
                )
            # Joining a single list avoids the stray leading/trailing comma
            # that appears when one of the two groups is empty.
            if assignments:
                action = "DO UPDATE SET {0}".format(", ".join(assignments))
            else:
                # Nothing but key columns: an empty SET list is invalid SQL.
                action = "DO NOTHING"
            query += " ON CONFLICT ({0}) {1} ".format(", ".join(primary_keys), action)

        if returning_id:
            query += " RETURNING id"
        return query + ";"

    def storeDict(
        self,
        dict_object,
        tablename,
        schemaname=None,
        primary_keys=None,
        additional_tuples=None,
        defaults=None,
        whitelist=None,
        update_on_conflict=False,
        returning_id=False
    ):
        """Queue a single-row INSERT built from a `{column: value}` dict.

        Args:
            dict_object: mapping of column name to value.
            tablename: target table name.
            schemaname: optional schema, prepended as `schema.table`.
            primary_keys: conflict-target column names, e.g.
                `("id", "year")`; required when `update_on_conflict` is set.
            additional_tuples: optional iterable of `(column, value)` pairs
                placed before the columns of `dict_object`.
            defaults: column names to reset to `DEFAULT` on conflict; only
                used when `update_on_conflict` is set.
            whitelist: if non-empty, only these column names are inserted.
            update_on_conflict: append an `ON CONFLICT ... DO UPDATE` clause.
            returning_id: append `RETURNING id`.

        Returns:
            `True` when a query was queued; `False` (nothing queued) when
            `update_on_conflict` is requested without primary keys.
        """
        primary_keys = self._normalise_keys(primary_keys)
        # An upsert needs a conflict target.
        if update_on_conflict and not primary_keys:
            return False

        colnames = []
        values = []
        # ADD additional (key, value) tuples first, if we have them.
        for extra in additional_tuples or ():
            if not whitelist or extra[0] in whitelist:
                colnames.append(extra[0])
                values.append(extra[1])
        # INSERT only whitelisted column names, if a whitelist is specified.
        for cname in dict_object:
            if not whitelist or cname in whitelist:
                colnames.append(cname)
                values.append(dict_object[cname])

        query = self._build_query(
            colnames, tablename, schemaname, primary_keys,
            defaults, update_on_conflict, returning_id,
        )
        # ADD the {QUERY, VALUES} couple.
        self.queries.append(query)
        self.values.append(values)
        return True

    def storeList(
        self,
        list_object,  # This is a list of tuples e.g. `(123, "Mark", 35),(124, "Bob", 41)`
        names_tuple,  # This is a tuple of names e.g. `("id", "name", "age")`
        tablename,
        schemaname=None,
        primary_keys=None,
        whitelist=None,
        defaults=None,
        update_on_conflict=False,
        returning_id=False
    ):
        """Queue one INSERT per row of `list_object`.

        Args:
            list_object: iterable of row tuples, positionally matching
                `names_tuple`.
            names_tuple: column names for the row positions.
            tablename: target table name.
            schemaname: optional schema, prepended as `schema.table`.
            primary_keys: conflict-target column names; required when
                `update_on_conflict` is set.
            whitelist: if non-empty, only these columns (and the values at
                the matching positions of each row) are inserted.
            defaults: column names to reset to `DEFAULT` on conflict; only
                used when `update_on_conflict` is set.
            update_on_conflict: append an `ON CONFLICT ... DO UPDATE` clause.
            returning_id: append `RETURNING id`.

        Returns:
            `True` when the rows were queued; `False` (nothing queued) when
            `update_on_conflict` is requested without primary keys.
        """
        primary_keys = self._normalise_keys(primary_keys)
        # An upsert needs a conflict target.
        if update_on_conflict and not primary_keys:
            return False

        # IMPLEMENT whitelist filter. Remember the kept positions so that the
        # row values are filtered in step with the column names; otherwise
        # the placeholder count would not match the value count.
        if whitelist:
            kept = [
                index for index, name in enumerate(names_tuple)
                if name in whitelist
            ]
            colnames = [names_tuple[index] for index in kept]
        else:
            kept = None
            colnames = names_tuple

        query = self._build_query(
            colnames, tablename, schemaname, primary_keys,
            defaults, update_on_conflict, returning_id,
        )
        # ADD {QUERIES, VALUES} couples, one per row.
        for tuple_values in list_object:
            if kept is not None:
                tuple_values = tuple(tuple_values[index] for index in kept)
            self.queries.append(query)
            self.values.append(tuple_values)
        return True

    def size(self):
        """Return the number of queued queries."""
        return len(self.queries)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment