Skip to content

Instantly share code, notes, and snippets.

@adimyth
Created September 24, 2025 09:40
Show Gist options
  • Select an option

  • Save adimyth/52f1a997d5a058c8d933b92f63396204 to your computer and use it in GitHub Desktop.

Select an option

Save adimyth/52f1a997d5a058c8d933b92f63396204 to your computer and use it in GitHub Desktop.
Inbox & Outbox Pattern for message delivery

Inbox and Outbox Patterns: A Complete Guide

Core Differentiation Summary

Inbox Pattern: "Reliable Receiving"

  • Purpose: Ensure recipients can reliably consume messages
  • Location: Consumer/Recipient side
  • Problem Solved: Messages lost when recipient is unavailable
  • Use When: You need guaranteed message availability for consumers

Outbox Pattern: "Reliable Sending"

  • Purpose: Ensure senders reliably publish messages
  • Location: Producer/Sender side
  • Problem Solved: Messages lost when sender fails after business logic but before sending
  • Use When: You need transactional consistency between business operations and message sending

When to Use Each Pattern

Use Inbox Pattern When:

  • Recipients might be offline/unavailable
  • You need message persistence and replay capability
  • Multiple consumers need the same messages
  • You want to decouple message production from consumption timing
  • You need "at-least-once" delivery guarantees

Examples:

  • Chat applications (users offline)
  • Email systems
  • Task queues for workers
  • Event streams for analytics

Use Outbox Pattern When:

  • Message sending must be part of a database transaction
  • You need "exactly-once" delivery guarantees
  • You can't afford to lose messages due to network failures
  • You need audit trails of sent messages
  • Cross-service communication needs to be reliable

Examples:

  • Payment confirmations (must send exactly once)
  • Order processing workflows
  • Critical system notifications
  • Microservice event publishing

πŸ“₯ Inbox Pattern Architecture

sequenceDiagram
    participant S as Sender
    participant I as Bob's Inbox
    participant B as Bob (Recipient)
    participant N as Notification Service

    S->>I: 1. Send message to Bob's inbox
    I->>I: 2. Store message persistently
    I-->>S: 3. Acknowledge receipt
    I->>N: 4. Trigger notification
    N->>B: 5. Push notification to Bob
    B->>I: 6. Poll inbox for messages
    I->>B: 7. Return messages
    B->>I: 8. Acknowledge message read
Loading

Python Implementation - Inbox Pattern

import asyncio
import sqlite3
from datetime import datetime
from typing import List, Optional
from dataclasses import dataclass
from enum import Enum

class MessageStatus(Enum):
    UNREAD = "unread"
    READ = "read"

@dataclass
class Message:
    id: int
    sender_id: str
    recipient_id: str
    content: str
    created_at: datetime
    status: MessageStatus

