Skip to content

Instantly share code, notes, and snippets.

@iuridiniz
Created May 6, 2018 23:17
Show Gist options
  • Save iuridiniz/1f7f5d9bd8be999b1564d7fdc9a4e0e1 to your computer and use it in GitHub Desktop.
Save iuridiniz/1f7f5d9bd8be999b1564d7fdc9a4e0e1 to your computer and use it in GitHub Desktop.
pyscopg-testreplication
from __future__ import print_function
import sys
import psycopg2
import psycopg2.extras
conn = psycopg2.connect('dbname=psycopg2_test user=iuri',
connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()
try:
# test_decoding produces textual output
cur.start_replication(slot_name='pytest', decode=True)
except psycopg2.ProgrammingError:
cur.create_replication_slot('pytest', output_plugin='test_decoding')
cur.start_replication(slot_name='pytest', decode=True)
class DemoConsumer(object):
def __call__(self, msg):
print(msg.payload)
msg.cursor.send_feedback(flush_lsn=msg.data_start)
democonsumer = DemoConsumer()
print("Starting streaming, press Control-C to end...", file=sys.stderr)
try:
cur.consume_stream(democonsumer)
except KeyboardInterrupt:
cur.close()
conn.close()
print("The slot 'pytest' still exists. Drop it with "
"SELECT pg_drop_replication_slot('pytest'); if no longer needed.",
file=sys.stderr)
print("WARNING: Transaction logs will accumulate in pg_xlog "
"until the slot is dropped.", file=sys.stderr)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment