Last active
September 13, 2023 13:15
-
-
Save Akuma-U1/a4662fa3bf0115218cbcd0d2ee3ff95f to your computer and use it in GitHub Desktop.
Postgres DB Listener in Python with exponential backoff - asyncpg and asyncio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio

import asyncpg
async def listen_for_notifications() -> None:
    """Listen on the "mychannel" Postgres channel, reconnecting with backoff.

    Connects with asyncpg, registers a notification listener and a
    termination listener, then polls once per second until the connection
    reports closed, at which point it reconnects. When connecting (or the
    session) fails with a connection-level error, the coroutine sleeps for
    the next delay in ``backoff_times`` and tries again; once every delay
    has been used without a successful connect it prints a message and
    returns. The retry counter is reset after each successful connect.

    The connection parameters below are placeholders and must be replaced
    with real values.
    """
    backoff_times = [1, 2, 4, 8, 16]  # Backoff times in seconds
    backoff_counter = 0

    # The callbacks do not depend on a particular connection, so define them
    # once instead of on every reconnect attempt.
    def on_notification(connection, pid, channel, payload):
        print(f"Received notification on channel {channel}: {payload}")

    def on_termination(connection):
        print("Connection terminated")

    while True:
        try:
            conn: asyncpg.Connection = await asyncpg.connect(
                host="your_host",
                port=5432,  # change if Postgres runs on a different port
                database="db_name",
                user="db_user",  # typically a default user such as 'postgres'
                password="your_password",  # password for db_user
            )
            # From here on `conn` is bound, so cleanup lives in an inner
            # try/finally. A failed connect never reaches that cleanup and
            # therefore cannot trip over an unbound `conn`.
            try:
                print("Connected to database")
                backoff_counter = 0

                # Register a listener for the "mychannel" channel
                await conn.add_listener("mychannel", on_notification)
                conn.add_termination_listener(on_termination)

                # Wait for notifications; callbacks fire while we sleep.
                while True:
                    await asyncio.sleep(1)
                    if conn.is_closed():
                        print("Connection closed. Retrying...")
                        break
            finally:
                # Detach callbacks and release the connection before any
                # backoff sleep, and also when an unexpected exception or
                # cancellation unwinds through here.
                conn.remove_termination_listener(on_termination)
                if not conn.is_closed():
                    await conn.remove_listener("mychannel", on_notification)
                    await conn.close()
        except (asyncpg.exceptions.PostgresConnectionError, OSError):
            # OSError covers ConnectionRefusedError as well as other
            # network-level failures (DNS lookup, reset, unreachable host).
            if backoff_counter >= len(backoff_times):
                print("Connection lost. Maximum retry attempts reached.")
                break
            backoff_time = backoff_times[backoff_counter]
            print(f"Connection lost. Retrying in {backoff_time} seconds...")
            await asyncio.sleep(backoff_time)
            backoff_counter += 1
# Run the listener when this file is executed as a script.
if __name__ == "__main__":
    listener = listen_for_notifications()
    asyncio.run(listener)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This code sets up an asynchronous PostgreSQL database listener using asyncpg, continuously waiting for real-time notifications on the "mychannel" channel. When a notification is received, it prints the channel and payload, making it suitable for applications that require immediate reactions to database events, such as real-time updates or logging.
In a real use case you will add more functionality to the on_notification and on_termination parts of the code, adding your updates or logging functionality as needed.