class InboxService:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute('''
            CREATE TABLE IF NOT EXISTS inbox_messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sender_id TEXT NOT NULL,
                recipient_id TEXT NOT NULL,
                content TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                status TEXT DEFAULT 'unread'
            )
        ''')
        conn.commit()
        conn.close()

    async def deliver_message(self, sender_id: str, recipient_id: str, content: str) -> int:
        """Deliver message to recipient's inbox"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute('''
            INSERT INTO inbox_messages (sender_id, recipient_id, content, status)
            VALUES (?, ?, ?, ?)
        ''', (sender_id, recipient_id, content, MessageStatus.UNREAD.value))

        message_id = cursor.lastrowid
        conn.commit()
        conn.close()

        # Trigger notification
        await self._notify_user(recipient_id, message_id)
        return message_id

    async def get_unread_messages(self, user_id: str) -> List[Message]:
        """Get all unread messages for a user"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute('''
            SELECT id, sender_id, recipient_id, content, created_at, status
            FROM inbox_messages
            WHERE recipient_id = ? AND status = ?
            ORDER BY created_at DESC
        ''', (user_id, MessageStatus.UNREAD.value))

        messages = []
        for row in cursor.fetchall():
            messages.append(Message(
                id=row[0],
                sender_id=row[1],
                recipient_id=row[2],
                content=row[3],
                created_at=datetime.fromisoformat(row[4]),
                status=MessageStatus(row[5])
            ))

        conn.close()
        return messages

    async def mark_messages_read(self, user_id: str, message_ids: List[int]):
        """Mark messages as read"""
        conn = sqlite3.connect(self.db_path)
        placeholders = ','.join('?' * len(message_ids))
        conn.execute(f'''
            UPDATE inbox_messages
            SET status = ?
            WHERE recipient_id = ? AND id IN ({placeholders})
        ''', [MessageStatus.READ.value, user_id] + message_ids)

        conn.commit()
        conn.close()

    async def _notify_user(self, user_id: str, message_id: int):
        """Trigger real-time notification"""
        print(f"πŸ”” Notification sent to user {user_id} for message {message_id}")

# Usage Example
async def inbox_pattern_example():
    inbox_service = InboxService("inbox.db")

    # Alice sends message to Bob
    message_id = await inbox_service.deliver_message("alice", "bob", "Hello Bob!")

    # Bob gets his unread messages
    unread = await inbox_service.get_unread_messages("bob")
    print(f"Bob has {len(unread)} unread messages")

    # Bob marks messages as read
    await inbox_service.mark_messages_read("bob", [message_id])

πŸ“€ Outbox Pattern Architecture

sequenceDiagram
    participant A as Alice (Sender)
    participant DB as Alice's Database
    participant O as Alice's Outbox
    participant BP as Background Processor
    participant B as Bob's Service

    A->>DB: 1. Start transaction
    A->>DB: 2. Execute business logic
    A->>O: 3. Write to outbox (same transaction)
    DB-->>A: 4. Commit transaction

    loop Background Processing
        BP->>O: 5. Poll outbox for pending messages
        O->>BP: 6. Return pending messages
        BP->>B: 7. Deliver message to Bob's service
        B-->>BP: 8. Acknowledge delivery
        BP->>O: 9. Mark message as completed
    end
Loading

Note

Here, the critical piece is that the business logic and message to outbox (steps 2 & 3) are written in the same transaction.

Python Implementation - Outbox Pattern

import asyncio
import sqlite3
import json
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum

class OutboxStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class OutboxMessage:
    id: Optional[int]
    event_type: str
    payload: Dict
    created_at: datetime
    status: OutboxStatus
    retry_count: int = 0
    error_message: Optional[str] = None

class OutboxService:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)

        # Business data table
        conn.execute('''
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sender_id TEXT NOT NULL,
                recipient_id TEXT NOT NULL,
                content TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Outbox table
        conn.execute('''
            CREATE TABLE IF NOT EXISTS outbox (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                event_type TEXT NOT NULL,
                payload TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                status TEXT DEFAULT 'pending',
                retry_count INTEGER DEFAULT 0,
                error_message TEXT
            )
        ''')

        conn.commit()
        conn.close()

    async def send_message_with_outbox(self, sender_id: str, recipient_id: str, content: str):
        """Send message using outbox pattern for reliability"""
        conn = sqlite3.connect(self.db_path)

        try:
            # Start transaction
            conn.execute("BEGIN TRANSACTION")

            # 1. Business logic - save message
            cursor = conn.execute('''
                INSERT INTO messages (sender_id, recipient_id, content)
                VALUES (?, ?, ?)
            ''', (sender_id, recipient_id, content))

            message_id = cursor.lastrowid

            # 2. Write to outbox in same transaction
            outbox_payload = {
                "message_id": message_id,
                "sender_id": sender_id,
                "recipient_id": recipient_id,
                "content": content
            }

            conn.execute('''
                INSERT INTO outbox (event_type, payload, status)
                VALUES (?, ?, ?)
            ''', ("message.send", json.dumps(outbox_payload), OutboxStatus.PENDING.value))

            # 3. Commit transaction - both succeed or both fail
            conn.commit()
            print(f"βœ… Message {message_id} saved with outbox entry")

        except Exception as e:
            conn.rollback()
            print(f"❌ Transaction failed: {e}")
            raise
        finally:
            conn.close()

    async def get_pending_messages(self) -> List[OutboxMessage]:
        """Get pending messages from outbox"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute('''
            SELECT id, event_type, payload, created_at, status, retry_count, error_message
            FROM outbox
            WHERE status = ?
            ORDER BY created_at ASC
        ''', (OutboxStatus.PENDING.value,))

        messages = []
        for row in cursor.fetchall():
            messages.append(OutboxMessage(
                id=row[0],
                event_type=row[1],
                payload=json.loads(row[2]),
                created_at=datetime.fromisoformat(row[3]),
                status=OutboxStatus(row[4]),
                retry_count=row[5],
                error_message=row[6]
            ))

        conn.close()
        return messages

    async def mark_completed(self, message_id: int):
        """Mark outbox message as completed"""
        conn = sqlite3.connect(self.db_path)
        conn.execute('''
            UPDATE outbox
            SET status = ?, error_message = NULL
            WHERE id = ?
        ''', (OutboxStatus.COMPLETED.value, message_id))
        conn.commit()
        conn.close()

    async def mark_failed(self, message_id: int, error: str):
        """Mark outbox message as failed"""
        conn = sqlite3.connect(self.db_path)
        conn.execute('''
            UPDATE outbox
            SET status = ?, retry_count = retry_count + 1, error_message = ?
            WHERE id = ?
        ''', (OutboxStatus.FAILED.value, error, message_id))
        conn.commit()
        conn.close()

class OutboxProcessor:
    def __init__(self, outbox_service: OutboxService):
        self.outbox_service = outbox_service

    async def process_outbox(self):
        """Background process to handle outbox messages"""
        pending_messages = await self.outbox_service.get_pending_messages()

        for message in pending_messages:
            try:
                # Simulate message delivery
                await self._deliver_message(message.payload)
                await self.outbox_service.mark_completed(message.id)
                print(f"βœ… Message {message.id} delivered successfully")

            except Exception as e:
                await self.outbox_service.mark_failed(message.id, str(e))
                print(f"❌ Failed to deliver message {message.id}: {e}")

    async def _deliver_message(self, payload: Dict):
        """Simulate message delivery to external service"""
        # Simulate network call
        await asyncio.sleep(0.1)

        # Simulate occasional failures
        import random
        if random.random() < 0.1:  # 10% failure rate
            raise Exception("Network timeout")

        print(f"πŸ“€ Delivered message to {payload['recipient_id']}: {payload['content']}")

# Usage Example
async def outbox_pattern_example():
    outbox_service = OutboxService("outbox.db")
    processor = OutboxProcessor(outbox_service)

    # Alice sends message (stored in outbox)
    await outbox_service.send_message_with_outbox("alice", "bob", "Hello from outbox!")

    # Background processor handles delivery
    await processor.process_outbox()

βœ‰οΈ Combined Inbox + Outbox Pattern Architecture

sequenceDiagram
    participant A as Alice (Sender)
    participant ADB as Alice's DB + Outbox
    participant BP as Background Processor
    participant BInbox as Bob's Inbox
    participant B as Bob (Recipient)
    participant Push as Push Service

    A->>ADB: 1. Business logic + Outbox (transaction)
    ADB-->>A: 2. Acknowledge local save

    loop Reliable Delivery
        BP->>ADB: 3. Poll Alice's outbox
        ADB->>BP: 4. Return pending messages
        BP->>BInbox: 5. Deliver to Bob's inbox
        BInbox-->>BP: 6. Acknowledge inbox delivery
        BP->>ADB: 7. Mark outbox as completed
    end

    BInbox->>Push: 8. Trigger push notification
    Push->>B: 9. Notify Bob of new message
    B->>BInbox: 10. Fetch messages from inbox
    BInbox->>B: 11. Return messages
    B->>BInbox: 12. Mark messages as read
Loading

Chat Application Benefits

Outbox Pattern Provides:

  • βœ… Messages never lost due to server crashes
  • βœ… Exactly-once sending (no duplicate messages)
  • βœ… Audit trail of all sent messages
  • βœ… Can retry failed deliveries

Inbox Pattern Provides:

  • βœ… Messages available when user comes online
  • βœ… Multi-device synchronization
  • βœ… Message history and search
  • βœ… Push notification triggers
  • βœ… Unread message counts

Real-Time Flow:

1. Alice types message β†’ Outbox (local transaction)
2. Background process β†’ Reads outbox β†’ Delivers to Bob's inbox
3. Bob's inbox updated β†’ WebSocket notification sent
4. Bob's device receives notification β†’ Fetches from inbox
5. Acknowledgments flow back through the system

Push Notifications Integration:

Bob's Inbox Service:
  ↓ (when new message arrives)
APNS/FCM Service ← Reads from inbox events
  ↓ (push notification)
Bob's Device ← Gets notification
  ↓ (app opens)
Bob's Inbox ← App fetches actual messages

Key Takeaway

Use both patterns together for mission-critical messaging:

  • Outbox = "I guarantee to send this message exactly once"
  • Inbox = "I guarantee you can receive this message when ready"
  • Together = End-to-end reliable messaging with exactly-once semantics

For chat applications specifically, this combination ensures:

  • No messages are lost
  • Users can access message history offline
  • Real-time delivery works reliably
  • System can handle network failures gracefully
  • Messages are delivered exactly once
  • Complete audit trail exists
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment