The Banking API requires a robust architecture to handle millions of transactions while ensuring consistency, low latency, and fault tolerance.
flowchart TD
Client[Client Applications] --> LB[Load Balancer]
LB --> API[FastAPI Application]
API --> Redis[Redis Cache]
API --> DB[(PostgreSQL Database)]
API --> Kafka[Apache Kafka]
Kafka --> WS[Worker Services]
WS --> DB
API --> Logger[Logging Service]
API <--> gRPC[gRPC Services]
gRPC <--> Kafka
subgraph Monitoring
Prometheus[Prometheus]
Grafana[Grafana Dashboard]
end
API --> Prometheus
Prometheus --> Grafana
- FastAPI Application: Core API handling transaction requests
- PostgreSQL: Primary database with ACID compliance
- Redis: For caching and distributed locking
- Apache Kafka: High-throughput distributed event streaming platform
- gRPC Services: High-performance RPC framework for service communication
- PyArrow: Efficient data serialization between services
- Load Balancer: Distributes traffic across API instances
- Monitoring: Prometheus and Grafana for real-time metrics
- Logging: Centralized logging for transaction audit trails
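These components are typically wired together through environment-driven configuration. A minimal sketch of what api/config.py could look like, assuming Pydantic v1-style BaseSettings; the field names mirror the environment variables used in the Docker Compose file later in this document, and the defaults are illustrative:

from pydantic import BaseSettings


class Settings(BaseSettings):
    """Central configuration, loaded from environment variables or a .env file."""

    database_url: str = "postgresql+asyncpg://user:password@db:5432/banking"
    redis_url: str = "redis://redis:6379/0"
    kafka_bootstrap_servers: str = "kafka:9092"
    schema_registry_url: str = "http://schema-registry:8081"
    grpc_port: int = 50051

    class Config:
        env_file = ".env"


settings = Settings()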
erDiagram
ACCOUNTS {
uuid account_id PK
string account_number UK
string customer_id FK
string account_type
timestamp created_at
timestamp updated_at
boolean is_active
}
BALANCES {
uuid balance_id PK
uuid account_id FK
decimal current_balance
decimal available_balance
timestamp last_updated
int64 version "For optimistic locking"
}
TRANSACTIONS {
uuid transaction_id PK
uuid account_id FK
string transaction_type
decimal amount
string currency
string description
string reference_id
timestamp created_at
string status
uuid related_transaction_id FK "For transfers"
}
TRANSACTION_LOGS {
uuid log_id PK
uuid transaction_id FK
string event_type
jsonb metadata
timestamp created_at
string created_by
}
USERS {
uuid user_id PK
string username UK
string password_hash
string email UK
string role
boolean is_active
timestamp created_at
timestamp last_login
}
ACCOUNTS ||--o{ BALANCES : has
ACCOUNTS ||--o{ TRANSACTIONS : has
TRANSACTIONS ||--o{ TRANSACTION_LOGS : logs
USERS ||--o{ ACCOUNTS : owns
GET /api/v1/accounts/{account_id} - Get account details
GET /api/v1/accounts/{account_id}/balance - Get account balance
GET /api/v1/accounts/{account_id}/transactions - Get transaction history
POST /api/v1/transactions/debit - Debit an account
POST /api/v1/transactions/credit - Credit an account
POST /api/v1/transactions/transfer - Transfer between accounts
GET /api/v1/transactions/{transaction_id} - Get transaction details
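A minimal sketch of how one of these endpoints could be wired in FastAPI. The get_account_balance service function is shown later in this document; api.dependencies.get_db and get_redis are hypothetical dependency providers used only for illustration:

from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException

from api.services.cache import get_account_balance   # caching service shown below
from api.dependencies import get_db, get_redis       # hypothetical dependency providers

router = APIRouter()


@router.get("/{account_id}/balance")
async def read_account_balance(account_id: UUID, db=Depends(get_db), redis=Depends(get_redis)):
    """Return the current and available balance for an account."""
    balance = await get_account_balance(account_id, redis=redis, db=db)
    if balance is None:
        raise HTTPException(status_code=404, detail="Account not found")
    return balance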
syntax = "proto3";
package banking;
service TransactionService {
rpc DebitAccount(DebitRequest) returns (TransactionResponse);
rpc CreditAccount(CreditRequest) returns (TransactionResponse);
rpc TransferBetweenAccounts(TransferRequest) returns (TransactionResponse);
rpc GetTransaction(GetTransactionRequest) returns (Transaction);
}
message DebitRequest {
string account_id = 1;
double amount = 2;
string currency = 3;
string description = 4;
string reference_id = 5;
}
message CreditRequest {
string account_id = 1;
double amount = 2;
string currency = 3;
string description = 4;
string reference_id = 5;
}
message TransferRequest {
string source_account_id = 1;
string destination_account_id = 2;
double amount = 3;
string currency = 4;
string description = 5;
string reference_id = 6;
}
message TransactionResponse {
string transaction_id = 1;
string status = 2;
string timestamp = 3;
string account_id = 4;
double amount = 5;
string currency = 6;
double available_balance = 7;
}
message GetTransactionRequest {
string transaction_id = 1;
}
message Transaction {
string transaction_id = 1;
string account_id = 2;
string transaction_type = 3;
double amount = 4;
string currency = 5;
string description = 6;
string reference_id = 7;
string created_at = 8;
string status = 9;
string related_transaction_id = 10;
}
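The banking_pb2 and banking_pb2_grpc modules used below are generated from this proto file. Assuming the grpcio-tools package and the grpc_service/protos/ layout from the project structure later in this document, generation looks like:

python -m grpc_tools.protoc \
    -I grpc_service/protos \
    --python_out=. \
    --grpc_python_out=. \
    grpc_service/protos/banking.proto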
import grpc
import banking_pb2
import banking_pb2_grpc
from concurrent import futures
from decimal import Decimal
from app.services.transaction import debit_account, credit_account, transfer_between_accounts
class TransactionServicer(banking_pb2_grpc.TransactionServiceServicer):
def __init__(self, db_pool):
self.db_pool = db_pool
async def DebitAccount(self, request, context):
async with self.db_pool.acquire() as db:
result = await debit_account(
account_id=request.account_id,
amount=Decimal(str(request.amount)),
currency=request.currency,
description=request.description,
reference_id=request.reference_id,
db=db
)
return banking_pb2.TransactionResponse(
transaction_id=str(result.transaction_id),
status=result.status,
timestamp=result.created_at.isoformat(),
account_id=str(result.account_id),
amount=float(result.amount),
currency=result.currency,
available_balance=float(result.available_balance)
)
# Similar implementations for other methods
async def serve(db_pool):
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
banking_pb2_grpc.add_TransactionServiceServicer_to_server(
TransactionServicer(db_pool), server
)
server.add_insecure_port('[::]:50051')
await server.start()
await server.wait_for_termination()
banking.transactions.submitted - New transaction requests
banking.transactions.completed - Successfully processed transactions
banking.transactions.failed - Failed transactions
banking.accounts.balances - Account balance updates
banking.fraud.alerts - Fraud detection alerts
banking.notifications - User notifications
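A sketch of the scripts/create_topics.py helper referenced in the implementation plan, using the confluent-kafka AdminClient that also appears in the fraud detection service below; partition and replication counts are illustrative:

from confluent_kafka.admin import AdminClient, NewTopic

TOPICS = [
    "banking.transactions.submitted",
    "banking.transactions.completed",
    "banking.transactions.failed",
    "banking.accounts.balances",
    "banking.fraud.alerts",
    "banking.notifications",
]


def create_topics(bootstrap_servers: str = "kafka:9092") -> None:
    """Create the core banking topics if they do not already exist."""
    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    new_topics = [NewTopic(name, num_partitions=5, replication_factor=3) for name in TOPICS]
    # create_topics() returns a dict of topic name -> future; wait on each to surface errors
    for topic, future in admin.create_topics(new_topics).items():
        try:
            future.result()
            print(f"Created topic {topic}")
        except Exception as exc:
            print(f"Topic {topic}: {exc}")


if __name__ == "__main__":
    create_topics()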
from kafka import KafkaProducer
import json
import pyarrow as pa
import pyarrow.parquet as pq
import io
from uuid import UUID
from decimal import Decimal
class TransactionProducer:
def __init__(self, bootstrap_servers):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: self._serialize_transaction(v)
)
# Define schema for transaction records
self.transaction_schema = pa.schema([
('transaction_id', pa.string()),
('account_id', pa.string()),
('transaction_type', pa.string()),
('amount', pa.decimal128(precision=18, scale=2)),
('currency', pa.string()),
('description', pa.string()),
('reference_id', pa.string()),
('created_at', pa.timestamp('us')),
('status', pa.string())
])
def _serialize_transaction(self, transaction_data):
"""Serialize transaction using PyArrow for efficient binary representation"""
# Convert UUID and Decimal to appropriate types
serializable_data = {
**transaction_data,
'transaction_id': str(transaction_data['transaction_id']),
'account_id': str(transaction_data['account_id']),
'amount': transaction_data['amount']
}
# Create PyArrow record batch
batch = pa.RecordBatch.from_pylist([serializable_data], schema=self.transaction_schema)
# Write to in-memory buffer as Parquet format
buf = io.BytesIO()
pq.write_table(pa.Table.from_batches([batch]), buf)
return buf.getvalue()
async def send_transaction(self, transaction_data, transaction_type):
"""Send transaction data to appropriate Kafka topic"""
topic = f"banking.transactions.{transaction_type}"
# Async send to Kafka topic
future = self.producer.send(
topic=topic,
value=transaction_data
)
# Wait for send to complete
try:
record_metadata = future.get(timeout=10)
return record_metadata
except Exception as e:
print(f"Failed to send transaction to Kafka: {e}")
raise
from kafka import KafkaConsumer
import pyarrow as pa
import pyarrow.parquet as pq
import io
import asyncio
import threading
from uuid import UUID
from decimal import Decimal
from datetime import datetime
class TransactionConsumer:
def __init__(self, bootstrap_servers, db_pool, transaction_service):
self.consumer = KafkaConsumer(
'banking.transactions.submitted',
bootstrap_servers=bootstrap_servers,
group_id='transaction-processor',
auto_offset_reset='earliest',
enable_auto_commit=False
)
self.db_pool = db_pool
self.transaction_service = transaction_service
def _deserialize_transaction(self, message_value):
"""Deserialize transaction data from PyArrow/Parquet format"""
buf = io.BytesIO(message_value)
table = pq.read_table(buf)
# Convert to Python dict
record = table.to_pylist()[0]
# Convert string representations back to appropriate types
record['transaction_id'] = UUID(record['transaction_id'])
record['account_id'] = UUID(record['account_id'])
record['amount'] = Decimal(str(record['amount']))
return record
async def process_transactions(self):
"""Process transactions from Kafka topic"""
for message in self.consumer:
try:
# Deserialize the transaction data
transaction_data = self._deserialize_transaction(message.value)
# Process based on transaction type
if transaction_data['transaction_type'] == 'debit':
result = await self.transaction_service.process_debit(transaction_data)
elif transaction_data['transaction_type'] == 'credit':
result = await self.transaction_service.process_credit(transaction_data)
elif transaction_data['transaction_type'] == 'transfer':
result = await self.transaction_service.process_transfer(transaction_data)
# Publish result to completed topic
await self.publish_result(result, 'completed')
# Commit offset after successful processing
self.consumer.commit()
except Exception as e:
# Log error and publish to failed topic
print(f"Error processing transaction: {e}")
transaction_data['error'] = str(e)
transaction_data['status'] = 'failed'
await self.publish_result(transaction_data, 'failed')
    def start(self):
        """Start the async consumer loop in a background thread"""
        thread = threading.Thread(
            target=lambda: asyncio.run(self.process_transactions()),
            daemon=True
        )
        thread.start()
async def debit_account(account_id: UUID, amount: Decimal, db: AsyncSession, kafka_producer):
    # First, load the balance row (with its current version) for this account
    result = await db.execute(select(Balance).where(Balance.account_id == account_id))
    balance = result.scalar_one()
if balance.available_balance < amount:
raise InsufficientFundsError()
# Update balance with version check
result = await db.execute(
update(Balance)
.where(Balance.account_id == account_id, Balance.version == balance.version)
.values(
current_balance=Balance.current_balance - amount,
available_balance=Balance.available_balance - amount,
version=Balance.version + 1,
last_updated=datetime.utcnow()
)
.returning(Balance)
)
updated_balance = result.scalar_one_or_none()
if not updated_balance:
# Version mismatch, publish conflict event to Kafka
await kafka_producer.send_transaction(
{
'transaction_id': uuid.uuid4(),
'account_id': account_id,
'event_type': 'concurrency_conflict',
'timestamp': datetime.utcnow().isoformat()
},
'failed'
)
raise ConcurrencyError("Balance was updated by another transaction")
# Publish balance update event to Kafka
await kafka_producer.send_transaction(
{
'account_id': account_id,
'balance': float(updated_balance.current_balance),
'available_balance': float(updated_balance.available_balance),
'version': updated_balance.version,
'timestamp': updated_balance.last_updated.isoformat()
},
'balances'
)
return updated_balance
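Because a version conflict is an expected outcome under contention rather than a hard failure, callers usually retry a bounded number of times. A minimal sketch of such a wrapper around the debit_account function above, with a hypothetical jittered backoff:

import asyncio
import random


async def debit_with_retry(account_id, amount, db, kafka_producer, max_retries: int = 3):
    """Retry debit_account when the optimistic version check fails."""
    for attempt in range(1, max_retries + 1):
        try:
            return await debit_account(account_id, amount, db=db, kafka_producer=kafka_producer)
        except ConcurrencyError:
            if attempt == max_retries:
                raise
            # Back off briefly so the competing writer can commit its version bump
            await asyncio.sleep(random.uniform(0.01, 0.05) * attempt)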
async def process_transaction(transaction_data: dict, redis: Redis, db: AsyncSession, kafka_producer):
account_id = transaction_data["account_id"]
lock_name = f"account_lock:{account_id}"
# Acquire distributed lock
lock = redis.lock(lock_name, timeout=5, blocking_timeout=10)
try:
if lock.acquire():
# Execute transaction in database transaction
async with db.begin():
# Process transaction logic here
result = await perform_transaction(transaction_data, db)
# Publish result to Kafka
await kafka_producer.send_transaction(result, 'completed')
return result
else:
# If lock acquisition fails, publish to Kafka for retry
transaction_data['status'] = 'retry'
transaction_data['retry_timestamp'] = datetime.utcnow().isoformat()
await kafka_producer.send_transaction(transaction_data, 'submitted')
raise ResourceLockError("Could not acquire lock for account")
finally:
# Make sure to release the lock
        try:
            lock.release()
        except Exception:
            # The lock may never have been acquired or may have already expired
            pass
flowchart TD
API[FastAPI Service] --> Command[Command Validation]
Command --> Kafka[Kafka Transaction Stream]
Kafka --> Process[Transaction Processor]
Process --> DB[(Database Write)]
Process --> Events[Kafka Event Stream]
Events --> Projection[Balance Projection]
Events --> Notification[Notification Service]
Events --> Analytics[Analytics Service]
from datetime import datetime
import uuid
async def transfer_funds(source_id, dest_id, amount, description, kafka_producer, db):
# Generate transaction IDs
transfer_id = uuid.uuid4()
debit_id = uuid.uuid4()
credit_id = uuid.uuid4()
# Phase 1: Prepare by publishing intent to Kafka
prepare_event = {
'transfer_id': str(transfer_id),
'source_account_id': str(source_id),
'destination_account_id': str(dest_id),
'amount': float(amount),
'debit_id': str(debit_id),
'credit_id': str(credit_id),
'status': 'preparing',
'timestamp': datetime.utcnow().isoformat()
}
await kafka_producer.send_transaction(prepare_event, 'transfers.preparing')
try:
# Execute debit
debit_result = await debit_account(
account_id=source_id,
amount=amount,
transaction_id=debit_id,
related_transaction_id=transfer_id,
db=db,
kafka_producer=kafka_producer
)
# Execute credit
credit_result = await credit_account(
account_id=dest_id,
amount=amount,
transaction_id=credit_id,
related_transaction_id=transfer_id,
db=db,
kafka_producer=kafka_producer
)
# Phase 2: Commit by publishing completion to Kafka
complete_event = {
'transfer_id': str(transfer_id),
'source_account_id': str(source_id),
'destination_account_id': str(dest_id),
'amount': float(amount),
'debit_id': str(debit_id),
'credit_id': str(credit_id),
'status': 'completed',
'timestamp': datetime.utcnow().isoformat()
}
await kafka_producer.send_transaction(complete_event, 'transfers.completed')
return {
'transfer_id': transfer_id,
'status': 'completed',
'debit_transaction': debit_result,
'credit_transaction': credit_result
}
except Exception as e:
# If any step fails, publish compensation event to Kafka
compensation_event = {
'transfer_id': str(transfer_id),
'source_account_id': str(source_id),
'destination_account_id': str(dest_id),
'amount': float(amount),
'debit_id': str(debit_id),
'credit_id': str(credit_id),
'status': 'failed',
'error': str(e),
'timestamp': datetime.utcnow().isoformat()
}
await kafka_producer.send_transaction(compensation_event, 'transfers.failed')
# Compensating transactions will be handled by a separate consumer
raise TransferFailedError(f"Transfer failed: {str(e)}")
import pyarrow as pa
import pyarrow.parquet as pq
import io
from decimal import Decimal
from datetime import datetime
class ArrowSerializer:
"""Efficient serialization using PyArrow for high-performance data interchange"""
def __init__(self):
# Define schema for different message types
self.transaction_schema = pa.schema([
('transaction_id', pa.string()),
('account_id', pa.string()),
('transaction_type', pa.string()),
('amount', pa.decimal128(precision=18, scale=2)),
('currency', pa.string()),
('description', pa.string()),
('reference_id', pa.string()),
('timestamp', pa.timestamp('us')),
('status', pa.string())
])
self.balance_schema = pa.schema([
('account_id', pa.string()),
('current_balance', pa.decimal128(precision=18, scale=2)),
('available_balance', pa.decimal128(precision=18, scale=2)),
('version', pa.int32()),
('timestamp', pa.timestamp('us'))
])
def serialize(self, data, schema_type='transaction'):
"""Serialize data to binary format using PyArrow"""
if schema_type == 'transaction':
schema = self.transaction_schema
elif schema_type == 'balance':
schema = self.balance_schema
else:
raise ValueError(f"Unknown schema type: {schema_type}")
# Create a record batch
batch = pa.RecordBatch.from_pylist([data], schema=schema)
# Write to in-memory buffer
buf = io.BytesIO()
writer = pa.ipc.new_stream(buf, schema)
writer.write_batch(batch)
writer.close()
return buf.getvalue()
def deserialize(self, binary_data, schema_type='transaction'):
"""Deserialize binary data using PyArrow"""
buf = io.BytesIO(binary_data)
reader = pa.ipc.open_stream(buf)
# Read all batches and concatenate
batches = [batch for batch in reader]
table = pa.Table.from_batches(batches)
# Convert to Python dictionary
records = table.to_pylist()
        # table.to_pylist() already yields Python Decimal objects for decimal128
        # columns; normalize timestamps to ISO-8601 strings for downstream consumers
        for record in records:
            for key, value in record.items():
                if isinstance(value, datetime):
                    record[key] = value.isoformat()
return records[0] if records else None
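A short usage example of the serializer above (values are illustrative). After the timestamp normalization in deserialize, round-tripped timestamps come back as ISO-8601 strings while decimal columns remain Decimal values:

from datetime import datetime
from decimal import Decimal

serializer = ArrowSerializer()

payload = serializer.serialize(
    {
        'account_id': 'acc-123',
        'current_balance': Decimal('1000.00'),
        'available_balance': Decimal('950.00'),
        'version': 7,
        'timestamp': datetime.utcnow(),
    },
    schema_type='balance',
)

record = serializer.deserialize(payload, schema_type='balance')
print(record['available_balance'], record['version'])  # Decimal('950.00') 7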
-- Account lookups by account number
CREATE INDEX idx_accounts_account_number ON accounts(account_number);
-- Fast balance lookups
CREATE INDEX idx_balances_account_id ON balances(account_id);
-- Transaction history queries
CREATE INDEX idx_transactions_account_id ON transactions(account_id);
CREATE INDEX idx_transactions_created_at ON transactions(created_at);
-- Composite index for account-based time-range queries
CREATE INDEX idx_transactions_account_time ON transactions(account_id, created_at);
-- Partial index for active transactions (status = 'pending')
CREATE INDEX idx_transactions_pending ON transactions(account_id, created_at)
WHERE status = 'pending';
async def get_account_balance(account_id: UUID, redis: Redis, db: AsyncSession):
# Try to get from cache first
cache_key = f"balance:{account_id}"
cached_balance = await redis.get(cache_key)
if cached_balance:
return json.loads(cached_balance)
# If not in cache, get from database
balance = await db.get(Balance, account_id)
# Store in cache with expiration (short TTL for balances)
balance_dict = balance.to_dict()
await redis.set(cache_key, json.dumps(balance_dict), ex=60) # 60 seconds
return balance_dict
# Kafka consumer for balance updates to invalidate cache
class BalanceCacheInvalidator:
def __init__(self, bootstrap_servers, redis_client):
self.consumer = KafkaConsumer(
'banking.accounts.balances',
bootstrap_servers=bootstrap_servers,
group_id='cache-invalidator',
auto_offset_reset='latest'
)
self.redis = redis_client
async def start_invalidation(self):
for message in self.consumer:
try:
balance_event = json.loads(message.value.decode('utf-8'))
account_id = balance_event.get('account_id')
if account_id:
# Invalidate cache for this account
cache_key = f"balance:{account_id}"
await self.redis.delete(cache_key)
except Exception as e:
print(f"Error processing balance update: {e}")
class DeadLetterQueueHandler:
def __init__(self, bootstrap_servers):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
self.consumer = KafkaConsumer(
'banking.transactions.failed',
bootstrap_servers=bootstrap_servers,
group_id='dlq-handler',
auto_offset_reset='earliest'
)
async def send_to_dlq(self, failed_message, error, topic='banking.transactions.dlq'):
"""Send a failed message to the dead letter queue"""
dlq_message = {
'original_message': failed_message,
'error': str(error),
'timestamp': datetime.utcnow().isoformat(),
'retry_count': failed_message.get('retry_count', 0) + 1
}
self.producer.send(topic, dlq_message)
async def process_failed_transactions(self, retry_handler):
"""Process messages from the failed transactions topic"""
for message in self.consumer:
try:
failed_transaction = json.loads(message.value.decode('utf-8'))
# Check if we should retry
retry_count = failed_transaction.get('retry_count', 0)
if retry_count < 3: # Max retry attempts
# Update retry count and send back to main topic
failed_transaction['retry_count'] = retry_count + 1
failed_transaction['last_retry'] = datetime.utcnow().isoformat()
# Send to retry handler
await retry_handler(failed_transaction)
else:
# Max retries exceeded, send to permanent DLQ
await self.send_to_dlq(
failed_transaction,
"Max retry attempts exceeded",
'banking.transactions.permanent.dlq'
)
# Also notify monitoring system
await self.notify_monitoring(failed_transaction)
except Exception as e:
print(f"Error processing failed transaction: {e}")
async def notify_monitoring(self, failed_transaction):
"""Send notification to monitoring system about permanent failure"""
# Implementation depends on monitoring system
pass
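The retry_handler passed to process_failed_transactions is left abstract above. A minimal sketch of one, assuming a kafka-python producer that republishes the enriched message to the submitted topic for reprocessing:

from kafka import KafkaProducer
import json


def make_retry_handler(bootstrap_servers):
    """Build a retry handler that requeues failed transactions for reprocessing."""
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    async def retry_handler(failed_transaction):
        # retry_count and last_retry were already updated by the DLQ handler
        producer.send('banking.transactions.submitted', failed_transaction)
        producer.flush()

    return retry_handler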
from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
import uuid
# Transaction ID and trace ID middleware
@app.middleware("http")
async def add_transaction_tracing(request: Request, call_next):
# Generate transaction and trace IDs for tracking
transaction_id = str(uuid.uuid4())
trace_id = str(uuid.uuid4())
# Add to request state
request.state.transaction_id = transaction_id
request.state.trace_id = trace_id
# Add to headers
response = await call_next(request)
response.headers["X-Transaction-ID"] = transaction_id
response.headers["X-Trace-ID"] = trace_id
return response
# Custom exception handler with Kafka logging
async def insufficient_funds_exception_handler(
request: Request,
exc: InsufficientFundsError,
kafka_producer
):
error_data = {
"error": {
"code": "INSUFFICIENT_FUNDS",
"message": str(exc),
"transaction_id": exc.transaction_id,
"trace_id": request.state.trace_id,
"timestamp": datetime.utcnow().isoformat(),
"request_id": request.state.request_id
}
}
# Log error to Kafka
await kafka_producer.send_transaction(
{
"transaction_id": exc.transaction_id,
"trace_id": request.state.trace_id,
"error_type": "INSUFFICIENT_FUNDS",
"error_message": str(exc),
"timestamp": datetime.utcnow().isoformat()
},
'errors'
)
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=error_data
)
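FastAPI invokes exception handlers with only (request, exc), so the kafka_producer argument above has to be bound at registration time. A minimal sketch of that wiring, assuming an application-level kafka_producer instance:

from functools import partial

app.add_exception_handler(
    InsufficientFundsError,
    partial(insufficient_funds_exception_handler, kafka_producer=kafka_producer),
)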
version: '3.8'
services:
api:
build: ./api
ports:
- "8000:8000"
depends_on:
- db
- redis
- kafka
- schema-registry
environment:
- DATABASE_URL=postgresql+asyncpg://user:password@db:5432/banking
- REDIS_URL=redis://redis:6379/0
- KAFKA_BOOTSTRAP_SERVERS=kafka:9092
- SCHEMA_REGISTRY_URL=http://schema-registry:8081
volumes:
- ./api:/app
restart: always
worker:
build: ./worker
depends_on:
- kafka
- db
environment:
- DATABASE_URL=postgresql+asyncpg://user:password@db:5432/banking
- KAFKA_BOOTSTRAP_SERVERS=kafka:9092
- SCHEMA_REGISTRY_URL=http://schema-registry:8081
volumes:
- ./worker:/app
restart: always
grpc-service:
build: ./grpc
ports:
- "50051:50051"
depends_on:
- db
- kafka
environment:
- DATABASE_URL=postgresql+asyncpg://user:password@db:5432/banking
- KAFKA_BOOTSTRAP_SERVERS=kafka:9092
volumes:
- ./grpc:/app
restart: always
db:
image: postgres:15
volumes:
- postgres-data:/var/lib/postgresql/data
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
- POSTGRES_DB=banking
ports:
- "5432:5432"
redis:
image: redis:7
ports:
- "6379:6379"
volumes:
- redis-data:/data
zookeeper:
image: confluentinc/cp-zookeeper:7.3.2
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.3.2
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
schema-registry:
image: confluentinc/cp-schema-registry:7.3.2
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
- schema-registry
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
prometheus:
image: prom/prometheus
volumes:
- ./prometheus:/etc/prometheus
- prometheus-data:/prometheus
ports:
- "9090:9090"
grafana:
image: grafana/grafana
volumes:
- grafana-data:/var/lib/grafana
ports:
- "3000:3000"
depends_on:
- prometheus
volumes:
postgres-data:
redis-data:
prometheus-data:
grafana-data:
# kafka-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
spec:
serviceName: "kafka"
replicas: 3
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
containers:
- name: kafka
image: confluentinc/cp-kafka:7.3.2
ports:
- containerPort: 9092
name: kafka
env:
- name: KAFKA_BROKER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: KAFKA_ZOOKEEPER_CONNECT
value: "zookeeper:2181"
- name: KAFKA_ADVERTISED_LISTENERS
value: "PLAINTEXT://$(POD_NAME).kafka:9092"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "3"
volumeMounts:
- name: kafka-data
mountPath: /var/lib/kafka/data
volumeClaimTemplates:
- metadata:
name: kafka-data
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 10Gi
# grpc-load-test.yaml
concurrency: 50
total: 10000
call: banking.TransactionService.DebitAccount
host: localhost:50051
data:
account_id: "123e4567-e89b-12d3-a456-426614174000"
amount: 10.00
currency: "USD"
description: "Load test transaction"
reference_id: "LOAD-TEST-{{.RequestNumber}}"
import pytest
from testcontainers.kafka import KafkaContainer
from testcontainers.postgres import PostgresContainer
from kafka import KafkaProducer, KafkaConsumer
import json
import asyncio
from app.main import app
from httpx import AsyncClient
@pytest.fixture(scope="session")
def kafka_container():
with KafkaContainer() as kafka:
yield kafka
@pytest.fixture(scope="session")
def postgres_container():
with PostgresContainer() as postgres:
yield postgres
@pytest.fixture
async def kafka_producer(kafka_container):
producer = KafkaProducer(
bootstrap_servers=kafka_container.get_bootstrap_server(),
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
yield producer
producer.close()
@pytest.fixture
async def kafka_consumer(kafka_container):
consumer = KafkaConsumer(
'banking.transactions.completed',
bootstrap_servers=kafka_container.get_bootstrap_server(),
auto_offset_reset='earliest',
group_id='test-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
yield consumer
consumer.close()
@pytest.mark.asyncio
async def test_transaction_flow(kafka_producer, kafka_consumer, postgres_container):
# Configure app to use test containers
app.dependency_overrides[get_db_pool] = lambda: postgres_container.get_connection_url()
app.dependency_overrides[get_kafka_producer] = lambda: kafka_producer
# Create test account
async with AsyncClient(app=app, base_url="http://test") as client:
# Create account
account_response = await client.post("/api/v1/accounts", json={
"customer_id": "test-customer",
"account_type": "checking"
})
assert account_response.status_code == 201
account_id = account_response.json()["account_id"]
# Initial credit to account
await client.post("/api/v1/transactions/credit", json={
"account_id": account_id,
"amount": 1000.00,
"currency": "USD",
"description": "Initial credit",
"reference_id": "TEST-INIT-CREDIT"
})
# Perform debit transaction
debit_response = await client.post("/api/v1/transactions/debit", json={
"account_id": account_id,
"amount": 100.00,
"currency": "USD",
"description": "Test debit",
"reference_id": "TEST-DEBIT-1"
})
assert debit_response.status_code == 200
debit_result = debit_response.json()
# Check for message in Kafka
message = next(kafka_consumer)
assert message.value["transaction_id"] == debit_result["transaction_id"]
assert message.value["status"] == "completed"
# Verify balance
balance_response = await client.get(f"/api/v1/accounts/{account_id}/balance")
assert balance_response.status_code == 200
assert balance_response.json()["available_balance"] == 900.0
- Set up FastAPI project structure
- Implement database models and migrations
- Create basic account endpoints
- Set up Kafka infrastructure and topics
- Implement PyArrow serialization
- Set up Docker development environment
- Implement Kafka producers and consumers
- Develop transaction event sourcing pattern
- Set up Redis caching with Kafka invalidation
- Implement optimistic locking with versioning
- Configure transaction consistency mechanisms
- Define gRPC service definitions
- Implement gRPC services for critical operations
- Set up client-server communication
- Integrate with Kafka for event propagation
- Set up dead letter queues and retry mechanisms
- Implement Prometheus metrics for Kafka and gRPC
- Configure Grafana dashboards
- Set up centralized logging with correlation IDs
- Implement unit and integration tests
- Set up CI/CD pipeline with test containers
- Deploy to Kubernetes environment
- Load testing and performance optimization
from fastapi import Depends, HTTPException, Request
import time
import redis
import asyncio
class RateLimiter:
def __init__(self, redis_client: redis.Redis, kafka_producer=None):
self.redis_client = redis_client
self.kafka_producer = kafka_producer
self.rate_limit = 5 # requests
self.time_window = 1 # second
async def __call__(self, request: Request):
client_ip = request.client.host
key = f"rate_limit:{client_ip}"
current = self.redis_client.get(key)
if current is None:
self.redis_client.set(key, 1, ex=self.time_window)
# Log rate limit event to Kafka for analytics
if self.kafka_producer:
await self.kafka_producer.send_transaction(
{
'client_ip': client_ip,
'endpoint': request.url.path,
'count': 1,
'timestamp': time.time(),
'limit': self.rate_limit
},
'rate_limits'
)
return
if int(current) >= self.rate_limit:
# Log rate limit exceeded event
if self.kafka_producer:
await self.kafka_producer.send_transaction(
{
'client_ip': client_ip,
'endpoint': request.url.path,
'exceeded': True,
'timestamp': time.time(),
'limit': self.rate_limit
},
'rate_limits.exceeded'
)
raise HTTPException(
status_code=429,
detail="Rate limit exceeded. Try again later."
)
self.redis_client.incr(key)
# Log rate limit event
if self.kafka_producer:
await self.kafka_producer.send_transaction(
{
'client_ip': client_ip,
'endpoint': request.url.path,
'count': int(current) + 1,
'timestamp': time.time(),
'limit': self.rate_limit
},
'rate_limits'
)
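Because RateLimiter implements __call__, it can be attached to individual routes as a FastAPI dependency. A minimal sketch, assuming app is the application's FastAPI instance and the Redis client points at the same instance used for caching and locking:

import redis
from fastapi import Depends

rate_limiter = RateLimiter(redis.Redis(host="redis", port=6379), kafka_producer=None)


@app.get("/api/v1/accounts/{account_id}", dependencies=[Depends(rate_limiter)])
async def get_account(account_id: str):
    ...  # account lookup handled by the account service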
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer, Consumer
import json
import time
from typing import Dict, Any, List
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
class FraudDetectionService:
def __init__(self, bootstrap_servers: str):
"""Initialize fraud detection with Kafka streams"""
self.bootstrap_servers = bootstrap_servers
self.producer = Producer({'bootstrap.servers': bootstrap_servers})
# Create topics if they don't exist
self._create_topics()
# Train initial model
self.model = self._train_initial_model()
# Transaction history for user/account
self.transaction_history: Dict[str, List[Dict[str, Any]]] = {}
def _create_topics(self):
"""Create required Kafka topics"""
admin_client = AdminClient({'bootstrap.servers': self.bootstrap_servers})
topics = [
NewTopic('banking.transactions.submitted', num_partitions=5, replication_factor=3),
NewTopic('banking.transactions.completed', num_partitions=5, replication_factor=3),
NewTopic('banking.fraud.alerts', num_partitions=3, replication_factor=3)
]
admin_client.create_topics(topics)
    def _train_initial_model(self):
        """Train initial fraud detection model"""
        # In production, this would load historical transaction data.
        # For this example, fit a simple isolation forest on placeholder feature
        # vectors (amount, time_since_last, deviation_from_avg) so that
        # decision_function() can be called before real data arrives.
        model = IsolationForest(random_state=42, contamination=0.01)
        rng = np.random.default_rng(42)
        model.fit(rng.random((100, 3)))
        return model
def _extract_features(self, transaction: Dict[str, Any]) -> np.ndarray:
"""Extract features from transaction for fraud detection"""
# Features might include:
# - Transaction amount
# - Time since last transaction
# - Difference from average transaction amount
# - Geographic location features
# - Device information
# Simplified example:
features = [
float(transaction['amount']),
float(transaction.get('time_since_last', 0)),
float(transaction.get('deviation_from_avg', 0))
]
return np.array(features).reshape(1, -1)
def _enrich_transaction(self, transaction: Dict[str, Any]) -> Dict[str, Any]:
"""Add derived features to transaction data"""
account_id = transaction['account_id']
# Get account history or initialize empty list
history = self.transaction_history.get(account_id, [])
# Calculate time since last transaction
if history:
last_transaction_time = float(history[-1]['timestamp'])
time_since_last = time.time() - last_transaction_time
else:
time_since_last = 0
# Calculate average amount and deviation
if history:
amounts = [float(tx['amount']) for tx in history]
avg_amount = sum(amounts) / len(amounts)
deviation_from_avg = float(transaction['amount']) - avg_amount
else:
avg_amount = float(transaction['amount'])
deviation_from_avg = 0
# Add enriched features
enriched = transaction.copy()
enriched['time_since_last'] = time_since_last
enriched['avg_amount'] = avg_amount
enriched['deviation_from_avg'] = deviation_from_avg
# Update history
if len(history) > 100: # Keep last 100 transactions
history.pop(0)
history.append({
'amount': transaction['amount'],
'timestamp': time.time(),
'transaction_id': transaction['transaction_id']
})
self.transaction_history[account_id] = history
return enriched
def detect_fraud(self, transaction: Dict[str, Any]) -> Dict[str, Any]:
"""Detect if a transaction is fraudulent"""
# Enrich transaction with derived features
enriched = self._enrich_transaction(transaction)
# Extract features for model
features = self._extract_features(enriched)
# Predict anomaly score (-1 for anomalies, 1 for normal)
score = self.model.decision_function(features)[0]
# Convert to probability-like score (0 to 1, where 1 is most anomalous)
fraud_score = 1 - (score + 1) / 2
# Add fraud detection results
result = enriched.copy()
result['fraud_score'] = float(fraud_score)
result['is_fraudulent'] = fraud_score > 0.9 # Threshold for fraud alert
# If fraudulent, send alert to Kafka
if result['is_fraudulent']:
self._send_fraud_alert(result)
return result
def _send_fraud_alert(self, transaction: Dict[str, Any]):
"""Send fraud alert to Kafka topic"""
alert = {
'transaction_id': transaction['transaction_id'],
'account_id': transaction['account_id'],
'amount': transaction['amount'],
'fraud_score': transaction['fraud_score'],
'timestamp': time.time(),
'alert_id': f"fraud-{time.time()}"
}
self.producer.produce(
'banking.fraud.alerts',
key=transaction['account_id'],
value=json.dumps(alert)
)
self.producer.flush()
def start_processing(self):
"""Start processing transactions from Kafka for fraud detection"""
consumer = Consumer({
'bootstrap.servers': self.bootstrap_servers,
'group.id': 'fraud-detection-service',
'auto.offset.reset': 'latest'
})
consumer.subscribe(['banking.transactions.submitted'])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
try:
transaction = json.loads(msg.value().decode('utf-8'))
# Process transaction for fraud detection
result = self.detect_fraud(transaction)
# Forward the transaction with fraud score to appropriate topic
target_topic = 'banking.transactions.completed'
if result['is_fraudulent']:
target_topic = 'banking.transactions.held'
self.producer.produce(
target_topic,
key=transaction['account_id'],
value=json.dumps(result)
)
except Exception as e:
print(f"Error processing transaction: {e}")
finally:
consumer.close()
from concurrent.futures import ThreadPoolExecutor
from grpc import StatusCode, RpcError
import grpc
import jwt
from datetime import datetime, timedelta
from typing import Dict, Any, Callable
# JWT Configuration
JWT_SECRET = "your-secret-key" # In production, use environment variable
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION = 30 # minutes
def create_jwt_token(data: Dict[str, Any]) -> str:
"""Create a JWT token"""
expiration = datetime.utcnow() + timedelta(minutes=JWT_EXPIRATION)
payload = {
**data,
"exp": expiration
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def verify_jwt_token(token: str) -> Dict[str, Any]:
"""Verify and decode JWT token"""
try:
return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
except jwt.PyJWTError:
raise ValueError("Invalid token")
# gRPC Auth Interceptor
class AuthInterceptor(grpc.ServerInterceptor):
def __init__(self):
def abort(ignored_request, context):
context.abort(StatusCode.UNAUTHENTICATED, "Invalid or missing token")
self._abort_handler = grpc.unary_unary_rpc_method_handler(abort)
def intercept_service(self, continuation, handler_call_details):
# Extract metadata
metadata = dict(handler_call_details.invocation_metadata)
# Check for auth token in metadata
auth_token = metadata.get('authorization', '')
if not auth_token.startswith('Bearer '):
return self._abort_handler
token = auth_token[7:] # Remove 'Bearer ' prefix
try:
# Verify token
payload = verify_jwt_token(token)
# Add user info to context
context = handler_call_details.invocation_metadata
context = context + (('user_id', payload.get('user_id')),)
# Continue with the call
return continuation(handler_call_details._replace(invocation_metadata=context))
except ValueError:
return self._abort_handler
# gRPC Server with Auth
def create_grpc_server():
server = grpc.server(
ThreadPoolExecutor(max_workers=10),
interceptors=[AuthInterceptor()]
)
# Add servicers here
banking_pb2_grpc.add_TransactionServiceServicer_to_server(
TransactionServicer(), server
)
server.add_insecure_port('[::]:50051')
return server
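On the caller side, the token produced by create_jwt_token travels as gRPC metadata so the AuthInterceptor can validate it. A minimal client sketch, assuming the generated banking_pb2/banking_pb2_grpc stubs and a locally running server:

import grpc
import banking_pb2
import banking_pb2_grpc

token = create_jwt_token({"user_id": "service-account-1"})

with grpc.insecure_channel("localhost:50051") as channel:
    stub = banking_pb2_grpc.TransactionServiceStub(channel)
    response = stub.DebitAccount(
        banking_pb2.DebitRequest(
            account_id="123e4567-e89b-12d3-a456-426614174000",
            amount=10.0,
            currency="USD",
            description="JWT-authenticated debit",
            reference_id="REF-001",
        ),
        metadata=(("authorization", f"Bearer {token}"),),
    )
    print(response.status, response.available_balance)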
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from starlette.middleware.sessions import SessionMiddleware
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
app = FastAPI()
# HTTPS Redirect in production
app.add_middleware(HTTPSRedirectMiddleware)
# Trusted Host validation
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["api.yourbank.com", "www.yourbank.com"]
)
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["https://app.yourbank.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Compression for response payloads
app.add_middleware(GZipMiddleware, minimum_size=1000)
# Session middleware for web sessions
app.add_middleware(
SessionMiddleware,
secret_key="your-secret-key",
max_age=3600 # 1 hour
)
# Security headers middleware
@app.middleware("http")
async def add_security_headers(request, call_next):
response = await call_next(request)
# Add security headers
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["Content-Security-Policy"] = "default-src 'self'"
return response
Below is the folder structure for the banking API project, followed by a phase-wise implementation plan covering all files.
banking-api/
├── api/ # FastAPI application
│ ├── main.py # Application entry point
│ ├── config.py # Configuration settings
│ ├── dependencies.py # Dependency injection
│ ├── middleware/ # API middleware components
│ ├── routes/ # API endpoints
│ ├── schemas/ # Pydantic schemas
│ ├── services/ # Business logic services
│ └── utils/ # Utility functions
├── common/ # Shared code and utilities
│ ├── kafka/ # Kafka utilities
│ ├── serializers/ # Data serialization (PyArrow)
│ └── errors/ # Error handling
├── db/ # Database layer
│ ├── models.py # SQLAlchemy models
│ ├── session.py # DB session management
│ └── migrations/ # Alembic migrations
├── grpc_service/ # gRPC implementation
│ ├── server.py # gRPC server
│ ├── interceptors/ # gRPC middleware
│ ├── protos/ # Protocol buffers
│ └── handlers/ # gRPC service handlers
├── worker/ # Kafka consumers and processors
│ ├── main.py # Worker entry point
│ ├── consumers/ # Kafka consumers
│ ├── producers/ # Kafka producers
│ └── processors/ # Business logic processors
├── tests/ # Test suite
│ ├── api/ # API tests
│ ├── grpc/ # gRPC tests
│ ├── worker/ # Worker tests
│ └── integration/ # End-to-end tests
├── infrastructure/ # Deployment configuration
│ ├── docker/ # Docker files
│ ├── kubernetes/ # K8s manifests
│ └── prometheus/ # Monitoring config
├── scripts/ # Utility scripts
└── requirements.txt # Project dependencies
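A starting point for requirements.txt, listing the dependencies referenced throughout this document; versions are deliberately left unpinned here and should be pinned during Phase 1:

fastapi
uvicorn[standard]
pydantic
sqlalchemy[asyncio]
asyncpg
alembic
redis
kafka-python
confluent-kafka
pyarrow
grpcio
grpcio-tools
PyJWT
prometheus-client
numpy
pandas
scikit-learn
httpx
pytest
pytest-asyncio
testcontainers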
Day 1-2: Project Initialization
- requirements.txt - Initial dependencies
- api/config.py - Application configuration
- api/main.py - FastAPI app setup
- api/dependencies.py - Core dependencies

Day 3-5: Database Setup
- db/models.py - Create SQLAlchemy models for:
  - Accounts
  - Balances
  - Transactions
  - Transaction Logs
  - Users
- db/session.py - Database connection management
- db/migrations/ - Alembic setup and initial migration

Day 6-7: Core Schemas and Docker
- api/schemas/account.py - Account Pydantic schemas
- api/schemas/transaction.py - Transaction schemas
- api/schemas/user.py - User schemas
- infrastructure/docker/Dockerfile.api - API container
- infrastructure/docker-compose.yml - Development environment

Day 1-2: Account Endpoints
- api/routes/accounts.py - Account API routes
- api/services/account.py - Account business logic
- api/middleware/auth.py - Authentication middleware

Day 3-5: Transaction Endpoints
- api/routes/transactions.py - Transaction API routes
- api/services/transaction.py - Basic transaction logic
- api/utils/exceptions.py - Custom exceptions

Day 6-7: Testing
- tests/api/test_accounts.py - Account API tests
- tests/api/test_transactions.py - Transaction API tests
- scripts/init_db.py - Database initialization script

Day 1-2: Kafka Base Setup
- common/kafka/producer.py - Kafka producer base
- common/kafka/consumer.py - Kafka consumer base
- scripts/create_topics.py - Kafka topic creation

Day 3-5: Serialization and Worker Setup
- common/serializers/arrow.py - PyArrow serialization
- worker/main.py - Worker service initialization
- worker/producers/transaction.py - Transaction producer
- infrastructure/docker/Dockerfile.worker - Worker container
- Update docker-compose.yml - Add Kafka services

Day 6-7: Transaction Processing
- worker/consumers/transaction.py - Transaction consumer
- worker/processors/transaction.py - Transaction processing logic
- tests/worker/test_processors.py - Basic processor tests

Day 1-2: Redis Caching
- api/services/cache.py - Redis caching implementation
- worker/consumers/balance.py - Balance update consumer

Day 3-5: Concurrency Management
- Update api/services/transaction.py - Add optimistic locking
- api/middleware/tracing.py - Transaction tracing
- api/middleware/rate_limiter.py - Rate limiting with Redis

Day 6-7: Monitoring Setup
- infrastructure/prometheus/prometheus.yml - Prometheus config
- infrastructure/prometheus/grafana-dashboards/banking-dashboard.json - Grafana dashboard
- common/errors/handlers.py - Error handlers with Kafka logging

Day 1-2: gRPC Protocol Definition
- grpc_service/protos/banking.proto - Service definitions
- Generate Python stubs from proto file

Day 3-4: gRPC Server and Handler
- grpc_service/server.py - gRPC server implementation
- grpc_service/handlers/transaction.py - Transaction service handler
- grpc_service/interceptors/auth.py - Authentication interceptor

Day 5-7: Integration and Testing
- grpc_service/client.py - Internal gRPC client
- Update api/services/transaction.py - Add gRPC client integration
- tests/grpc/test_transactions.py - gRPC service tests
- infrastructure/docker/Dockerfile.grpc - gRPC service container
- Update docker-compose.yml - Add gRPC service

Day 1-2: Error Handling
- common/errors/exceptions.py - Enhanced exceptions
- Update worker/consumers/transaction.py - Add dead letter queue
- Update worker/processors/transaction.py - Add retry mechanisms

Day 3-4: Distributed Transactions
- Update api/services/transaction.py - Two-phase commit with Kafka
- Update worker/processors/transaction.py - Compensation handling
- Update common/kafka/producer.py - Add transaction guarantees

Day 5-7: Monitoring and Alerting
- worker/processors/fraud.py - Basic fraud detection
- Update Prometheus and Grafana configurations
- Add metrics collection to all services
- Implement centralized logging with correlation IDs

Day 1-3: Unit and Integration Tests
- tests/conftest.py - Shared test fixtures
- tests/integration/test_kafka.py - Kafka integration tests
- tests/integration/test_transaction_flow.py - End-to-end tests

Day 4-7: Load Testing and Optimization
- scripts/load_test.py - Load testing script
- Performance optimization for critical paths
- Tune PostgreSQL, Kafka, and Redis configurations
- Implement connection pooling and caching improvements

Day 1-3: Kubernetes Manifests
- infrastructure/kubernetes/api-deployment.yaml - API deployment
- infrastructure/kubernetes/grpc-deployment.yaml - gRPC deployment
- infrastructure/kubernetes/worker-deployment.yaml - Worker deployment
- infrastructure/kubernetes/kafka-statefulset.yaml - Kafka StatefulSet
- infrastructure/kubernetes/postgres-statefulset.yaml - Database StatefulSet

Day 4-7: CI/CD and Documentation
- Set up CI/CD pipeline
- Update README.md with complete documentation
- Create runbooks for operations
- Final performance testing and tuning
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.trustedhost import TrustedHostMiddleware
from api.routes import accounts, transactions
from api.middleware import auth, tracing, rate_limiter, security
from api.utils.exceptions import register_exception_handlers
app = FastAPI(title="Banking API", version="1.0.0")
# Middleware
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["*"])
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(tracing.TransactionTracingMiddleware)
app.add_middleware(rate_limiter.RateLimitMiddleware)
app.add_middleware(auth.JWTAuthMiddleware)
# Register routes
app.include_router(accounts.router, prefix="/api/v1/accounts", tags=["accounts"])
app.include_router(transactions.router, prefix="/api/v1/transactions", tags=["transactions"])
# Register exception handlers
register_exception_handlers(app)
@app.get("/health")
async def health_check():
return {"status": "healthy"}
from sqlalchemy import Column, String, Numeric, Boolean, DateTime, ForeignKey, Integer, Text, func
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
import uuid
from datetime import datetime
Base = declarative_base()
class Account(Base):
__tablename__ = "accounts"
account_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
account_number = Column(String, unique=True, nullable=False)
customer_id = Column(String, nullable=False)
account_type = Column(String, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
is_active = Column(Boolean, default=True)
# Relationships
balances = relationship("Balance", back_populates="account")
transactions = relationship("Transaction", back_populates="account")
class Balance(Base):
__tablename__ = "balances"
balance_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
account_id = Column(UUID(as_uuid=True), ForeignKey("accounts.account_id"), nullable=False)
current_balance = Column(Numeric(precision=18, scale=2), nullable=False, default=0)
available_balance = Column(Numeric(precision=18, scale=2), nullable=False, default=0)
last_updated = Column(DateTime, default=datetime.utcnow)
version = Column(Integer, default=1, nullable=False) # For optimistic locking
# Relationships
account = relationship("Account", back_populates="balances")
# Additional models for Transaction, TransactionLog, User
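For completeness, a sketch of the Transaction model implied by the ER diagram (TransactionLog and User follow the same pattern):

class Transaction(Base):
    __tablename__ = "transactions"
    transaction_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    account_id = Column(UUID(as_uuid=True), ForeignKey("accounts.account_id"), nullable=False)
    transaction_type = Column(String, nullable=False)  # debit, credit, transfer
    amount = Column(Numeric(precision=18, scale=2), nullable=False)
    currency = Column(String, nullable=False, default="USD")
    description = Column(String)
    reference_id = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)
    status = Column(String, nullable=False, default="pending")
    related_transaction_id = Column(UUID(as_uuid=True), ForeignKey("transactions.transaction_id"), nullable=True)

    # Relationships
    account = relationship("Account", back_populates="transactions")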
from decimal import Decimal
import logging
from datetime import datetime
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import update
from db.models import Transaction, Balance, TransactionLog
from common.kafka.producer import TransactionProducer
logger = logging.getLogger(__name__)
class TransactionProcessor:
def __init__(self, db_session, kafka_producer: TransactionProducer, redis_client=None):
self.db = db_session
self.kafka_producer = kafka_producer
self.redis_client = redis_client
async def process_debit(self, transaction_data):
"""Process a debit transaction with optimistic locking"""
account_id = transaction_data["account_id"]
amount = Decimal(str(transaction_data["amount"]))
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
# Get current balance with version
async with self.db.begin():
balance = await self.db.execute(
select(Balance).where(Balance.account_id == account_id)
)
balance = balance.scalar_one()
if balance.available_balance < amount:
await self._handle_insufficient_funds(transaction_data)
return
# Update balance with version check
result = await self.db.execute(
update(Balance)
.where(
Balance.account_id == account_id,
Balance.version == balance.version
)
.values(
current_balance=Balance.current_balance - amount,
available_balance=Balance.available_balance - amount,
version=Balance.version + 1,
last_updated=datetime.utcnow()
)
.returning(Balance)
)
updated_balance = result.scalar_one_or_none()
if not updated_balance:
# Version conflict, retry
retry_count += 1
continue
# Create transaction record
transaction = Transaction(
transaction_id=transaction_data.get("transaction_id", uuid.uuid4()),
account_id=account_id,
transaction_type="debit",
amount=amount,
currency=transaction_data.get("currency", "USD"),
description=transaction_data.get("description", ""),
reference_id=transaction_data.get("reference_id", ""),
status="completed",
created_at=datetime.utcnow()
)
self.db.add(transaction)
await self.db.flush()
# Log transaction
transaction_log = TransactionLog(
transaction_id=transaction.transaction_id,
event_type="transaction_completed",
metadata={"balance_after": str(updated_balance.current_balance)},
created_at=datetime.utcnow()
)
self.db.add(transaction_log)
# Invalidate cache if Redis is available
if self.redis_client:
await self.redis_client.delete(f"balance:{account_id}")
# Publish balance update event to Kafka
await self.kafka_producer.send_transaction(
{
"account_id": str(account_id),
"balance": str(updated_balance.current_balance),
"available_balance": str(updated_balance.available_balance),
"version": updated_balance.version,
"timestamp": updated_balance.last_updated.isoformat()
},
"balances"
)
# Return successful transaction
return {
"transaction_id": str(transaction.transaction_id),
"status": "completed",
"amount": str(amount),
"balance": str(updated_balance.current_balance)
}
except Exception as e:
logger.error(f"Error processing debit: {e}")
retry_count += 1
# If we get here, all retries failed
await self._handle_transaction_failure(transaction_data, "max_retries_exceeded")
import grpc
import asyncio
from concurrent import futures
import logging
from grpc_service.handlers.transaction import TransactionServicer
from grpc_service.interceptors.auth import AuthInterceptor
import banking_pb2_grpc
logger = logging.getLogger(__name__)
async def serve(db_pool, kafka_producer, redis_client=None):
"""Start the gRPC server with all services registered"""
server = grpc.aio.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=[AuthInterceptor()]
)
# Register service handlers
transaction_servicer = TransactionServicer(db_pool, kafka_producer, redis_client)
banking_pb2_grpc.add_TransactionServiceServicer_to_server(
transaction_servicer, server
)
server.add_insecure_port('[::]:50051')
logger.info("Starting gRPC server on port 50051")
await server.start()
try:
await server.wait_for_termination()
except KeyboardInterrupt:
await server.stop(0)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(serve())
This implementation plan provides a structured approach to building a scalable banking API with all the required components. The file structure is designed to separate concerns and make the codebase maintainable. The phase-wise plan ensures systematic development with clear milestones.
Here are several Mermaid diagrams that visualize different aspects of the Banking API architecture:
flowchart TB
Client[Client Applications] --> LB[Load Balancer]
LB --> API[FastAPI Service]
LB --> gRPCLB[gRPC Load Balancer]
gRPCLB --> gRPC[gRPC Service]
subgraph Data Storage
DB[(PostgreSQL)]
Redis[(Redis Cache)]
end
subgraph Messaging
Kafka[Apache Kafka]
end
subgraph Processing
TxWorker[Transaction Worker]
AuditWorker[Audit Worker]
FraudWorker[Fraud Detection]
end
subgraph Monitoring
Prom[Prometheus]
Graf[Grafana]
Logs[Centralized Logging]
end
API --> DB
API --> Redis
API --> Kafka
API <--> gRPC
gRPC --> DB
gRPC --> Kafka
Kafka --> TxWorker
Kafka --> AuditWorker
Kafka --> FraudWorker
TxWorker --> DB
AuditWorker --> DB
FraudWorker --> Kafka
API --> Prom
gRPC --> Prom
Kafka --> Prom
TxWorker --> Prom
Prom --> Graf
API --> Logs
gRPC --> Logs
TxWorker --> Logs
AuditWorker --> Logs
FraudWorker --> Logs
sequenceDiagram
participant Client
participant API as FastAPI Service
participant Redis
participant gRPC as gRPC Service
participant Kafka
participant Worker as Transaction Worker
participant DB as PostgreSQL
Client->>API: POST /api/v1/transactions/transfer
API->>Redis: Acquire distributed lock
Redis-->>API: Lock acquired
API->>gRPC: TransferBetweenAccounts RPC
gRPC->>DB: Begin transaction
gRPC->>DB: Check source balance
DB-->>gRPC: Balance OK
gRPC->>Kafka: Publish prepare_transfer event
gRPC->>DB: Debit source account
DB-->>gRPC: Debit OK
gRPC->>DB: Credit destination account
DB-->>gRPC: Credit OK
gRPC->>DB: Commit transaction
DB-->>gRPC: Commit OK
gRPC->>Kafka: Publish transfer_completed event
gRPC-->>API: Transfer completed response
API->>Redis: Release lock
API-->>Client: 200 OK with transaction details
Kafka-->>Worker: Consume transfer_completed event
Worker->>DB: Log transaction details
Worker->>Kafka: Publish balance_updated event
flowchart TD
Client([Client]) -- HTTP Requests --> API
InternalSystem([Internal System]) -- gRPC Calls --> gRPC
subgraph API Layer
API[FastAPI Service]
gRPC[gRPC Service]
end
subgraph Data Layer
DB[(PostgreSQL)]
Cache[(Redis Cache)]
end
subgraph Event Processing
Kafka[Kafka]
TxProcessor[Transaction Processor]
FraudDetector[Fraud Detection]
Auditor[Audit Logger]
Notifier[Notification Service]
end
API -- Read/Write --> DB
API -- Cache Operations --> Cache
API -- Publish Events --> Kafka
API -- RPC Calls --> gRPC
gRPC -- Read/Write --> DB
gRPC -- Publish Events --> Kafka
Kafka -- Transaction Events --> TxProcessor
Kafka -- Account Activity --> FraudDetector
Kafka -- All Events --> Auditor
Kafka -- Status Updates --> Notifier
TxProcessor -- Process Transactions --> DB
FraudDetector -- Flag Suspicious --> Kafka
Auditor -- Log Activities --> DB
Notifier -- Send Notifications --> Client
flowchart TB
Internet((Internet)) --> Ingress[Ingress Controller]
subgraph Kubernetes Cluster
Ingress --> APIService[API Service]
Ingress --> gRPCService[gRPC Service]
APIService --> APIPod1[API Pod 1]
APIService --> APIPod2[API Pod 2]
APIService --> APIPod3[API Pod 3]
gRPCService --> gRPCPod1[gRPC Pod 1]
gRPCService --> gRPCPod2[gRPC Pod 2]
subgraph Data Tier
PostgresStatefulSet[Postgres StatefulSet]
PostgresStatefulSet --> PostgresPrimary[Postgres Primary]
PostgresStatefulSet --> PostgresReplica1[Postgres Replica 1]
PostgresStatefulSet --> PostgresReplica2[Postgres Replica 2]
RedisStatefulSet[Redis StatefulSet]
RedisStatefulSet --> RedisMaster[Redis Master]
RedisStatefulSet --> RedisSlave1[Redis Slave 1]
end
subgraph Event Processing
KafkaStatefulSet[Kafka StatefulSet]
KafkaStatefulSet --> KafkaBroker1[Kafka Broker 1]
KafkaStatefulSet --> KafkaBroker2[Kafka Broker 2]
KafkaStatefulSet --> KafkaBroker3[Kafka Broker 3]
ZKStatefulSet[Zookeeper StatefulSet]
ZKStatefulSet --> ZK1[Zookeeper 1]
ZKStatefulSet --> ZK2[Zookeeper 2]
ZKStatefulSet --> ZK3[Zookeeper 3]
WorkerDeployment[Worker Deployment]
WorkerDeployment --> WorkerPod1[Worker Pod 1]
WorkerDeployment --> WorkerPod2[Worker Pod 2]
WorkerDeployment --> WorkerPod3[Worker Pod 3]
end
subgraph Monitoring
PrometheusDeploy[Prometheus]
GrafanaDeploy[Grafana]
AlertManagerDeploy[Alert Manager]
end
end
APIPod1 --> PostgresStatefulSet
APIPod1 --> RedisStatefulSet
APIPod1 --> KafkaStatefulSet
gRPCPod1 --> PostgresStatefulSet
gRPCPod1 --> KafkaStatefulSet
WorkerPod1 --> PostgresStatefulSet
WorkerPod1 --> KafkaStatefulSet
KafkaBroker1 --> ZKStatefulSet
erDiagram
ACCOUNTS ||--o{ BALANCES : "has current"
ACCOUNTS ||--o{ TRANSACTIONS : "has many"
TRANSACTIONS ||--o{ TRANSACTION_LOGS : "has logs"
USERS ||--o{ ACCOUNTS : "owns"
TRANSACTIONS }o--|| TRANSACTIONS : "relates to"
ACCOUNTS {
uuid account_id PK
string account_number UK
string customer_id FK
string account_type
timestamp created_at
timestamp updated_at
boolean is_active
}
BALANCES {
uuid balance_id PK
uuid account_id FK
decimal current_balance
decimal available_balance
timestamp last_updated
int64 version
}
TRANSACTIONS {
uuid transaction_id PK
uuid account_id FK
string transaction_type
decimal amount
string currency
string description
string reference_id
timestamp created_at
string status
uuid related_transaction_id FK
}
TRANSACTION_LOGS {
uuid log_id PK
uuid transaction_id FK
string event_type
jsonb metadata
timestamp created_at
string created_by
}
USERS {
uuid user_id PK
string username UK
string password_hash
string email UK
string role
boolean is_active
timestamp created_at
timestamp last_login
}
flowchart LR
API[API Service] --> SubmittedTopic
gRPC[gRPC Service] --> SubmittedTopic
subgraph Kafka Topics
SubmittedTopic[banking.transactions.submitted]
CompletedTopic[banking.transactions.completed]
FailedTopic[banking.transactions.failed]
BalancesTopic[banking.accounts.balances]
FraudTopic[banking.fraud.alerts]
NotificationsTopic[banking.notifications]
DLQTopic[banking.transactions.dlq]
end
SubmittedTopic --> TxProcessor[Transaction Processor]
TxProcessor --> CompletedTopic
TxProcessor --> FailedTopic
TxProcessor --> BalancesTopic
FailedTopic --> RetryHandler[Retry Handler]
RetryHandler -->|retry| SubmittedTopic
RetryHandler -->|max retries| DLQTopic
SubmittedTopic --> FraudDetector[Fraud Detector]
FraudDetector --> FraudTopic
CompletedTopic --> NotificationService[Notification Service]
FailedTopic --> NotificationService
FraudTopic --> NotificationService
NotificationService --> NotificationsTopic
BalancesTopic --> CacheInvalidator[Cache Invalidator]
BalancesTopic --> AnalyticsProcessor[Analytics Processor]
sequenceDiagram
participant C1 as Client 1
participant C2 as Client 2
participant API as API Service
participant Redis as Redis
participant DB as PostgreSQL
C1->>API: Debit Account A
C2->>API: Debit Account A (concurrent)
API->>Redis: Acquire lock for Account A
Redis-->>API: Lock granted
C2->>API: Debit Account A
API->>Redis: Try to acquire lock
Redis-->>API: Lock denied
API-->>C2: 429 Too Many Requests (retry)
API->>DB: Get balance with version=5
DB-->>API: Balance = $1000, version=5
API->>DB: Update balance WHERE version=5
Note over API,DB: SET balance=$900, version=6
DB-->>API: Updated 1 row
API->>Redis: Release lock
Redis-->>API: Lock released
API-->>C1: 200 OK, Transaction Complete
C2->>API: Retry Debit Account A
API->>Redis: Acquire lock for Account A
Redis-->>API: Lock granted
API->>DB: Get balance with version=6
DB-->>API: Balance = $900, version=6
API->>DB: Update balance WHERE version=6
Note over API,DB: SET balance=$800, version=7
DB-->>API: Updated 1 row
API->>Redis: Release lock
Redis-->>API: Lock released
API-->>C2: 200 OK, Transaction Complete
These diagrams provide a comprehensive view of the Banking API's architecture, data flow, deployment strategy, and transaction handling mechanisms. They visualize how the different components interact and how data moves through the system, making it easier to understand the overall design and implementation.