Created
September 30, 2024 13:21
-
-
Save dincosman/ef1e97f3f4e31ee84968da37afa302cd to your computer and use it in GitHub Desktop.
A Python script that listens for PostgreSQL notifications on the `new_email` channel and sends the corresponding emails.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import psycopg2 | |
import select | |
from datetime import datetime | |
import subprocess | |
import logging | |
import time | |
# Send timestamped INFO-and-above records to the listener's log file
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    filename='/var/log/postgresql_email_listener/postgresql_email_listener.log',
)
def send_mail(email_data, *, sendmail_path="/usr/sbin/sendmail"):
    """
    Send one email through the local `sendmail` command.

    Args:
        email_data (str): Sender, recipient, subject and body, in that order,
            joined with the '€€' delimiter. The body may itself contain '€€'.
        sendmail_path (str): Path of the sendmail executable to run.

    Returns:
        bool: True if sendmail exited with status 0. False if the data is
        malformed, sendmail reported a failure, or it could not be started.
    """
    # Split at most three times so a body that contains '€€' stays intact;
    # anything that does not yield four fields (or is not a string) is rejected
    # here instead of raising into the caller.
    try:
        email_from, email_to, subject, body = email_data.split('€€', 3)
    except (AttributeError, ValueError):
        logging.error("Malformed email data: expected 4 fields separated by '€€'")
        return False

    # Collapse line breaks inside header values: a stray newline would end the
    # header section early or allow extra headers to be injected.
    email_from, email_to, subject = (
        " ".join(value.splitlines()) for value in (email_from, email_to, subject)
    )

    headers = (
        f"From: {email_from}\n"
        f"To: {email_to}\n"
        f"Subject: {subject}\n"
        'Content-Type: text/plain; charset="UTF-8"\n'
    )
    # The empty line between headers and body is what tells sendmail (-t reads
    # the recipients from the headers) where the headers stop.
    message = f"{headers}\n{body}\n"

    try:
        result = subprocess.run(
            [sendmail_path, "-t"],
            input=message.encode("utf-8"),
            stderr=subprocess.PIPE,
        )
    except Exception as e:
        # Best-effort boundary: a missing binary or encoding problem must not
        # take the listener down; report it and let the caller retry later.
        logging.error(f"An error occurred while sending email: {e}")
        return False

    if result.returncode == 0:
        logging.info(f"Email sent succesfully to {email_to}")
        return True

    # sendmail's stderr is not guaranteed to be valid UTF-8
    error_text = result.stderr.decode("utf-8", errors="replace").strip()
    logging.error(f"Failed to send email to {email_to}. Error: {error_text}")
    return False
def process_email(cur, email_id):
    """
    Fetch a single unsent email by ID, send it, and mark it as sent.

    Nothing happens when the row does not exist or is already marked as sent,
    so a notification for an email that has been delivered in the meantime
    does not produce a duplicate.

    Args:
        cur: Open database cursor used for both the lookup and the update.
        email_id: ID of the row in mailer.t_email_notifications.

    Returns:
        None
    """
    cur.execute("""
        SELECT email_from || '€€' || email_to || '€€' || email_subject || '€€' || email_body
        FROM mailer.t_email_notifications
        WHERE id = %s AND sent = FALSE;
    """, (email_id,))
    row = cur.fetchone()
    if row is None:
        # Unknown ID, or the email was already sent
        return

    # The row holds one '€€'-concatenated string; in SQL the concatenation is
    # NULL as soon as any of the four columns is NULL.
    email_data = row[0]
    if email_data is None:
        logging.error(f"Email ID {email_id} has a NULL field and cannot be sent.")
        return

    if not send_mail(email_data):
        # Leave the row unsent so it is picked up again later
        return

    # Record the delivery together with the time it happened
    cur.execute("UPDATE mailer.t_email_notifications SET sent= TRUE, sent_time = %s WHERE id = %s;",
                (datetime.now(), email_id))
    logging.info(f"Email ID {email_id} marked as sent.")
def process_unsent_emails(cur):
    """
    Send every email that is still flagged as unsent.

    Args:
        cur: Open database cursor; it is also handed to process_email().
    """
    cur.execute("SELECT id FROM mailer.t_email_notifications WHERE sent = FALSE;")
    # Collect the IDs up front: process_email() runs its own statements on
    # this same cursor.
    pending_ids = [row[0] for row in cur.fetchall()]
    for pending_id in pending_ids:
        process_email(cur, pending_id)
def _close_db_resources(cur, conn):
    """Close the cursor and connection if they were opened, logging close errors."""
    closed_cleanly = conn is not None
    for resource in (cur, conn):
        if resource is None:
            continue
        try:
            resource.close()
        except psycopg2.Error as close_error:
            # A failure while closing must not abort the reconnect loop
            closed_cleanly = False
            logging.warning(f"Error while closing database resource: {close_error}")
    if closed_cleanly:
        logging.info("Connection closed.")


def listen_notifications(conn_params):
    """
    Listen on the 'new_email' channel and send emails as notifications arrive.

    Runs until interrupted with Ctrl-C. After a database error the connection
    is closed and a new one is opened 60 seconds later.

    Args:
        conn_params (dict): Keyword arguments passed to psycopg2.connect().
    """
    try:
        while True:
            # Reset per attempt so cleanup never touches handles left over
            # from a previous connection.
            conn = None
            cur = None
            try:
                logging.info("Connecting to the database...")
                conn = psycopg2.connect(**conn_params)
                conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
                cur = conn.cursor()

                # Subscribe first, then sweep the backlog: emails queued while
                # no listener was connected produced notifications nobody
                # received, and anything queued from now on will notify us.
                cur.execute("LISTEN new_email;")
                process_unsent_emails(cur)
                logging.info("Waiting for notications on channel 'new_email'...")

                while True:
                    # Block for up to 60 seconds until the connection has data
                    readable, _, _ = select.select([conn], [], [], 60)
                    if not readable:
                        continue  # Timed out without a notification; wait again
                    conn.poll()
                    while conn.notifies:
                        notify = conn.notifies.pop(0)
                        logging.info(f"Received notification: {notify.payload}")
                        # The payload is the ID of the email to send
                        process_email(cur, notify.payload)
                        # Then pick up anything else that is still unsent
                        process_unsent_emails(cur)
            except (psycopg2.OperationalError, psycopg2.DatabaseError) as e:
                logging.error(f"Database connection error: {e}. Reconnecting in 60 seconds...")
            finally:
                _close_db_resources(cur, conn)
            # Sleep outside the handlers so Ctrl-C during the wait is still
            # reported as a normal stop below.
            time.sleep(60)
    except KeyboardInterrupt:
        logging.info("Listener stopped.")
if __name__ == "__main__":
    # Connection settings handed to listen_notifications(), which forwards
    # them to psycopg2.connect()
    conn_params = dict(
        dbname='postgres',
        host='pgcluster.localdomain',
        port=5000,
        user='mailer',
        password='XXXXX',
    )
    listen_notifications(conn_params)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment