Last active
February 11, 2022 21:16
-
-
Save scarlson/5165866 to your computer and use it in GitHub Desktop.
Build and save objects based on Psycopg2 tables.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### Postgres: | |
# | |
# CREATE TABLE film ( | |
# film_id varchar(40) CONSTRAINT firstkey PRIMARY KEY, | |
# did integer NOT NULL, | |
# date_prod date, | |
# imdb text, | |
# kind varchar(10), | |
# len interval hour to minute | |
# ); | |
# | |
# | |
### Python: | |
# | |
# class Film(BaseObject): | |
# pass | |
# | |
# Memento = Film('Memento') | |
# Memento.imdb = 'http://www.imdb.com/title/tt0209144/' | |
# if Memento.save(): | |
# print "Success!" | |
# | |
import psycopg2 | |
DATABASE = 'psycopg2 connect val' | |
class BaseObject(object): | |
def __init__(self, value, table=None, key=None): | |
self._table = table or self.__class__.__name__ | |
self._key = key or self._table + '_id' | |
self._getVals() | |
self._fetch(value) | |
@property | |
def conn(self): | |
if not hasattr(self, '_conn'): | |
self._conn = psycopg2.connect(DATABASE) | |
self._conn.set_client_encoding('UTF8') | |
if self._conn.closed: | |
self._conn = psycopg2.connect(DATABASE) | |
self._conn.set_client_encoding('UTF8') | |
return self._conn | |
@property | |
def cur(self): | |
if not hasattr(self, '_cur'): | |
self._cur = self.conn.cursor() | |
if self._cur.closed: | |
self._cur = self.conn.cursor() | |
return self._cur | |
def select(self, query, *params): | |
if params: | |
query = self.cur.mogrify(query, params) | |
else: | |
query = self.cur.mogrify(query) | |
self.cur.execute(query) | |
res = self.cur.fetchall() | |
self.cur.close() | |
self.conn.close() | |
if res: | |
if len(res) > 1: | |
return res | |
else: | |
return res[0] | |
def update(self, query, *params): | |
if params: | |
query = self.cur.mogrify(query, params) | |
else: | |
query = self.cur.mogrify(query) | |
try: | |
self.cur.execute(query) | |
self.conn.commit() | |
self.cur.close() | |
self.conn.close() | |
return True | |
except: | |
self.conn.rollback() | |
self.cur.close() | |
self.conn.close() | |
return False | |
def _getVals(self): | |
q = "select * from %s limit 1;" % self._table | |
q = self.cur.mogrify(q) | |
self.cur.execute(q) | |
vals = [val[0] for val in self.cur.description] | |
self._vals = vals | |
for val in vals: | |
self.__dict__[val] = None | |
self.cur.close() | |
self.conn.close() | |
@property | |
def exists(self): | |
if not hasattr(self, '_exists'): | |
if hasattr(self, '_key') and hasattr(self, '_table'): | |
q = 'select * from %(_table)s where %(_key)s =' % self.__dict__ | |
q += ' %s;' | |
if self.select(q, self.__dict__[self._key]): | |
return True | |
return False | |
def _fetch(self, unid): | |
if hasattr(self, '_key') and hasattr(self, '_table'): | |
q = 'select * from %(_table)s where %(_key)s =' % self.__dict__ | |
q += ' %s;' | |
vals = self.select(q, unid) | |
if vals: | |
vals = list(vals[::-1]) | |
for val in self._vals: | |
self.__dict__[val] = vals.pop() | |
else: | |
for val in self._vals: | |
self.__dict__[val] = None | |
self.__dict__[self._key] = unid | |
def save(self): | |
if hasattr(self, '_key') and hasattr(self, '_table'): | |
if self.exists: | |
keys = tuple(self._vals) | |
values = tuple(self.__dict__[val] for val in self._vals) | |
q = 'update ' + self._table + ' set (' + ",".join(keys) + ') = %s where ' + self._key + ' = %s;' | |
return self.update(q, values, self.__dict__[self._key]) | |
else: | |
q = "insert into " + self._table + " values %s;" | |
values = tuple(self.__dict__[val] for val in self._vals) | |
return self.update(q, values) | |
return False |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment