Skip to content

Instantly share code, notes, and snippets.

@Akuma-U1
Last active September 13, 2023 13:15
Show Gist options
  • Save Akuma-U1/a4662fa3bf0115218cbcd0d2ee3ff95f to your computer and use it in GitHub Desktop.
Save Akuma-U1/a4662fa3bf0115218cbcd0d2ee3ff95f to your computer and use it in GitHub Desktop.
Postgres DB Listener in Python with exponential backoff - asyncpg and asyncio
import asyncio
import asyncpg
async def listen_for_notifications():
backoff_times = [1, 2, 4, 8, 16] # Backoff times in seconds
backoff_counter = 0
while True:
try:
conn: asyncpg.Connection = await asyncpg.connect(
host="your_host",
port=5432, #this can be a different port if you are running your postgres on a different port
database="db_name",
user="db_user", #typically a default user, like 'postgres' user can be used
password="your_password" #password for the db_user to log in and make requests on the db
)
print("Connected to database")
backoff_counter = 0
# Register a listener for the "mychannel" channel
def on_notification(connection, pid, channel, payload):
print(f"Received notification on channel {channel}: {payload}")
def on_termination(connection):
print("Connection terminated")
await conn.add_listener("mychannel", on_notification)
conn.add_termination_listener(on_termination)
# Wait for notifications
while True:
await asyncio.sleep(1)
if conn.is_closed():
print("Connection closed. Retrying...")
break
await conn.close()
except (asyncpg.exceptions.PostgresConnectionError, ConnectionRefusedError):
if backoff_counter < len(backoff_times):
backoff_time = backoff_times[backoff_counter]
print(f"Connection lost. Retrying in {backoff_time} seconds...")
await asyncio.sleep(backoff_time)
backoff_counter += 1
else:
print("Connection lost. Maximum retry attempts reached.")
break
finally:
await conn.remove_listener("mychannel", on_notification)
conn.remove_termination_listener(on_termination)
if __name__ == '__main__':
asyncio.run(listen_for_notifications())
@Akuma-U1
Copy link
Author

his code sets up an asynchronous PostgreSQL database listener using asyncpg, continuously waiting for real-time notifications on the "mychannel" channel. When a notification is received, it prints the channel and payload, making it suitable for applications that require immediate reactions to database events, such as real-time updates or logging.

In a real use case you will add more functionality to the on_notification and on_termination parts of the code, adding your updates or logging functionality as needed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment