Inbox pattern:
- Purpose: Ensure recipients can reliably consume messages
- Location: Consumer/recipient side
- Problem solved: Messages are lost when the recipient is unavailable
- Use when: You need messages to stay available until consumers are ready for them

Outbox pattern:
- Purpose: Ensure senders reliably publish messages
- Location: Producer/sender side
- Problem solved: Messages are lost when the sender fails after running its business logic but before sending
- Use when: You need transactional consistency between business operations and message sending

Use the Inbox pattern when:
- Recipients might be offline or unavailable
- You need message persistence and replay capability
- Multiple consumers need the same messages
- You want to decouple message production from consumption timing
- You need at-least-once delivery guarantees
Examples:
- Chat applications (users offline)
- Email systems
- Task queues for workers
- Event streams for analytics
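
The "multiple consumers" and "replay" points above can be illustrated with an append-only log in which every consumer keeps its own read offset. This is only a minimal in-memory sketch: the `MessageLog` class and its method names are hypothetical and not part of any library, and a real inbox would persist both the entries and the offsets.

```python
from typing import Dict, List


class MessageLog:
    """Append-only log; each consumer tracks its own read offset."""

    def __init__(self) -> None:
        self._entries: List[str] = []
        self._offsets: Dict[str, int] = {}

    def append(self, message: str) -> None:
        self._entries.append(message)

    def poll(self, consumer: str) -> List[str]:
        """Return everything this consumer has not committed yet."""
        return self._entries[self._offsets.get(consumer, 0):]

    def commit(self, consumer: str, count: int) -> None:
        """Advance the consumer's offset after it processed `count` messages."""
        self._offsets[consumer] = self._offsets.get(consumer, 0) + count

    def rewind(self, consumer: str, offset: int = 0) -> None:
        """Replay: move the consumer back to an earlier offset."""
        self._offsets[consumer] = offset


log = MessageLog()
log.append("order.created")
log.append("order.paid")

first_batch = log.poll("analytics")
log.commit("analytics", len(first_batch))
log.append("order.shipped")

analytics_next = log.poll("analytics")    # only the entry added after the commit
billing_all = log.poll("billing")         # a second consumer sees everything
log.rewind("analytics")
analytics_replay = log.poll("analytics")  # full history again after the rewind
```

Because offsets are per consumer, one slow or offline consumer does not hold back the others, which is the decoupling the list describes.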

Use the Outbox pattern when:
- Message sending must be part of a database transaction
- You need at-least-once publishing with no lost events (effectively exactly-once when consumers deduplicate)
- You can't afford to lose messages due to network failures
- You need audit trails of sent messages
- Cross-service communication needs to be reliable
Examples:
- Payment confirmations (must never be dropped; the receiver deduplicates so each one is applied once)
- Order processing workflows
- Critical system notifications
- Microservice event publishing
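
An outbox processor may deliver the same message twice, for example when it crashes after delivering but before recording the delivery, so receivers that need "once only" effects usually deduplicate. The following is a minimal sketch of that idea, assuming a hypothetical `processed_messages` table and a hypothetical `handle_once` helper; neither appears in the code in this article.

```python
import sqlite3


def init_receiver(conn: sqlite3.Connection) -> None:
    # Hypothetical dedup table: one row per message id already handled.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_messages (message_id TEXT PRIMARY KEY)"
    )


def handle_once(conn: sqlite3.Connection, message_id: str, handler) -> bool:
    """Run handler only the first time message_id is seen; return True if it ran."""
    try:
        with conn:  # commits on success, rolls back if anything raises
            conn.execute(
                "INSERT INTO processed_messages (message_id) VALUES (?)",
                (message_id,),
            )
            handler()
    except sqlite3.IntegrityError:
        return False  # primary-key clash: this id was already processed
    return True


conn = sqlite3.connect(":memory:")
init_receiver(conn)

calls = []
first = handle_once(conn, "msg-1", lambda: calls.append("charged"))
second = handle_once(conn, "msg-1", lambda: calls.append("charged"))  # redelivery
print(first, second, calls)
```

If the handler raises, the dedup row is rolled back with it, so a later redelivery gets another attempt. Side effects the handler performs outside this database are not covered by that rollback.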
sequenceDiagram
participant S as Sender
participant I as Bob's Inbox
participant B as Bob (Recipient)
participant N as Notification Service
S->>I: 1. Send message to Bob's inbox
I->>I: 2. Store message persistently
I-->>S: 3. Acknowledge receipt
I->>N: 4. Trigger notification
N->>B: 5. Push notification to Bob
B->>I: 6. Poll inbox for messages
I->>B: 7. Return messages
B->>I: 8. Acknowledge message read
import asyncio
import sqlite3
from datetime import datetime
from typing import List, Optional
from dataclasses import dataclass
from enum import Enum


class MessageStatus(Enum):
    UNREAD = "unread"
    READ = "read"


@dataclass
class Message:
    id: int
    sender_id: str
    recipient_id: str
    content: str
    created_at: datetime
    status: MessageStatus
class InboxService:
    # Note: sqlite3 calls are blocking; the async signatures only mirror a
    # real service where the storage and notification calls would be awaited.
    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute('''
            CREATE TABLE IF NOT EXISTS inbox_messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sender_id TEXT NOT NULL,
                recipient_id TEXT NOT NULL,
                content TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                status TEXT DEFAULT 'unread'
            )
        ''')
        conn.commit()
        conn.close()

    async def deliver_message(self, sender_id: str, recipient_id: str, content: str) -> int:
        """Deliver message to recipient's inbox"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute('''
            INSERT INTO inbox_messages (sender_id, recipient_id, content, status)
            VALUES (?, ?, ?, ?)
        ''', (sender_id, recipient_id, content, MessageStatus.UNREAD.value))
        message_id = cursor.lastrowid
        conn.commit()
        conn.close()
        # Trigger notification only after the message is durably stored
        await self._notify_user(recipient_id, message_id)
        return message_id

    async def get_unread_messages(self, user_id: str) -> List[Message]:
        """Get all unread messages for a user, newest first"""
        conn = sqlite3.connect(self.db_path)
        # CURRENT_TIMESTAMP has one-second resolution, so id breaks ties
        cursor = conn.execute('''
            SELECT id, sender_id, recipient_id, content, created_at, status
            FROM inbox_messages
            WHERE recipient_id = ? AND status = ?
            ORDER BY created_at DESC, id DESC
        ''', (user_id, MessageStatus.UNREAD.value))
        messages = []
        for row in cursor.fetchall():
            messages.append(Message(
                id=row[0],
                sender_id=row[1],
                recipient_id=row[2],
                content=row[3],
                created_at=datetime.fromisoformat(row[4]),
                status=MessageStatus(row[5])
            ))
        conn.close()
        return messages

    async def mark_messages_read(self, user_id: str, message_ids: List[int]):
        """Mark messages as read"""
        if not message_ids:
            return  # nothing to update
        conn = sqlite3.connect(self.db_path)
        # One "?" per id; the ids themselves are still bound as parameters
        placeholders = ','.join('?' * len(message_ids))
        conn.execute(f'''
            UPDATE inbox_messages
            SET status = ?
            WHERE recipient_id = ? AND id IN ({placeholders})
        ''', [MessageStatus.READ.value, user_id] + list(message_ids))
        conn.commit()
        conn.close()

    async def _notify_user(self, user_id: str, message_id: int):
        """Trigger real-time notification (stubbed with a print)"""
        print(f"🔔 Notification sent to user {user_id} for message {message_id}")
# Usage Example
async def inbox_pattern_example():
    inbox_service = InboxService("inbox.db")
    # Alice sends message to Bob
    message_id = await inbox_service.deliver_message("alice", "bob", "Hello Bob!")
    # Bob gets his unread messages
    unread = await inbox_service.get_unread_messages("bob")
    print(f"Bob has {len(unread)} unread messages")
    # Bob marks messages as read
    await inbox_service.mark_messages_read("bob", [message_id])


if __name__ == "__main__":
    asyncio.run(inbox_pattern_example())

sequenceDiagram
participant A as Alice (Sender)
participant DB as Alice's Database
participant O as Alice's Outbox
participant BP as Background Processor
participant B as Bob's Service
A->>DB: 1. Start transaction
A->>DB: 2. Execute business logic
A->>O: 3. Write to outbox (same transaction)
DB-->>A: 4. Commit transaction
loop Background Processing
BP->>O: 5. Poll outbox for pending messages
O->>BP: 6. Return pending messages
BP->>B: 7. Deliver message to Bob's service
B-->>BP: 8. Acknowledge delivery
BP->>O: 9. Mark message as completed
end
Note: The critical piece is that the business data and the outbox entry (steps 2 and 3) are written in the same transaction, so either both are committed or neither is.
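
To make the atomicity point concrete, here is a small sketch that simulates a failure between the two writes and the commit. The `orders` table, the `place_order` function, and the `fail_before_commit` flag are hypothetical names chosen for illustration; only the idea (business row and outbox row share one transaction) comes from the note above.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT NOT NULL)")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, payload TEXT NOT NULL)")


def place_order(item: str, fail_before_commit: bool = False) -> None:
    # "with conn" wraps both inserts in one transaction:
    # commit if the block finishes, rollback if it raises.
    with conn:
        cur = conn.execute("INSERT INTO orders (item) VALUES (?)", (item,))
        conn.execute(
            "INSERT INTO outbox (payload) VALUES (?)",
            (json.dumps({"order_id": cur.lastrowid, "item": item}),),
        )
        if fail_before_commit:
            raise RuntimeError("simulated crash before commit")


place_order("book")
try:
    place_order("lamp", fail_before_commit=True)
except RuntimeError:
    pass  # both inserts for "lamp" were rolled back together

orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
events = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
print(orders, events)  # 1 1
```

The failed call leaves neither an order nor an outbox row behind, so the background processor never sees an event for business data that does not exist.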
import asyncio
import sqlite3
import json
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum


class OutboxStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class OutboxMessage:
    id: Optional[int]
    event_type: str
    payload: Dict
    created_at: datetime
    status: OutboxStatus
    retry_count: int = 0
    error_message: Optional[str] = None
class OutboxService:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        # Business data table
        conn.execute('''
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sender_id TEXT NOT NULL,
                recipient_id TEXT NOT NULL,
                content TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Outbox table
        conn.execute('''
            CREATE TABLE IF NOT EXISTS outbox (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                event_type TEXT NOT NULL,
                payload TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                status TEXT DEFAULT 'pending',
                retry_count INTEGER DEFAULT 0,
                error_message TEXT
            )
        ''')
        conn.commit()
        conn.close()

    async def send_message_with_outbox(self, sender_id: str, recipient_id: str, content: str):
        """Send message using outbox pattern for reliability"""
        conn = sqlite3.connect(self.db_path)
        try:
            # Start transaction
            conn.execute("BEGIN TRANSACTION")
            # 1. Business logic - save message
            cursor = conn.execute('''
                INSERT INTO messages (sender_id, recipient_id, content)
                VALUES (?, ?, ?)
            ''', (sender_id, recipient_id, content))
            message_id = cursor.lastrowid
            # 2. Write to outbox in same transaction
            outbox_payload = {
                "message_id": message_id,
                "sender_id": sender_id,
                "recipient_id": recipient_id,
                "content": content
            }
            conn.execute('''
                INSERT INTO outbox (event_type, payload, status)
                VALUES (?, ?, ?)
            ''', ("message.send", json.dumps(outbox_payload), OutboxStatus.PENDING.value))
            # 3. Commit transaction - both succeed or both fail
            conn.commit()
            print(f"✅ Message {message_id} saved with outbox entry")
        except Exception as e:
            conn.rollback()
            print(f"❌ Transaction failed: {e}")
            raise
        finally:
            conn.close()

    async def get_pending_messages(self) -> List[OutboxMessage]:
        """Get pending messages from outbox, oldest first"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute('''
            SELECT id, event_type, payload, created_at, status, retry_count, error_message
            FROM outbox
            WHERE status = ?
            ORDER BY created_at ASC, id ASC
        ''', (OutboxStatus.PENDING.value,))
        messages = []
        for row in cursor.fetchall():
            messages.append(OutboxMessage(
                id=row[0],
                event_type=row[1],
                payload=json.loads(row[2]),
                created_at=datetime.fromisoformat(row[3]),
                status=OutboxStatus(row[4]),
                retry_count=row[5],
                error_message=row[6]
            ))
        conn.close()
        return messages

    async def mark_completed(self, message_id: int):
        """Mark outbox message as completed"""
        conn = sqlite3.connect(self.db_path)
        conn.execute('''
            UPDATE outbox
            SET status = ?, error_message = NULL
            WHERE id = ?
        ''', (OutboxStatus.COMPLETED.value, message_id))
        conn.commit()
        conn.close()

    async def mark_failed(self, message_id: int, error: str, max_retries: int = 5):
        """Record a failed attempt; the row stays pending until retries run out"""
        # max_retries is an illustrative default (an assumption, not a recommendation).
        # Marking the row 'failed' on the first error would stop all retries,
        # because get_pending_messages only selects 'pending' rows.
        conn = sqlite3.connect(self.db_path)
        conn.execute('''
            UPDATE outbox
            SET retry_count = retry_count + 1,
                error_message = ?,
                status = CASE WHEN retry_count + 1 >= ? THEN ? ELSE ? END
            WHERE id = ?
        ''', (error, max_retries, OutboxStatus.FAILED.value,
              OutboxStatus.PENDING.value, message_id))
        conn.commit()
        conn.close()
class OutboxProcessor:
    def __init__(self, outbox_service: OutboxService):
        self.outbox_service = outbox_service

    async def process_outbox(self):
        """Background process to handle outbox messages"""
        pending_messages = await self.outbox_service.get_pending_messages()
        for message in pending_messages:
            try:
                # Simulate message delivery
                await self._deliver_message(message.payload)
                # A crash between delivery and this update causes a redelivery
                # on the next run, so the receiver should tolerate duplicates.
                await self.outbox_service.mark_completed(message.id)
                print(f"✅ Message {message.id} delivered successfully")
            except Exception as e:
                await self.outbox_service.mark_failed(message.id, str(e))
                print(f"❌ Failed to deliver message {message.id}: {e}")

    async def _deliver_message(self, payload: Dict):
        """Simulate message delivery to external service"""
        # Simulate network call
        await asyncio.sleep(0.1)
        # Simulate occasional failures
        import random
        if random.random() < 0.1:  # 10% failure rate
            raise Exception("Network timeout")
        print(f"📤 Delivered message to {payload['recipient_id']}: {payload['content']}")
# Usage Example
async def outbox_pattern_example():
    outbox_service = OutboxService("outbox.db")
    processor = OutboxProcessor(outbox_service)
    # Alice sends message (stored in outbox)
    await outbox_service.send_message_with_outbox("alice", "bob", "Hello from outbox!")
    # Background processor handles delivery
    await processor.process_outbox()


if __name__ == "__main__":
    asyncio.run(outbox_pattern_example())

sequenceDiagram
participant A as Alice (Sender)
participant ADB as Alice's DB + Outbox
participant BP as Background Processor
participant BInbox as Bob's Inbox
participant B as Bob (Recipient)
participant Push as Push Service
A->>ADB: 1. Business logic + Outbox (transaction)
ADB-->>A: 2. Acknowledge local save
loop Reliable Delivery
BP->>ADB: 3. Poll Alice's outbox
ADB->>BP: 4. Return pending messages
BP->>BInbox: 5. Deliver to Bob's inbox
BInbox-->>BP: 6. Acknowledge inbox delivery
BP->>ADB: 7. Mark outbox as completed
end
BInbox->>Push: 8. Trigger push notification
Push->>B: 9. Notify Bob of new message
B->>BInbox: 10. Fetch messages from inbox
BInbox->>B: 11. Return messages
B->>BInbox: 12. Mark messages as read
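
The "Reliable Delivery" loop in the diagram is typically run continuously, slowing down while the receiving side is unreachable. The sketch below shows one way to do that with exponential backoff. `run_relay`, its parameters, and the `flaky_batch` stand-in are hypothetical; in the code from this article, `process_batch` would be something like `OutboxProcessor.process_outbox`, and a production loop would run until cancelled rather than for a fixed number of rounds.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_relay(
    process_batch: Callable[[], Awaitable[object]],
    rounds: int,
    idle_delay: float = 0.01,
    max_delay: float = 0.08,
) -> List[float]:
    """Run process_batch `rounds` times; return the delay used after each round."""
    delays = []
    delay = idle_delay
    for _ in range(rounds):
        try:
            await process_batch()
            delay = idle_delay                 # success: back to the normal pace
        except Exception:
            delay = min(delay * 2, max_delay)  # failure: double the wait, capped
        delays.append(delay)
        await asyncio.sleep(delay)
    return delays


async def demo() -> List[float]:
    # One good round, four failing rounds, then recovery.
    outcomes = iter([True, False, False, False, False, True])

    async def flaky_batch() -> int:
        if not next(outcomes):
            raise ConnectionError("inbox unreachable")
        return 1

    return await run_relay(flaky_batch, rounds=6)


delays = asyncio.run(demo())
print(delays)  # [0.01, 0.02, 0.04, 0.08, 0.08, 0.01]
```

The cap keeps a long outage from pushing the polling interval to unreasonable values, and the reset on success restores normal latency as soon as the inbox is reachable again.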

Outbox benefits (sender side):
- ✅ Messages are not lost when the server crashes after the commit
- ✅ Every committed message is eventually sent; retries can produce duplicates, so receivers should deduplicate
- ✅ Audit trail of all sent messages
- ✅ Can retry failed deliveries

Inbox benefits (recipient side):
- ✅ Messages available when user comes online
- ✅ Multi-device synchronization
- ✅ Message history and search
- ✅ Push notification triggers
- ✅ Unread message counts
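
As an illustration of the "unread message counts" point, the counts can be derived directly from the inbox table with a grouped query. This sketch reuses the `inbox_messages` table name and `status` values from the inbox code in this article, trimmed to the columns the query needs; the `unread_counts` helper and the sample rows are hypothetical.

```python
import sqlite3
from typing import Dict

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE inbox_messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        recipient_id TEXT NOT NULL,
        content TEXT NOT NULL,
        status TEXT DEFAULT 'unread'
    )
""")
conn.executemany(
    "INSERT INTO inbox_messages (recipient_id, content, status) VALUES (?, ?, ?)",
    [
        ("bob", "hi", "unread"),
        ("bob", "ping", "unread"),
        ("bob", "old news", "read"),
        ("carol", "hello", "unread"),
    ],
)


def unread_counts(conn: sqlite3.Connection) -> Dict[str, int]:
    """Return {recipient_id: number of unread messages}."""
    rows = conn.execute("""
        SELECT recipient_id, COUNT(*)
        FROM inbox_messages
        WHERE status = 'unread'
        GROUP BY recipient_id
        ORDER BY recipient_id
    """).fetchall()
    return dict(rows)


counts = unread_counts(conn)
print(counts)  # {'bob': 2, 'carol': 1}
```

On a large inbox table, an index on `(recipient_id, status)` would likely help this query, though that is a tuning assumption rather than something measured here.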
1. Alice types message → Outbox (local transaction)
2. Background process → Reads outbox → Delivers to Bob's inbox
3. Bob's inbox updated → WebSocket notification sent
4. Bob's device receives notification → Fetches from inbox
5. Acknowledgments flow back through the system
Bob's Inbox Service:
    ↓ (when new message arrives)
APNS/FCM Service ← Reads from inbox events
    ↓ (push notification)
Bob's Device ← Gets notification
    ↓ (app opens)
Bob's Inbox ← App fetches actual messages
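
The flow above treats the push notification as a hint and the inbox as the source of truth, so a lost push does not mean a lost message. Here is a minimal in-memory sketch of that split, assuming a hypothetical `Inbox` class whose `deliver` method returns the push payload and whose `fetch_since` method lets the device catch up from the last message id it has seen.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class Inbox:
    """In-memory stand-in for the inbox service (illustration only)."""
    messages: Dict[str, List[Tuple[int, str]]] = field(default_factory=dict)
    next_id: int = 1

    def deliver(self, user: str, content: str) -> dict:
        msg_id = self.next_id
        self.next_id += 1
        self.messages.setdefault(user, []).append((msg_id, content))
        # The push payload is only a hint: no message body, just "something new".
        return {"user": user, "latest_id": msg_id}

    def fetch_since(self, user: str, last_seen_id: int) -> List[Tuple[int, str]]:
        """Return all of the user's messages with an id above last_seen_id."""
        return [m for m in self.messages.get(user, []) if m[0] > last_seen_id]


inbox = Inbox()
lost_push = inbox.deliver("bob", "Hello")               # this push never arrives
received_push = inbox.deliver("bob", "Are you there?")  # this one does

# The device was last in sync at id 0; one push is enough to trigger a full catch-up.
fetched = inbox.fetch_since(received_push["user"], last_seen_id=0)
print(fetched)  # [(1, 'Hello'), (2, 'Are you there?')]
```

Because the device fetches by cursor instead of trusting the push content, both messages arrive even though only one notification was delivered.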
Use both patterns together for mission-critical messaging:
- Outbox = "I guarantee this message will be sent at least once, even if I crash"
- Inbox = "I guarantee you can receive this message when ready"
- Together = End-to-end reliable messaging; with deduplication at the inbox, processing is effectively exactly-once
For chat applications specifically, this combination ensures:
- No messages are lost
- Users can access message history offline
- Real-time delivery works reliably
- System can handle network failures gracefully
- Messages are processed effectively once, provided the inbox discards redelivered duplicates
- Complete audit trail exists