Skip to content

Instantly share code, notes, and snippets.

@quiver
Last active September 19, 2024 12:21
Show Gist options
  • Save quiver/4240546 to your computer and use it in GitHub Desktop.
Save quiver/4240546 to your computer and use it in GitHub Desktop.
Who says PostgreSQL can't Pub/Sub like Redis?

Pub/Sub pattern with PostgreSQL's LISTEN/NOTIFY command

This is a simple chat-like program using pub-sub pattern, backed by PostgreSQL's LISTEN/NOTIFY command.

Publish

publish message to foo channel from user nickname.

$ python pub.py foo nickname
PUBLISH to channel #foo
test message
what's up?
<Ctrl-D>
$

publish message to bar channel from user mike.

$ python pub.py bar mike
PUBLISH to channel #bar
pub/sub rules!
<Ctrl-D>
$

Subscribe

subscribe to foo or bar channel.

$ python sub.py foo bar
SUBSCRIBE TO channel #foo
SUBSCRIBE TO channel #bar
#foo - [nickname]:test message
#foo - [nickname]:what's up?
#bar - [mike]:pub/sub rules!
<Ctrl-C>
UNSUBSCRIBE FROM channel #foo
UNSUBSCRIBE FROM channel #bar
$
# vim: set fileencoding=utf8
# publisher of pub/sub pattern
# Usage:
# $ python pub.py <channel> <nickname>
# http://initd.org/psycopg/docs/advanced.html#asynchronous-notifications
import sys
import psycopg2
import psycopg2.extensions
DSN = "dbname=dev user=jsmith"
conn = psycopg2.connect(DSN)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
channel = sys.argv[1]
nickname = sys.argv[2]
print "PUBLISH to channel #%s" % channel
while True:
try:
message = raw_input()
chat_message = psycopg2.extensions.QuotedString(
'[%s]:%s' % (nickname, message)).getquoted()
curs.execute("NOTIFY %s, %s;" % (channel, chat_message))
# If you want to use pg_notify function
#curs.execute("SELECT pg_notify('%s', %s);" % (channel, chat_message))
except EOFError:
break
# vim: set fileencoding=utf8
# subscriber of pub/sub pattern
# Usage:
# $ python sub.py <channel> <channel> ...
# http://initd.org/psycopg/docs/advanced.html#asynchronous-notifications
import select
import sys
import psycopg2
import psycopg2.extensions
DSN = "dbname=dev user=jsmith"
conn = psycopg2.connect(DSN)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
for channel in sys.argv[1:]:
print 'SUBSCRIBE TO channel #%s' % channel
curs.execute("LISTEN %s;" % channel)
epoll = select.epoll()
epoll.register(conn, select.EPOLLIN)
while True:
try:
events = epoll.poll()
conn.poll()
while conn.notifies:
notify = conn.notifies.pop()
print "#%s - %s" % (notify.channel, notify.payload)
except BaseException, err:
print err
break
for channel in sys.argv[1:]:
print 'UNSUBSCRIBE FROM channel #%s' % channel
curs.execute("UNLISTEN %s;" % channel)
@hazardland
Copy link

Your code showed me a true path, it is outdated for python 3.6 already but there is a pgpubsub module now (which by itself uses select inside). I use this approach to deliver news events and notficiation to chat server clients via websocket from database (chat server handles its own clients and messages with websocket only):

import os
import pgpubsub

from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())

import websocket
import json

server = websocket.WebSocket()
server.connect('ws://127.0.0.1:7000/event')

pubsub = pgpubsub.connect(os.getenv("DB"))
pubsub.listen('chat')

for event in pubsub.events(yield_timeouts=True):
    if event is None:
        # keep websocket connection alive
        print('ping')
        server.send('{"type":"ping"}')
    else:
        # send event payload to chat server via websocket
        print(event.payload)
        server.send(event.payload)

@chokosabe
Copy link

Your code showed me a true path, it is outdated for python 3.6 already but there is a pgpubsub module now (which by itself uses select inside). I use this approach to deliver news events and notficiation to chat server clients via websocket from database (chat server handles its own clients and messages with websocket only):

import os
import pgpubsub

from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())

import websocket
import json

server = websocket.WebSocket()
server.connect('ws://127.0.0.1:7000/event')

pubsub = pgpubsub.connect(os.getenv("DB"))
pubsub.listen('chat')

for event in pubsub.events(yield_timeouts=True):
    if event is None:
        # keep websocket connection alive
        print('ping')
        server.send('{"type":"ping"}')
    else:
        # send event payload to chat server via websocket
        print(event.payload)
        server.send(event.payload)

This is really nice! Thx.

@reignmaker
Copy link

Keep in mind that PostgreSQL's LISTEN/NOTIFY wont send/receive until the current transaction is committed with success. So, for example, if you have some long transaction and several NOTIFYs within it in different life time, you won't get them instantly one after another, you'll get them at once as soon as the transaction is committed successfully. So, yeah, PostgreSQL can't REALLY Pub/Sub like Redis, sadly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment