Last active
November 19, 2016 21:51
-
-
Save ergo70/dbda4bee9bfac8706365f3317b1dbddc to your computer and use it in GitHub Desktop.
psycopg2 asynchronous cache updates with LISTEN/NOTIFY and the tcn extension
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| """ | |
| Created on Sun Nov 13 00:00:06 2016 | |
| @author: ergo | |
| """ | |
| import select | |
| import psycopg2 | |
| import psycopg2.extensions | |
| import threading | |
| from time import sleep | |
def populate_cache(conn):
    """Build the initial cache from the whole ``cats`` table.

    Args:
        conn: An open database connection exposing ``cursor()``.

    Returns:
        dict: Maps every ``face_name`` in the ``cats`` table to its ``id``.
    """
    curs = conn.cursor()
    try:
        curs.execute('SELECT face_name, id FROM cats;')
        # Each row is a (face_name, id) pair, which dict() consumes directly.
        return dict(curs.fetchall())
    finally:
        # Release the cursor even when the query or the fetch fails.
        curs.close()
def insert(conn, cats, v):
    """Add the cat named *v* to the cache after an INSERT notification.

    The row is re-read from the database so the cache stores the
    authoritative ``id`` for the name.

    Args:
        conn: An open database connection exposing ``cursor()``.
        cats: The cache dict (``face_name`` -> ``id``); mutated in place.
        v: The ``face_name`` carried by the notification.

    Returns:
        None. If no row with that name exists any more (for example it was
        deleted again before this lookup ran), the cache is left unchanged.
    """
    curs = conn.cursor()
    try:
        curs.execute('SELECT face_name, id FROM cats WHERE face_name = %s;', (v,))
        row = curs.fetchone()
    finally:
        # Release the cursor even when the query fails.
        curs.close()
    if row is None:
        # The row vanished between the NOTIFY and this SELECT; nothing to add.
        return
    cats[row[0]] = row[1]
def update(conn, cats, v):
    """Re-key the cache entry for *v* after an UPDATE notification.

    The cached ``id`` of *v* is used to look up the row's current
    ``face_name``; the entry is then moved from the old name to the new one.

    Args:
        conn: An open database connection exposing ``cursor()``.
        cats: The cache dict (``face_name`` -> ``id``); mutated in place.
        v: The ``face_name`` carried by the notification, i.e. the key the
            entry is currently cached under.

    Returns:
        None. If *v* is not cached there is no id to look up, so nothing
        happens. If the row no longer exists, the stale entry is dropped.
    """
    try:
        cat_id = cats[v]
    except KeyError:
        # Unknown name: without a cached id the row cannot be located.
        return
    curs = conn.cursor()
    try:
        curs.execute('SELECT face_name FROM cats WHERE id = %s;', (cat_id,))
        row = curs.fetchone()
    finally:
        # Release the cursor even when the query fails.
        curs.close()
    # Remove the old key first so an unchanged name simply gets re-added.
    del cats[v]
    if row is not None:
        cats[row[0]] = cat_id
    # else: the row was deleted in the meantime, so the entry stays removed.
def _parse_tcn_payload(payload):
    """Extract ``(op, value)`` from a tcn-style notification payload.

    The expected shape is ``"table",OP,"column"='value'[,...]`` where a
    single quote inside the value is doubled. Only the first key column's
    value is returned, matching what the cache is keyed on.

    Assumes the table name contains no comma and the first column name
    contains no ``=`` -- TODO confirm for other schemas.

    Returns:
        A ``(op, value)`` tuple, or ``None`` when the payload does not have
        the expected shape.
    """
    parts = payload.split(',', 2)
    if len(parts) != 3:
        return None
    op, key_part = parts[1], parts[2]
    _, sep, literal = key_part.partition('=')
    if not sep:
        return None
    if not literal.startswith("'"):
        # Unquoted value: take everything up to the next key column.
        return op, literal.split(',', 1)[0]
    chars = []
    pos = 1
    end = len(literal)
    while pos < end:
        ch = literal[pos]
        if ch == "'":
            if literal[pos + 1:pos + 2] == "'":
                # A doubled quote is an escaped literal quote.
                chars.append("'")
                pos += 2
                continue
            # Closing quote: the value is complete.
            return op, ''.join(chars)
        chars.append(ch)
        pos += 1
    # Ran off the end without a closing quote.
    return None


def notify(conn, cats):
    """Listen on *conn* forever and apply incoming changes to the cache.

    Waits (with a one-second timeout) for the connection to become
    readable, polls it, and drains ``conn.notifies``. Each payload is
    parsed into an operation and a key value:

    * ``I`` -- ``insert(conn, cats, v)``
    * ``D`` -- remove ``v`` from the cache (a no-op if it is not cached)
    * ``U`` -- ``update(conn, cats, v)``

    Payloads that cannot be parsed and unknown operations are skipped so a
    stray message on the channel does not terminate the listener.

    Args:
        conn: Connection object usable with ``select.select`` and exposing
            ``poll()`` and a ``notifies`` list.
        cats: The cache dict (``face_name`` -> ``id``); mutated in place.

    This function never returns.
    """
    while True:
        readable, _, _ = select.select([conn], [], [], 1)
        if not readable:
            # Timed out with nothing to read; wait again.
            continue
        conn.poll()
        while conn.notifies:
            note = conn.notifies.pop(0)
            parsed = _parse_tcn_payload(note.payload)
            if parsed is None:
                continue
            op, v = parsed
            if op == 'I':
                insert(conn, cats, v)
            elif op == 'D':
                # pop() with a default tolerates names that were never cached.
                cats.pop(v, None)
            elif op == 'U':
                update(conn, cats, v)
# Script entry: connect, subscribe to the 'cats' channel, fill the cache,
# then keep it fresh from a background listener thread.
conn = psycopg2.connect("host=<host> dbname=<database> user=<user> password=<password>")
# Autocommit so LISTEN takes effect immediately and the session never sits
# inside an open transaction.
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

curs = conn.cursor()
curs.execute("LISTEN cats;")
curs.close()
# Single-argument print(...) produces the same output on Python 2 and 3.
print("Waiting for notifications on channel 'cats'")

# Populate only after LISTEN is active, so changes made while the initial
# load runs still arrive as notifications.
cats = populate_cache(conn)

t = threading.Thread(target=notify, args=(conn, cats))
# Daemon thread: it must not keep the process alive on exit.
t.daemon = True
t.start()

while True:
    # list(cats) takes a snapshot of the keys, so printing never iterates a
    # dict the listener thread is resizing.
    print(list(cats))
    sleep(10)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment