Skip to content

Instantly share code, notes, and snippets.

@rendarz
Created May 5, 2020 14:55
Show Gist options
  • Save rendarz/33010f9ef4f1f20632a2da9ede3b9caa to your computer and use it in GitHub Desktop.
Save rendarz/33010f9ef4f1f20632a2da9ede3b9caa to your computer and use it in GitHub Desktop.
class SQLInserter:
    """Accumulate parameterized ``INSERT`` statements and their values.

    Each ``store*`` call appends one or more entries to two parallel lists:
    ``queries`` (SQL text using ``%s`` placeholders) and ``values`` (the
    sequence of parameters for the query at the same index). Nothing is
    executed here; the caller is expected to hand each pair to a database
    cursor.

    The generated SQL uses ``ON CONFLICT ... DO UPDATE SET col = EXCLUDED.col``
    and ``RETURNING id`` (PostgreSQL-style upsert syntax).

    NOTE(security): table, schema and column names are interpolated into the
    SQL text verbatim (only the *values* are parameterized), so identifiers
    must come from trusted code, never from user input.
    """

    def __init__(self):
        # Parallel lists: queries[i] is executed with values[i].
        self.queries = []
        self.values = []

    @staticmethod
    def _build_query(tablename, schemaname, colnames, primary_keys,
                     defaults, update_on_conflict, returning_id):
        """Return the SQL text for one insert/upsert over ``colnames``."""
        # A lone string would otherwise be iterated character by character.
        if isinstance(primary_keys, str):
            primary_keys = (primary_keys,)
        primary_keys = tuple(primary_keys or ())

        if schemaname:
            tablename_fmt = "{0}.{1}".format(schemaname, tablename)
        else:
            tablename_fmt = tablename

        query = "INSERT INTO {0} ({1}) VALUES ({2}) ".format(
            tablename_fmt,
            ", ".join(colnames),
            ", ".join(["%s"] * len(colnames)),
        )

        if update_on_conflict:
            # Non-key columns take the incoming value on conflict ...
            assignments = [
                "{0} = EXCLUDED.{0}".format(name)
                for name in colnames
                if name not in primary_keys
            ]
            # ... and the requested columns are reset to their DEFAULT.
            assignments.extend(
                "{0}=DEFAULT".format(name)
                for name in (defaults or ())
                if name not in primary_keys
            )
            if assignments:
                action = "DO UPDATE SET {0}".format(", ".join(assignments))
            else:
                # An empty SET list is a syntax error; with nothing to
                # update, the only valid conflict action is DO NOTHING.
                action = "DO NOTHING"
            query += " ON CONFLICT ({0}) {1} ".format(
                ", ".join(primary_keys), action
            )

        if returning_id:
            query += " RETURNING id"
        return query + ";"

    def storeDict(
        self,
        dict_object,
        tablename,
        schemaname=None,
        primary_keys=None,
        additional_tuples=None,
        defaults=None,
        whitelist=None,
        update_on_conflict=False,
        returning_id=False
    ):
        """Queue one INSERT built from the items of ``dict_object``.

        Args:
            dict_object: Mapping of column name -> value to insert.
            tablename: Target table name.
            schemaname: Optional schema, prefixed as ``schema.table``.
            primary_keys: Column names forming the conflict target. Required
                when ``update_on_conflict`` is true.
            additional_tuples: Optional iterable of ``(column, value)`` pairs
                placed before the columns of ``dict_object``.
            defaults: Optional column names set to ``DEFAULT`` in the
                conflict-update clause. Ignored unless ``update_on_conflict``.
            whitelist: If given and non-empty, only these column names are
                kept (applies to ``additional_tuples`` and ``dict_object``).
            update_on_conflict: Append an ``ON CONFLICT ... DO UPDATE`` clause.
            returning_id: Append ``RETURNING id``.

        Returns:
            ``False`` (and queues nothing) if ``update_on_conflict`` is
            requested without ``primary_keys``; ``True`` otherwise.
        """
        # An upsert needs a conflict target.
        if update_on_conflict and not primary_keys:
            return False

        colnames = []
        values = []
        # Extra (column, value) pairs come first, then the dict's own items.
        for pair in additional_tuples or ():
            if not whitelist or pair[0] in whitelist:
                colnames.append(pair[0])
                values.append(pair[1])
        for name in dict_object:
            if not whitelist or name in whitelist:
                colnames.append(name)
                values.append(dict_object[name])

        query = self._build_query(
            tablename, schemaname, colnames, primary_keys,
            defaults, update_on_conflict, returning_id,
        )
        self.queries.append(query)
        self.values.append(values)
        return True

    def storeList(
        self,
        list_object,  # A list of row tuples, e.g. `(123, "Mark", 35), (124, "Bob", 41)`
        names_tuple,  # A tuple of column names, e.g. `("id", "name", "age")`
        tablename,
        schemaname=None,
        primary_keys=None,
        whitelist=None,
        defaults=None,
        update_on_conflict=False,
        returning_id=False
    ):
        """Queue one INSERT per row of ``list_object``.

        Args:
            list_object: Iterable of rows; each row is positionally aligned
                with ``names_tuple``.
            names_tuple: Column names, one per row position.
            tablename: Target table name.
            schemaname: Optional schema, prefixed as ``schema.table``.
            primary_keys: Column names forming the conflict target. Required
                when ``update_on_conflict`` is true.
            whitelist: If given and non-empty, only these columns are kept;
                the matching positions are dropped from every row as well so
                that placeholders and values stay aligned.
            defaults: Optional column names set to ``DEFAULT`` in the
                conflict-update clause. Ignored unless ``update_on_conflict``.
            update_on_conflict: Append an ``ON CONFLICT ... DO UPDATE`` clause.
            returning_id: Append ``RETURNING id``.

        Returns:
            ``False`` (and queues nothing) if ``update_on_conflict`` is
            requested without ``primary_keys``; ``True`` otherwise.
        """
        # An upsert needs a conflict target.
        if update_on_conflict and not primary_keys:
            return False

        keep = None  # Row positions to keep; None means "pass rows through".
        colnames = names_tuple
        if whitelist:
            kept_positions = [
                index for index, name in enumerate(names_tuple)
                if name in whitelist
            ]
            # Only rebuild rows when the whitelist actually removes something.
            if len(kept_positions) != len(names_tuple):
                keep = kept_positions
                colnames = [names_tuple[index] for index in keep]

        # The SQL text is identical for every row; build it once.
        query = self._build_query(
            tablename, schemaname, colnames, primary_keys,
            defaults, update_on_conflict, returning_id,
        )
        for row in list_object:
            self.queries.append(query)
            if keep is None:
                self.values.append(row)
            else:
                self.values.append(tuple(row[index] for index in keep))
        return True

    def size(self):
        """Return the number of queued queries."""
        return len(self.queries)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment