Skip to content

Instantly share code, notes, and snippets.

@quiver
Last active September 19, 2024 12:21
Show Gist options
  • Save quiver/4240546 to your computer and use it in GitHub Desktop.
Save quiver/4240546 to your computer and use it in GitHub Desktop.
Who says PostgreSQL can't Pub/Sub like Redis?

Pub/Sub pattern with PostgreSQL's LISTEN/NOTIFY command

This is a simple chat-like program using pub-sub pattern, backed by PostgreSQL's LISTEN/NOTIFY command.

Publish

publish message to foo channel from user nickname.

$ python pub.py foo nickname
PUBLISH to channel #foo
test message
what's up?
<Ctrl-D>
$

publish message to bar channel from user mike.

$ python pub.py bar mike
PUBLISH to channel #bar
pub/sub rules!
<Ctrl-D>
$

Subscribe

subscribe to foo or bar channel.

$ python sub.py foo bar
SUBSCRIBE TO channel #foo
SUBSCRIBE TO channel #bar
#foo - [nickname]:test message
#foo - [nickname]:what's up?
#bar - [mike]:pub/sub rules!
<Ctrl-C>
UNSUBSCRIBE FROM channel #foo
UNSUBSCRIBE FROM channel #bar
$
# vim: set fileencoding=utf8
# publisher of pub/sub pattern
# Usage:
# $ python pub.py <channel> <nickname>
# http://initd.org/psycopg/docs/advanced.html#asynchronous-notifications
import sys
import psycopg2
import psycopg2.extensions
DSN = "dbname=dev user=jsmith"
conn = psycopg2.connect(DSN)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
channel = sys.argv[1]
nickname = sys.argv[2]
print "PUBLISH to channel #%s" % channel
while True:
try:
message = raw_input()
chat_message = psycopg2.extensions.QuotedString(
'[%s]:%s' % (nickname, message)).getquoted()
curs.execute("NOTIFY %s, %s;" % (channel, chat_message))
# If you want to use pg_notify function
#curs.execute("SELECT pg_notify('%s', %s);" % (channel, chat_message))
except EOFError:
break
# vim: set fileencoding=utf8
# subscriber of pub/sub pattern
# Usage:
# $ python sub.py <channel> <channel> ...
# http://initd.org/psycopg/docs/advanced.html#asynchronous-notifications
import select
import sys
import psycopg2
import psycopg2.extensions
DSN = "dbname=dev user=jsmith"
conn = psycopg2.connect(DSN)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
for channel in sys.argv[1:]:
print 'SUBSCRIBE TO channel #%s' % channel
curs.execute("LISTEN %s;" % channel)
epoll = select.epoll()
epoll.register(conn, select.EPOLLIN)
while True:
try:
events = epoll.poll()
conn.poll()
while conn.notifies:
notify = conn.notifies.pop()
print "#%s - %s" % (notify.channel, notify.payload)
except BaseException, err:
print err
break
for channel in sys.argv[1:]:
print 'UNSUBSCRIBE FROM channel #%s' % channel
curs.execute("UNLISTEN %s;" % channel)
@reignmaker
Copy link

Keep in mind that PostgreSQL's LISTEN/NOTIFY wont send/receive until the current transaction is committed with success. So, for example, if you have some long transaction and several NOTIFYs within it in different life time, you won't get them instantly one after another, you'll get them at once as soon as the transaction is committed successfully. So, yeah, PostgreSQL can't REALLY Pub/Sub like Redis, sadly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment