Scalable Banking API - Detailed Implementation Plan

1. Architecture Overview

The Banking API requires a robust architecture to handle millions of transactions while ensuring consistency, low latency, and fault tolerance.

High-Level Architecture

More detailed diagrams for each subsystem appear in the sections below.

flowchart TD
    Client[Client Applications] --> LB[Load Balancer]
    LB --> API[FastAPI Application]
    API --> Redis[Redis Cache]
    API --> DB[(PostgreSQL Database)]
    API --> Kafka[Apache Kafka]
    Kafka --> WS[Worker Services]
    WS --> DB
    API --> Logger[Logging Service]
    API <--> gRPC[gRPC Services]
    gRPC <--> Kafka
    subgraph Monitoring
        Prometheus[Prometheus]
        Grafana[Grafana Dashboard]
    end
    API --> Prometheus
    Prometheus --> Grafana

Component Descriptions

  • FastAPI Application: Core API handling transaction requests
  • PostgreSQL: Primary database with ACID compliance
  • Redis: For caching and distributed locking
  • Apache Kafka: High-throughput distributed event streaming platform
  • gRPC Services: High-performance RPC framework for service communication
  • PyArrow: Efficient data serialization between services
  • Load Balancer: Distributes traffic across API instances
  • Monitoring: Prometheus and Grafana for real-time metrics
  • Logging: Centralized logging for transaction audit trails
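
The FastAPI layer can expose these components to request handlers through dependency providers. Below is a minimal wiring sketch, assuming the module layout described later in this plan; the helper names get_db, get_redis, and get_kafka_producer are illustrative, not prescribed.

# api/dependencies.py - a minimal wiring sketch (helper names and module paths are assumptions)
from collections.abc import AsyncGenerator

from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from common.kafka.producer import TransactionProducer  # producer class defined in section 4 of this plan

engine = create_async_engine("postgresql+asyncpg://user:password@db:5432/banking")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)

redis_client = Redis.from_url("redis://redis:6379/0")
kafka_producer = TransactionProducer(bootstrap_servers="kafka:9092")

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    # One database session per request
    async with SessionLocal() as session:
        yield session

async def get_redis() -> Redis:
    return redis_client

async def get_kafka_producer() -> TransactionProducer:
    return kafka_producer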

2. Database Schema

erDiagram
    ACCOUNTS {
        uuid account_id PK
        string account_number UK
        string customer_id FK
        string account_type
        timestamp created_at
        timestamp updated_at
        boolean is_active
    }
    
    BALANCES {
        uuid balance_id PK
        uuid account_id FK
        decimal current_balance
        decimal available_balance
        timestamp last_updated
        int64 version "For optimistic locking"
    }
    
    TRANSACTIONS {
        uuid transaction_id PK
        uuid account_id FK
        string transaction_type
        decimal amount
        string currency
        string description
        string reference_id
        timestamp created_at
        string status
        uuid related_transaction_id FK "For transfers"
    }
    
    TRANSACTION_LOGS {
        uuid log_id PK
        uuid transaction_id FK
        string event_type
        jsonb metadata
        timestamp created_at
        string created_by
    }
    
    USERS {
        uuid user_id PK
        string username UK
        string password_hash
        string email UK
        string role
        boolean is_active
        timestamp created_at
        timestamp last_login
    }
    
    ACCOUNTS ||--o{ BALANCES : has
    ACCOUNTS ||--o{ TRANSACTIONS : has
    TRANSACTIONS ||--o{ TRANSACTION_LOGS : logs
    USERS ||--o{ ACCOUNTS : owns
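
To make the schema concrete, here is a minimal SQLAlchemy sketch of the BALANCES table, including the version column used for optimistic locking; the defaults and server-side timestamp behaviour are assumptions.

# db/models.py excerpt - a sketch of the BALANCES table (defaults are assumptions)
import uuid

from sqlalchemy import BigInteger, Column, DateTime, ForeignKey, Numeric, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Balance(Base):
    __tablename__ = "balances"

    balance_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    account_id = Column(UUID(as_uuid=True), ForeignKey("accounts.account_id"), nullable=False, index=True)
    current_balance = Column(Numeric(18, 2), nullable=False, default=0)
    available_balance = Column(Numeric(18, 2), nullable=False, default=0)
    last_updated = Column(DateTime, server_default=func.now(), onupdate=func.now())
    version = Column(BigInteger, nullable=False, default=0)  # bumped on every update for optimistic locking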

3. API Design

Endpoints

Account Operations

  • GET /api/v1/accounts/{account_id} - Get account details
  • GET /api/v1/accounts/{account_id}/balance - Get account balance
  • GET /api/v1/accounts/{account_id}/transactions - Get transaction history

Transaction Operations

  • POST /api/v1/transactions/debit - Debit an account
  • POST /api/v1/transactions/credit - Credit an account
  • POST /api/v1/transactions/transfer - Transfer between accounts
  • GET /api/v1/transactions/{transaction_id} - Get transaction details
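
As an illustration of the read-side endpoints, a minimal FastAPI router sketch follows; the module paths mirror the folder structure later in this plan and are assumptions here.

# api/routes/accounts.py - a sketch of the account read endpoints (paths are assumptions)
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from api.dependencies import get_db
from db.models import Account, Balance

router = APIRouter()

@router.get("/{account_id}")
async def get_account(account_id: UUID, db: AsyncSession = Depends(get_db)):
    account = await db.get(Account, account_id)
    if account is None:
        raise HTTPException(status_code=404, detail="Account not found")
    return {
        "account_id": str(account.account_id),
        "account_number": account.account_number,
        "account_type": account.account_type,
        "is_active": account.is_active,
    }

@router.get("/{account_id}/balance")
async def get_balance(account_id: UUID, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(Balance).where(Balance.account_id == account_id))
    balance = result.scalar_one_or_none()
    if balance is None:
        raise HTTPException(status_code=404, detail="Balance not found")
    return {
        "account_id": str(account_id),
        "current_balance": balance.current_balance,
        "available_balance": balance.available_balance,
    }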

gRPC Service Definition

syntax = "proto3";

package banking;

service TransactionService {
  rpc DebitAccount(DebitRequest) returns (TransactionResponse);
  rpc CreditAccount(CreditRequest) returns (TransactionResponse);
  rpc TransferBetweenAccounts(TransferRequest) returns (TransactionResponse);
  rpc GetTransaction(GetTransactionRequest) returns (Transaction);
}

message DebitRequest {
  string account_id = 1;
  double amount = 2;
  string currency = 3;
  string description = 4;
  string reference_id = 5;
}

message CreditRequest {
  string account_id = 1;
  double amount = 2;
  string currency = 3;
  string description = 4;
  string reference_id = 5;
}

message TransferRequest {
  string source_account_id = 1;
  string destination_account_id = 2;
  double amount = 3;
  string currency = 4;
  string description = 5;
  string reference_id = 6;
}

message TransactionResponse {
  string transaction_id = 1;
  string status = 2;
  string timestamp = 3;
  string account_id = 4;
  double amount = 5;
  string currency = 6;
  double available_balance = 7;
}

message GetTransactionRequest {
  string transaction_id = 1;
}

message Transaction {
  string transaction_id = 1;
  string account_id = 2;
  string transaction_type = 3;
  double amount = 4;
  string currency = 5;
  string description = 6;
  string reference_id = 7;
  string created_at = 8;
  string status = 9;
  string related_transaction_id = 10;
}

gRPC Server Implementation

import grpc
import banking_pb2
import banking_pb2_grpc
from concurrent import futures
from decimal import Decimal
from app.services.transaction import debit_account, credit_account, transfer_between_accounts

class TransactionServicer(banking_pb2_grpc.TransactionServiceServicer):
    def __init__(self, db_pool):
        self.db_pool = db_pool

    async def DebitAccount(self, request, context):
        async with self.db_pool.acquire() as db:
            result = await debit_account(
                account_id=request.account_id,
                amount=Decimal(str(request.amount)),
                currency=request.currency,
                description=request.description,
                reference_id=request.reference_id,
                db=db
            )
            
            return banking_pb2.TransactionResponse(
                transaction_id=str(result.transaction_id),
                status=result.status,
                timestamp=result.created_at.isoformat(),
                account_id=str(result.account_id),
                amount=float(result.amount),
                currency=result.currency,
                available_balance=float(result.available_balance)
            )
    
    # Similar implementations for other methods

async def serve(db_pool):
    server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
    banking_pb2_grpc.add_TransactionServiceServicer_to_server(
        TransactionServicer(db_pool), server
    )
    server.add_insecure_port('[::]:50051')
    await server.start()
    await server.wait_for_termination()

4. Apache Kafka Integration

Kafka Topic Structure

banking.transactions.submitted    - New transaction requests
banking.transactions.completed    - Successfully processed transactions
banking.transactions.failed       - Failed transactions
banking.accounts.balances         - Account balance updates
banking.fraud.alerts              - Fraud detection alerts
banking.notifications             - User notifications
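
These topics can be provisioned up front. A sketch using kafka-python's admin client is shown below; the partition and replication counts are assumptions to adjust per environment.

# scripts/create_topics.py - a provisioning sketch (counts are assumptions)
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

TOPICS = [
    "banking.transactions.submitted",
    "banking.transactions.completed",
    "banking.transactions.failed",
    "banking.accounts.balances",
    "banking.fraud.alerts",
    "banking.notifications",
]

def create_topics(bootstrap_servers: str = "kafka:9092") -> None:
    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    new_topics = [NewTopic(name=t, num_partitions=5, replication_factor=3) for t in TOPICS]
    try:
        admin.create_topics(new_topics=new_topics)
    except TopicAlreadyExistsError:
        pass  # topics already provisioned
    finally:
        admin.close()

if __name__ == "__main__":
    create_topics()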

Kafka Producer for Transactions

from kafka import KafkaProducer
import json
import pyarrow as pa
import pyarrow.parquet as pq
import io
from uuid import UUID
from decimal import Decimal

class TransactionProducer:
    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: self._serialize_transaction(v)
        )
        
        # Define schema for transaction records
        self.transaction_schema = pa.schema([
            ('transaction_id', pa.string()),
            ('account_id', pa.string()),
            ('transaction_type', pa.string()),
            ('amount', pa.decimal128(precision=18, scale=2)),
            ('currency', pa.string()),
            ('description', pa.string()),
            ('reference_id', pa.string()),
            ('created_at', pa.timestamp('us')),
            ('status', pa.string())
        ])
    
    def _serialize_transaction(self, transaction_data):
        """Serialize transaction using PyArrow for efficient binary representation"""
        # Convert UUID and Decimal to appropriate types
        serializable_data = {
            **transaction_data,
            'transaction_id': str(transaction_data['transaction_id']),
            'account_id': str(transaction_data['account_id']),
            'amount': transaction_data['amount']
        }
        
        # Create PyArrow record batch
        batch = pa.RecordBatch.from_pylist([serializable_data], schema=self.transaction_schema)
        
        # Write to in-memory buffer as Parquet format
        buf = io.BytesIO()
        pq.write_table(pa.Table.from_batches([batch]), buf)
        
        return buf.getvalue()
    
    async def send_transaction(self, transaction_data, transaction_type):
        """Send transaction data to appropriate Kafka topic"""
        topic = f"banking.transactions.{transaction_type}"
        
        # Async send to Kafka topic
        future = self.producer.send(
            topic=topic,
            value=transaction_data
        )
        
        # Wait for send to complete
        try:
            record_metadata = future.get(timeout=10)
            return record_metadata
        except Exception as e:
            print(f"Failed to send transaction to Kafka: {e}")
            raise

Kafka Consumer for Transaction Processing

from kafka import KafkaConsumer
import pyarrow as pa
import pyarrow.parquet as pq
import io
import asyncio
import threading
from uuid import UUID
from decimal import Decimal
from datetime import datetime

class TransactionConsumer:
    def __init__(self, bootstrap_servers, db_pool, transaction_service):
        self.consumer = KafkaConsumer(
            'banking.transactions.submitted',
            bootstrap_servers=bootstrap_servers,
            group_id='transaction-processor',
            auto_offset_reset='earliest',
            enable_auto_commit=False
        )
        self.db_pool = db_pool
        self.transaction_service = transaction_service
        
    def _deserialize_transaction(self, message_value):
        """Deserialize transaction data from PyArrow/Parquet format"""
        buf = io.BytesIO(message_value)
        table = pq.read_table(buf)
        
        # Convert to Python dict
        record = table.to_pylist()[0]
        
        # Convert string representations back to appropriate types
        record['transaction_id'] = UUID(record['transaction_id'])
        record['account_id'] = UUID(record['account_id'])
        record['amount'] = Decimal(str(record['amount']))
        
        return record
    
    async def process_transactions(self):
        """Process transactions from Kafka topic"""
        for message in self.consumer:
            try:
                # Deserialize the transaction data
                transaction_data = self._deserialize_transaction(message.value)
                
                # Process based on transaction type
                if transaction_data['transaction_type'] == 'debit':
                    result = await self.transaction_service.process_debit(transaction_data)
                elif transaction_data['transaction_type'] == 'credit':
                    result = await self.transaction_service.process_credit(transaction_data)
                elif transaction_data['transaction_type'] == 'transfer':
                    result = await self.transaction_service.process_transfer(transaction_data)
                
                # Publish result to completed topic
                await self.publish_result(result, 'completed')
                
                # Commit offset after successful processing
                self.consumer.commit()
                
            except Exception as e:
                # Log error and publish to failed topic
                print(f"Error processing transaction: {e}")
                transaction_data['error'] = str(e)
                transaction_data['status'] = 'failed'
                await self.publish_result(transaction_data, 'failed')
    
    def start(self):
        """Start the consumer in a background thread"""
        # process_transactions is a coroutine, so run it inside an event loop in the thread
        thread = threading.Thread(target=lambda: asyncio.run(self.process_transactions()), daemon=True)
        thread.start()

5. Concurrency Management Strategy

Optimistic Locking with Kafka Event Sourcing

async def debit_account(account_id: UUID, amount: Decimal, db: AsyncSession, kafka_producer):
    # First, fetch the balance row (and its current version) for this account
    result = await db.execute(select(Balance).where(Balance.account_id == account_id))
    balance = result.scalar_one()
    if balance.available_balance < amount:
        raise InsufficientFundsError()
    
    # Update balance with version check
    result = await db.execute(
        update(Balance)
        .where(Balance.account_id == account_id, Balance.version == balance.version)
        .values(
            current_balance=Balance.current_balance - amount,
            available_balance=Balance.available_balance - amount,
            version=Balance.version + 1,
            last_updated=datetime.utcnow()
        )
        .returning(Balance)
    )
    
    updated_balance = result.scalar_one_or_none()
    if not updated_balance:
        # Version mismatch, publish conflict event to Kafka
        await kafka_producer.send_transaction(
            {
                'transaction_id': uuid.uuid4(),
                'account_id': account_id,
                'event_type': 'concurrency_conflict',
                'timestamp': datetime.utcnow().isoformat()
            },
            'failed'
        )
        raise ConcurrencyError("Balance was updated by another transaction")
        
    # Publish balance update event to Kafka
    await kafka_producer.send_transaction(
        {
            'account_id': account_id,
            'balance': float(updated_balance.current_balance),
            'available_balance': float(updated_balance.available_balance),
            'version': updated_balance.version,
            'timestamp': updated_balance.last_updated.isoformat()
        },
        'balances'
    )
    
    return updated_balance

Distributed Locks with Redis

async def process_transaction(transaction_data: dict, redis: Redis, db: AsyncSession, kafka_producer):
    account_id = transaction_data["account_id"]
    lock_name = f"account_lock:{account_id}"
    
    # Acquire distributed lock
    lock = redis.lock(lock_name, timeout=5, blocking_timeout=10)
    try:
        if lock.acquire():
            # Execute transaction in database transaction
            async with db.begin():
                # Process transaction logic here
                result = await perform_transaction(transaction_data, db)
                
                # Publish result to Kafka
                await kafka_producer.send_transaction(result, 'completed')
                
            return result
        else:
            # If lock acquisition fails, publish to Kafka for retry
            transaction_data['status'] = 'retry'
            transaction_data['retry_timestamp'] = datetime.utcnow().isoformat()
            await kafka_producer.send_transaction(transaction_data, 'submitted')
            
            raise ResourceLockError("Could not acquire lock for account")
    finally:
        # Make sure to release the lock if we still hold it
        try:
            lock.release()
        except Exception:
            pass

6. Transaction Consistency Implementation

Event Sourcing Pattern with Kafka

flowchart TD
    API[FastAPI Service] --> Command[Command Validation]
    Command --> Kafka[Kafka Transaction Stream]
    Kafka --> Process[Transaction Processor]
    Process --> DB[(Database Write)]
    Process --> Events[Kafka Event Stream]
    Events --> Projection[Balance Projection]
    Events --> Notification[Notification Service]
    Events --> Analytics[Analytics Service]
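
In this pattern the API does not write to the database directly: it validates the command, appends it to the Kafka transaction stream, and returns 202 Accepted while the transaction processor applies the change. A minimal command-side sketch, assuming the TransactionProducer from section 4 is injected via a get_kafka_producer dependency:

# Command side of the event-sourcing flow - a sketch, not the final route layout
import uuid
from datetime import datetime
from decimal import Decimal

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel

router = APIRouter()

class DebitCommand(BaseModel):
    account_id: uuid.UUID
    amount: Decimal
    currency: str
    reference_id: str
    description: str = ""

@router.post("/debit", status_code=status.HTTP_202_ACCEPTED)
async def submit_debit(command: DebitCommand, producer=Depends(get_kafka_producer)):
    # Command validation: reject obviously invalid requests before they reach the stream
    if command.amount <= 0:
        raise HTTPException(status_code=422, detail="Amount must be positive")

    event = {
        "transaction_id": uuid.uuid4(),
        "account_id": command.account_id,
        "transaction_type": "debit",
        "amount": command.amount,
        "currency": command.currency,
        "description": command.description,
        "reference_id": command.reference_id,
        "created_at": datetime.utcnow(),
        "status": "submitted",
    }

    # Append the command to the Kafka transaction stream; a worker applies it asynchronously
    await producer.send_transaction(event, "submitted")
    return {"transaction_id": str(event["transaction_id"]), "status": "submitted"}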

Two-Phase Commit with Kafka Coordination

from datetime import datetime
import uuid

async def transfer_funds(source_id, dest_id, amount, description, kafka_producer, db):
    # Generate transaction IDs
    transfer_id = uuid.uuid4()
    debit_id = uuid.uuid4()
    credit_id = uuid.uuid4()
    
    # Phase 1: Prepare by publishing intent to Kafka
    prepare_event = {
        'transfer_id': str(transfer_id),
        'source_account_id': str(source_id),
        'destination_account_id': str(dest_id),
        'amount': float(amount),
        'debit_id': str(debit_id),
        'credit_id': str(credit_id),
        'status': 'preparing',
        'timestamp': datetime.utcnow().isoformat()
    }
    
    await kafka_producer.send_transaction(prepare_event, 'transfers.preparing')
    
    try:
        # Execute debit
        debit_result = await debit_account(
            account_id=source_id, 
            amount=amount, 
            transaction_id=debit_id,
            related_transaction_id=transfer_id,
            db=db,
            kafka_producer=kafka_producer
        )
        
        # Execute credit
        credit_result = await credit_account(
            account_id=dest_id, 
            amount=amount, 
            transaction_id=credit_id,
            related_transaction_id=transfer_id,
            db=db,
            kafka_producer=kafka_producer
        )
        
        # Phase 2: Commit by publishing completion to Kafka
        complete_event = {
            'transfer_id': str(transfer_id),
            'source_account_id': str(source_id),
            'destination_account_id': str(dest_id),
            'amount': float(amount),
            'debit_id': str(debit_id),
            'credit_id': str(credit_id),
            'status': 'completed',
            'timestamp': datetime.utcnow().isoformat()
        }
        
        await kafka_producer.send_transaction(complete_event, 'transfers.completed')
        
        return {
            'transfer_id': transfer_id,
            'status': 'completed',
            'debit_transaction': debit_result,
            'credit_transaction': credit_result
        }
        
    except Exception as e:
        # If any step fails, publish compensation event to Kafka
        compensation_event = {
            'transfer_id': str(transfer_id),
            'source_account_id': str(source_id),
            'destination_account_id': str(dest_id),
            'amount': float(amount),
            'debit_id': str(debit_id),
            'credit_id': str(credit_id),
            'status': 'failed',
            'error': str(e),
            'timestamp': datetime.utcnow().isoformat()
        }
        
        await kafka_producer.send_transaction(compensation_event, 'transfers.failed')
        
        # Compensating transactions will be handled by a separate consumer
        
        raise TransferFailedError(f"Transfer failed: {str(e)}")
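
The compensating consumer mentioned above could look like the following sketch: it listens on the failed-transfer topic and credits the source account back when the debit leg had already been applied. The debit_applied flag is an assumption about the failure event's payload, and credit_account is the service function used in the transfer above.

# Compensation consumer - a sketch, assuming a debit_applied flag in the failure event
import json
import uuid
from decimal import Decimal

from kafka import KafkaConsumer

class TransferCompensator:
    def __init__(self, bootstrap_servers, db_pool, kafka_producer):
        self.consumer = KafkaConsumer(
            "banking.transactions.transfers.failed",
            bootstrap_servers=bootstrap_servers,
            group_id="transfer-compensator",
            auto_offset_reset="earliest",
            enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode("utf-8")),
        )
        self.db_pool = db_pool
        self.kafka_producer = kafka_producer

    async def run(self):
        for message in self.consumer:
            event = message.value
            # Only compensate if the debit leg was applied before the failure
            if event.get("debit_applied"):
                async with self.db_pool.acquire() as db:
                    await credit_account(
                        account_id=event["source_account_id"],
                        amount=Decimal(str(event["amount"])),
                        transaction_id=uuid.uuid4(),
                        related_transaction_id=event["transfer_id"],
                        db=db,
                        kafka_producer=self.kafka_producer,
                    )
            self.consumer.commit()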

7. Performance Optimization

PyArrow for Efficient Data Serialization

import pyarrow as pa
import pyarrow.parquet as pq
import io
from decimal import Decimal
from datetime import datetime

class ArrowSerializer:
    """Efficient serialization using PyArrow for high-performance data interchange"""
    
    def __init__(self):
        # Define schema for different message types
        self.transaction_schema = pa.schema([
            ('transaction_id', pa.string()),
            ('account_id', pa.string()),
            ('transaction_type', pa.string()),
            ('amount', pa.decimal128(precision=18, scale=2)),
            ('currency', pa.string()),
            ('description', pa.string()),
            ('reference_id', pa.string()),
            ('timestamp', pa.timestamp('us')),
            ('status', pa.string())
        ])
        
        self.balance_schema = pa.schema([
            ('account_id', pa.string()),
            ('current_balance', pa.decimal128(precision=18, scale=2)),
            ('available_balance', pa.decimal128(precision=18, scale=2)),
            ('version', pa.int32()),
            ('timestamp', pa.timestamp('us'))
        ])
    
    def serialize(self, data, schema_type='transaction'):
        """Serialize data to binary format using PyArrow"""
        if schema_type == 'transaction':
            schema = self.transaction_schema
        elif schema_type == 'balance':
            schema = self.balance_schema
        else:
            raise ValueError(f"Unknown schema type: {schema_type}")
        
        # Create a record batch
        batch = pa.RecordBatch.from_pylist([data], schema=schema)
        
        # Write to in-memory buffer
        buf = io.BytesIO()
        writer = pa.ipc.new_stream(buf, schema)
        writer.write_batch(batch)
        writer.close()
        
        return buf.getvalue()
    
    def deserialize(self, binary_data, schema_type='transaction'):
        """Deserialize binary data using PyArrow"""
        buf = io.BytesIO(binary_data)
        reader = pa.ipc.open_stream(buf)
        
        # Read all batches and concatenate
        batches = [batch for batch in reader]
        table = pa.Table.from_batches(batches)
        
        # Convert to Python dictionary
        records = table.to_pylist()
        
        # to_pylist already returns decimal.Decimal for decimal128 columns,
        # so only timestamps need converting to ISO strings here
        for record in records:
            for key, value in record.items():
                if isinstance(value, datetime):
                    record[key] = value.isoformat()
        
        return records[0] if records else None

Database Indexing Strategy

-- Account lookups by account number
CREATE INDEX idx_accounts_account_number ON accounts(account_number);

-- Fast balance lookups
CREATE INDEX idx_balances_account_id ON balances(account_id);

-- Transaction history queries
CREATE INDEX idx_transactions_account_id ON transactions(account_id);
CREATE INDEX idx_transactions_created_at ON transactions(created_at);

-- Composite index for account-based time-range queries
CREATE INDEX idx_transactions_account_time ON transactions(account_id, created_at);

-- Partial index for active transactions (status = 'pending')
CREATE INDEX idx_transactions_pending ON transactions(account_id, created_at)
WHERE status = 'pending';

Redis Caching with Invalidation via Kafka

async def get_account_balance(account_id: UUID, redis: Redis, db: AsyncSession):
    # Try to get from cache first
    cache_key = f"balance:{account_id}"
    cached_balance = await redis.get(cache_key)
    
    if cached_balance:
        return json.loads(cached_balance)
    
    # If not in cache, get from database
    balance = await db.get(Balance, account_id)
    
    # Store in cache with expiration (short TTL for balances)
    balance_dict = balance.to_dict()
    await redis.set(cache_key, json.dumps(balance_dict), ex=60)  # 60 seconds
    
    return balance_dict

# Kafka consumer for balance updates to invalidate cache
class BalanceCacheInvalidator:
    def __init__(self, bootstrap_servers, redis_client):
        self.consumer = KafkaConsumer(
            'banking.accounts.balances',
            bootstrap_servers=bootstrap_servers,
            group_id='cache-invalidator',
            auto_offset_reset='latest'
        )
        self.redis = redis_client
    
    async def start_invalidation(self):
        for message in self.consumer:
            try:
                balance_event = json.loads(message.value.decode('utf-8'))
                account_id = balance_event.get('account_id')
                
                if account_id:
                    # Invalidate cache for this account
                    cache_key = f"balance:{account_id}"
                    await self.redis.delete(cache_key)
            except Exception as e:
                print(f"Error processing balance update: {e}")

8. Error Handling and Recovery

Kafka-Based Dead Letter Queue

class DeadLetterQueueHandler:
    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        self.consumer = KafkaConsumer(
            'banking.transactions.failed',
            bootstrap_servers=bootstrap_servers,
            group_id='dlq-handler',
            auto_offset_reset='earliest'
        )
    
    async def send_to_dlq(self, failed_message, error, topic='banking.transactions.dlq'):
        """Send a failed message to the dead letter queue"""
        dlq_message = {
            'original_message': failed_message,
            'error': str(error),
            'timestamp': datetime.utcnow().isoformat(),
            'retry_count': failed_message.get('retry_count', 0) + 1
        }
        
        self.producer.send(topic, dlq_message)
    
    async def process_failed_transactions(self, retry_handler):
        """Process messages from the failed transactions topic"""
        for message in self.consumer:
            try:
                failed_transaction = json.loads(message.value.decode('utf-8'))
                
                # Check if we should retry
                retry_count = failed_transaction.get('retry_count', 0)
                if retry_count < 3:  # Max retry attempts
                    # Update retry count and send back to main topic
                    failed_transaction['retry_count'] = retry_count + 1
                    failed_transaction['last_retry'] = datetime.utcnow().isoformat()
                    
                    # Send to retry handler
                    await retry_handler(failed_transaction)
                else:
                    # Max retries exceeded, send to permanent DLQ
                    await self.send_to_dlq(
                        failed_transaction, 
                        "Max retry attempts exceeded",
                        'banking.transactions.permanent.dlq'
                    )
                    
                    # Also notify monitoring system
                    await self.notify_monitoring(failed_transaction)
                    
            except Exception as e:
                print(f"Error processing failed transaction: {e}")
    
    async def notify_monitoring(self, failed_transaction):
        """Send notification to monitoring system about permanent failure"""
        # Implementation depends on monitoring system
        pass

Structured Error Response with Kafka Tracing

from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
import uuid

# Transaction ID and trace ID middleware
@app.middleware("http")
async def add_transaction_tracing(request: Request, call_next):
    # Generate transaction and trace IDs for tracking
    transaction_id = str(uuid.uuid4())
    trace_id = str(uuid.uuid4())
    
    # Add to request state
    request.state.transaction_id = transaction_id
    request.state.trace_id = trace_id
    
    # Add to headers
    response = await call_next(request)
    response.headers["X-Transaction-ID"] = transaction_id
    response.headers["X-Trace-ID"] = trace_id
    
    return response

# Custom exception handler with Kafka logging
async def insufficient_funds_exception_handler(
    request: Request, 
    exc: InsufficientFundsError,
    kafka_producer
):
    error_data = {
        "error": {
            "code": "INSUFFICIENT_FUNDS",
            "message": str(exc),
            "transaction_id": exc.transaction_id,
            "trace_id": request.state.trace_id,
            "timestamp": datetime.utcnow().isoformat(),
            "request_id": request.state.request_id
        }
    }
    
    # Log error to Kafka
    await kafka_producer.send_transaction(
        {
            "transaction_id": exc.transaction_id,
            "trace_id": request.state.trace_id,
            "error_type": "INSUFFICIENT_FUNDS",
            "error_message": str(exc),
            "timestamp": datetime.utcnow().isoformat()
        },
        'errors'
    )
    
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content=error_data
    )

9. Deployment Strategy

Docker Compose with Kafka

version: '3.8'

services:
  api:
    build: ./api
    ports:
      - "8000:8000"
    depends_on:
      - db
      - redis
      - kafka
      - schema-registry
    environment:
      - DATABASE_URL=postgresql+asyncpg://user:password@db:5432/banking
      - REDIS_URL=redis://redis:6379/0
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
      - SCHEMA_REGISTRY_URL=http://schema-registry:8081
    volumes:
      - ./api:/app
    restart: always

  worker:
    build: ./worker
    depends_on:
      - kafka
      - db
    environment:
      - DATABASE_URL=postgresql+asyncpg://user:password@db:5432/banking
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
      - SCHEMA_REGISTRY_URL=http://schema-registry:8081
    volumes:
      - ./worker:/app
    restart: always

  grpc-service:
    build: ./grpc
    ports:
      - "50051:50051"
    depends_on:
      - db
      - kafka
    environment:
      - DATABASE_URL=postgresql+asyncpg://user:password@db:5432/banking
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
    volumes:
      - ./grpc:/app
    restart: always

  db:
    image: postgres:15
    volumes:
      - postgres-data:/var/lib/postgresql/data
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=banking
    ports:
      - "5432:5432"

  redis:
    image: redis:7
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.3.2
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.2
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
      - schema-registry
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081

  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus:/etc/prometheus
      - prometheus-data:/prometheus
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana
    volumes:
      - grafana-data:/var/lib/grafana
    ports:
      - "3000:3000"
    depends_on:
      - prometheus

volumes:
  postgres-data:
  redis-data:
  prometheus-data:
  grafana-data:

Kubernetes Deployment

# kafka-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: "kafka"
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: confluentinc/cp-kafka:7.3.2
        ports:
        - containerPort: 9092
          name: kafka
        env:
        # POD_NAME must be defined before the variables that reference it
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        # Note: the broker expects a numeric ID; in practice this is derived
        # from the StatefulSet pod ordinal (e.g. by an init script)
        - name: KAFKA_BROKER_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: "zookeeper:2181"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "PLAINTEXT://$(POD_NAME).kafka:9092"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "3"
        volumeMounts:
        - name: kafka-data
          mountPath: /var/lib/kafka/data
  volumeClaimTemplates:
  - metadata:
      name: kafka-data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 10Gi

10. Testing Strategy

gRPC Load Testing with ghz

# grpc-load-test.yaml
concurrency: 50
total: 10000
call: banking.TransactionService.DebitAccount
host: localhost:50051
data:
  account_id: "123e4567-e89b-12d3-a456-426614174000"
  amount: 10.00
  currency: "USD"
  description: "Load test transaction"
  reference_id: "LOAD-TEST-{{.RequestNumber}}"

Integration Testing with Kafka Test Containers

import pytest
from testcontainers.kafka import KafkaContainer
from testcontainers.postgres import PostgresContainer
from kafka import KafkaProducer, KafkaConsumer
import json
import asyncio
from app.main import app
from httpx import AsyncClient

@pytest.fixture(scope="session")
def kafka_container():
    with KafkaContainer() as kafka:
        yield kafka

@pytest.fixture(scope="session")
def postgres_container():
    with PostgresContainer() as postgres:
        yield postgres

@pytest.fixture
async def kafka_producer(kafka_container):
    producer = KafkaProducer(
        bootstrap_servers=kafka_container.get_bootstrap_server(),
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    yield producer
    producer.close()

@pytest.fixture
async def kafka_consumer(kafka_container):
    consumer = KafkaConsumer(
        'banking.transactions.completed',
        bootstrap_servers=kafka_container.get_bootstrap_server(),
        auto_offset_reset='earliest',
        group_id='test-group',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    yield consumer
    consumer.close()

@pytest.mark.asyncio
async def test_transaction_flow(kafka_producer, kafka_consumer, postgres_container):
    # Configure app to use test containers
    app.dependency_overrides[get_db_pool] = lambda: postgres_container.get_connection_url()
    app.dependency_overrides[get_kafka_producer] = lambda: kafka_producer
    
    # Create test account
    async with AsyncClient(app=app, base_url="http://test") as client:
        # Create account
        account_response = await client.post("/api/v1/accounts", json={
            "customer_id": "test-customer",
            "account_type": "checking"
        })
        assert account_response.status_code == 201
        account_id = account_response.json()["account_id"]
        
        # Initial credit to account
        await client.post("/api/v1/transactions/credit", json={
            "account_id": account_id,
            "amount": 1000.00,
            "currency": "USD",
            "description": "Initial credit",
            "reference_id": "TEST-INIT-CREDIT"
        })
        
        # Perform debit transaction
        debit_response = await client.post("/api/v1/transactions/debit", json={
            "account_id": account_id,
            "amount": 100.00,
            "currency": "USD",
            "description": "Test debit",
            "reference_id": "TEST-DEBIT-1"
        })
        
        assert debit_response.status_code == 200
        debit_result = debit_response.json()
        
        # Check for message in Kafka
        message = next(kafka_consumer)
        assert message.value["transaction_id"] == debit_result["transaction_id"]
        assert message.value["status"] == "completed"
        
        # Verify balance
        balance_response = await client.get(f"/api/v1/accounts/{account_id}/balance")
        assert balance_response.status_code == 200
        assert balance_response.json()["available_balance"] == 900.0

11. Implementation Roadmap

Phase 1: Core API and Database (2 weeks)

  1. Set up FastAPI project structure
  2. Implement database models and migrations
  3. Create basic account endpoints
  4. Set up Kafka infrastructure and topics
  5. Implement PyArrow serialization
  6. Set up Docker development environment

Phase 2: Transaction Processing with Kafka (2 weeks)

  1. Implement Kafka producers and consumers
  2. Develop transaction event sourcing pattern
  3. Set up Redis caching with Kafka invalidation
  4. Implement optimistic locking with versioning
  5. Configure transaction consistency mechanisms

Phase 3: High-Performance gRPC Services (1 week)

  1. Define gRPC service definitions
  2. Implement gRPC services for critical operations
  3. Set up client-server communication
  4. Integrate with Kafka for event propagation

Phase 4: Reliability & Monitoring (1 week)

  1. Set up dead letter queues and retry mechanisms
  2. Implement Prometheus metrics for Kafka and gRPC
  3. Configure Grafana dashboards
  4. Set up centralized logging with correlation IDs

Phase 5: Testing & Deployment (2 weeks)

  1. Implement unit and integration tests
  2. Set up CI/CD pipeline with test containers
  3. Deploy to Kubernetes environment
  4. Load testing and performance optimization

12. Additional Features

Rate Limiting with Redis and Kafka Analytics

from fastapi import Depends, HTTPException, Request
import time
import redis
import asyncio

class RateLimiter:
    def __init__(self, redis_client: redis.Redis, kafka_producer=None):
        self.redis_client = redis_client
        self.kafka_producer = kafka_producer
        self.rate_limit = 5  # requests
        self.time_window = 1  # second
    
    async def __call__(self, request: Request):
        client_ip = request.client.host
        key = f"rate_limit:{client_ip}"
        
        current = self.redis_client.get(key)
        if current is None:
            self.redis_client.set(key, 1, ex=self.time_window)
            
            # Log rate limit event to Kafka for analytics
            if self.kafka_producer:
                await self.kafka_producer.send_transaction(
                    {
                        'client_ip': client_ip,
                        'endpoint': request.url.path,
                        'count': 1,
                        'timestamp': time.time(),
                        'limit': self.rate_limit
                    },
                    'rate_limits'
                )
            return
        
        if int(current) >= self.rate_limit:
            # Log rate limit exceeded event
            if self.kafka_producer:
                await self.kafka_producer.send_transaction(
                    {
                        'client_ip': client_ip,
                        'endpoint': request.url.path,
                        'exceeded': True,
                        'timestamp': time.time(),
                        'limit': self.rate_limit
                    },
                    'rate_limits.exceeded'
                )
            
            raise HTTPException(
                status_code=429, 
                detail="Rate limit exceeded. Try again later."
            )
        
        self.redis_client.incr(key)
        
        # Log rate limit event
        if self.kafka_producer:
            await self.kafka_producer.send_transaction(
                {
                    'client_ip': client_ip,
                    'endpoint': request.url.path,
                    'count': int(current) + 1,
                    'timestamp': time.time(),
                    'limit': self.rate_limit
                },
                'rate_limits'
            )

Real-time Fraud Detection with Kafka Streams

from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer, Consumer
import json
import time
from typing import Dict, Any, List
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest

class FraudDetectionService:
    def __init__(self, bootstrap_servers: str):
        """Initialize fraud detection with Kafka streams"""
        self.bootstrap_servers = bootstrap_servers
        self.producer = Producer({'bootstrap.servers': bootstrap_servers})
        
        # Create topics if they don't exist
        self._create_topics()
        
        # Train initial model
        self.model = self._train_initial_model()
        
        # Transaction history for user/account
        self.transaction_history: Dict[str, List[Dict[str, Any]]] = {}
        
    def _create_topics(self):
        """Create required Kafka topics"""
        admin_client = AdminClient({'bootstrap.servers': self.bootstrap_servers})
        
        topics = [
            NewTopic('banking.transactions.submitted', num_partitions=5, replication_factor=3),
            NewTopic('banking.transactions.completed', num_partitions=5, replication_factor=3),
            NewTopic('banking.fraud.alerts', num_partitions=3, replication_factor=3)
        ]
        
        admin_client.create_topics(topics)
    
    def _train_initial_model(self):
        """Train initial fraud detection model"""
        # In production this would be trained on historical transaction data.
        # Fit on placeholder data here so decision_function() works before the
        # first retraining; an unfitted model would raise an error.
        model = IsolationForest(random_state=42, contamination=0.01)
        placeholder_features = np.random.RandomState(42).uniform(0, 1000, size=(500, 3))
        model.fit(placeholder_features)
        return model
    
    def _extract_features(self, transaction: Dict[str, Any]) -> np.ndarray:
        """Extract features from transaction for fraud detection"""
        # Features might include:
        # - Transaction amount
        # - Time since last transaction
        # - Difference from average transaction amount
        # - Geographic location features
        # - Device information
        
        # Simplified example:
        features = [
            float(transaction['amount']),
            float(transaction.get('time_since_last', 0)),
            float(transaction.get('deviation_from_avg', 0))
        ]
        
        return np.array(features).reshape(1, -1)
    
    def _enrich_transaction(self, transaction: Dict[str, Any]) -> Dict[str, Any]:
        """Add derived features to transaction data"""
        account_id = transaction['account_id']
        
        # Get account history or initialize empty list
        history = self.transaction_history.get(account_id, [])
        
        # Calculate time since last transaction
        if history:
            last_transaction_time = float(history[-1]['timestamp'])
            time_since_last = time.time() - last_transaction_time
        else:
            time_since_last = 0
        
        # Calculate average amount and deviation
        if history:
            amounts = [float(tx['amount']) for tx in history]
            avg_amount = sum(amounts) / len(amounts)
            deviation_from_avg = float(transaction['amount']) - avg_amount
        else:
            avg_amount = float(transaction['amount'])
            deviation_from_avg = 0
        
        # Add enriched features
        enriched = transaction.copy()
        enriched['time_since_last'] = time_since_last
        enriched['avg_amount'] = avg_amount
        enriched['deviation_from_avg'] = deviation_from_avg
        
        # Update history
        if len(history) > 100:  # Keep last 100 transactions
            history.pop(0)
        history.append({
            'amount': transaction['amount'],
            'timestamp': time.time(),
            'transaction_id': transaction['transaction_id']
        })
        
        self.transaction_history[account_id] = history
        
        return enriched
    
    def detect_fraud(self, transaction: Dict[str, Any]) -> Dict[str, Any]:
        """Detect if a transaction is fraudulent"""
        # Enrich transaction with derived features
        enriched = self._enrich_transaction(transaction)
        
        # Extract features for model
        features = self._extract_features(enriched)
        
        # Predict anomaly score (-1 for anomalies, 1 for normal)
        score = self.model.decision_function(features)[0]
        
        # Convert to probability-like score (0 to 1, where 1 is most anomalous)
        fraud_score = 1 - (score + 1) / 2
        
        # Add fraud detection results
        result = enriched.copy()
        result['fraud_score'] = float(fraud_score)
        result['is_fraudulent'] = fraud_score > 0.9  # Threshold for fraud alert
        
        # If fraudulent, send alert to Kafka
        if result['is_fraudulent']:
            self._send_fraud_alert(result)
        
        return result
    
    def _send_fraud_alert(self, transaction: Dict[str, Any]):
        """Send fraud alert to Kafka topic"""
        alert = {
            'transaction_id': transaction['transaction_id'],
            'account_id': transaction['account_id'],
            'amount': transaction['amount'],
            'fraud_score': transaction['fraud_score'],
            'timestamp': time.time(),
            'alert_id': f"fraud-{time.time()}"
        }
        
        self.producer.produce(
            'banking.fraud.alerts',
            key=transaction['account_id'],
            value=json.dumps(alert)
        )
        self.producer.flush()
    
    def start_processing(self):
        """Start processing transactions from Kafka for fraud detection"""
        consumer = Consumer({
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': 'fraud-detection-service',
            'auto.offset.reset': 'latest'
        })
        
        consumer.subscribe(['banking.transactions.submitted'])
        
        try:
            while True:
                msg = consumer.poll(1.0)
                
                if msg is None:
                    continue
                if msg.error():
                    print(f"Consumer error: {msg.error()}")
                    continue
                
                try:
                    transaction = json.loads(msg.value().decode('utf-8'))
                    
                    # Process transaction for fraud detection
                    result = self.detect_fraud(transaction)
                    
                    # Forward the transaction with fraud score to appropriate topic
                    target_topic = 'banking.transactions.completed'
                    if result['is_fraudulent']:
                        target_topic = 'banking.transactions.held'
                    
                    self.producer.produce(
                        target_topic,
                        key=transaction['account_id'],
                        value=json.dumps(result)
                    )
                    
                except Exception as e:
                    print(f"Error processing transaction: {e}")
                
        finally:
            consumer.close()

13. Security Considerations

JWT Authentication with gRPC Interceptors

from concurrent.futures import ThreadPoolExecutor
from grpc import StatusCode, RpcError
import grpc
import jwt
from datetime import datetime, timedelta
from typing import Dict, Any, Callable

# JWT Configuration
JWT_SECRET = "your-secret-key"  # In production, use environment variable
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION = 30  # minutes

def create_jwt_token(data: Dict[str, Any]) -> str:
    """Create a JWT token"""
    expiration = datetime.utcnow() + timedelta(minutes=JWT_EXPIRATION)
    payload = {
        **data,
        "exp": expiration
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)

def verify_jwt_token(token: str) -> Dict[str, Any]:
    """Verify and decode JWT token"""
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.PyJWTError:
        raise ValueError("Invalid token")

# gRPC Auth Interceptor
class AuthInterceptor(grpc.ServerInterceptor):
    def __init__(self):
        def abort(ignored_request, context):
            context.abort(StatusCode.UNAUTHENTICATED, "Invalid or missing token")
            
        self._abort_handler = grpc.unary_unary_rpc_method_handler(abort)
    
    def intercept_service(self, continuation, handler_call_details):
        # Extract metadata
        metadata = dict(handler_call_details.invocation_metadata)
        
        # Check for auth token in metadata
        auth_token = metadata.get('authorization', '')
        if not auth_token.startswith('Bearer '):
            return self._abort_handler
            
        token = auth_token[7:]  # Remove 'Bearer ' prefix
        
        try:
            # Verify token
            payload = verify_jwt_token(token)
            
            # Add user info to context
            context = handler_call_details.invocation_metadata
            context = context + (('user_id', payload.get('user_id')),)
            
            # Continue with the call
            return continuation(handler_call_details._replace(invocation_metadata=context))
            
        except ValueError:
            return self._abort_handler

# gRPC Server with Auth
def create_grpc_server():
    server = grpc.server(
        ThreadPoolExecutor(max_workers=10),
        interceptors=[AuthInterceptor()]
    )
    
    # Add servicers here
    banking_pb2_grpc.add_TransactionServiceServicer_to_server(
        TransactionServicer(), server
    )
    
    server.add_insecure_port('[::]:50051')
    return server

API Security Headers and CORS

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from starlette.middleware.sessions import SessionMiddleware
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware

app = FastAPI()

# HTTPS Redirect in production
app.add_middleware(HTTPSRedirectMiddleware)

# Trusted Host validation
app.add_middleware(
    TrustedHostMiddleware, 
    allowed_hosts=["api.yourbank.com", "www.yourbank.com"]
)

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.yourbank.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Compression for response payloads
app.add_middleware(GZipMiddleware, minimum_size=1000)

# Session middleware for web sessions
app.add_middleware(
    SessionMiddleware, 
    secret_key="your-secret-key",
    max_age=3600  # 1 hour
)

# Security headers middleware
@app.middleware("http")
async def add_security_headers(request, call_next):
    response = await call_next(request)
    
    # Add security headers
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    
    return response

Banking API Folder Structure and Implementation Plan

Below is a comprehensive folder structure for the banking API project, followed by a phase-wise implementation plan for all files.

Folder Structure

banking-api/
├── api/                           # FastAPI application
│   ├── main.py                    # Application entry point
│   ├── config.py                  # Configuration settings
│   ├── dependencies.py            # Dependency injection
│   ├── middleware/                # API middleware components
│   ├── routes/                    # API endpoints
│   ├── schemas/                   # Pydantic schemas
│   ├── services/                  # Business logic services
│   └── utils/                     # Utility functions
├── common/                        # Shared code and utilities
│   ├── kafka/                     # Kafka utilities
│   ├── serializers/               # Data serialization (PyArrow)
│   └── errors/                    # Error handling
├── db/                            # Database layer
│   ├── models.py                  # SQLAlchemy models
│   ├── session.py                 # DB session management
│   └── migrations/                # Alembic migrations
├── grpc_service/                  # gRPC implementation
│   ├── server.py                  # gRPC server
│   ├── interceptors/              # gRPC middleware
│   ├── protos/                    # Protocol buffers
│   └── handlers/                  # gRPC service handlers
├── worker/                        # Kafka consumers and processors
│   ├── main.py                    # Worker entry point
│   ├── consumers/                 # Kafka consumers
│   ├── producers/                 # Kafka producers
│   └── processors/                # Business logic processors
├── tests/                         # Test suite
│   ├── api/                       # API tests
│   ├── grpc/                      # gRPC tests
│   ├── worker/                    # Worker tests
│   └── integration/               # End-to-end tests
├── infrastructure/                # Deployment configuration
│   ├── docker/                    # Docker files
│   ├── kubernetes/                # K8s manifests
│   └── prometheus/                # Monitoring config
├── scripts/                       # Utility scripts
└── requirements.txt               # Project dependencies

Phase-wise Implementation Plan

Phase 1: Core API and Database (2 weeks)

Week 1: Project Setup and Database

Day 1-2: Project Initialization

  • requirements.txt - Initial dependencies
  • api/config.py - Application configuration
  • api/main.py - FastAPI app setup
  • api/dependencies.py - Core dependencies

Day 3-5: Database Setup

  • db/models.py - Create SQLAlchemy models for:
    • Accounts
    • Balances
    • Transactions
    • Transaction Logs
    • Users
  • db/session.py - Database connection management
  • db/migrations/ - Alembic setup and initial migration

Day 6-7: Core Schemas and Docker

  • api/schemas/account.py - Account Pydantic schemas
  • api/schemas/transaction.py - Transaction schemas
  • api/schemas/user.py - User schemas
  • infrastructure/docker/Dockerfile.api - API container
  • infrastructure/docker-compose.yml - Development environment

Week 2: API Implementation

Day 1-2: Account Endpoints

  • api/routes/accounts.py - Account API routes
  • api/services/account.py - Account business logic
  • api/middleware/auth.py - Authentication middleware

Day 3-5: Transaction Endpoints

  • api/routes/transactions.py - Transaction API routes
  • api/services/transaction.py - Basic transaction logic
  • api/utils/exceptions.py - Custom exceptions

Day 6-7: Testing

  • tests/api/test_accounts.py - Account API tests
  • tests/api/test_transactions.py - Transaction API tests
  • scripts/init_db.py - Database initialization script

Phase 2: Transaction Processing with Kafka (2 weeks)

Week 3: Kafka Infrastructure

Day 1-2: Kafka Base Setup

  • common/kafka/producer.py - Kafka producer base
  • common/kafka/consumer.py - Kafka consumer base
  • scripts/create_topics.py - Kafka topic creation

Day 3-5: Serialization and Worker Setup

  • common/serializers/arrow.py - PyArrow serialization
  • worker/main.py - Worker service initialization
  • worker/producers/transaction.py - Transaction producer
  • infrastructure/docker/Dockerfile.worker - Worker container
  • Update docker-compose.yml - Add Kafka services

Day 6-7: Transaction Processing

  • worker/consumers/transaction.py - Transaction consumer
  • worker/processors/transaction.py - Transaction processing logic
  • tests/worker/test_processors.py - Basic processor tests

Week 4: Advanced Transaction Processing

Day 1-2: Redis Caching

  • api/services/cache.py - Redis caching implementation
  • worker/consumers/balance.py - Balance update consumer

Day 3-5: Concurrency Management

  • Update api/services/transaction.py - Add optimistic locking
  • api/middleware/tracing.py - Transaction tracing
  • api/middleware/rate_limiter.py - Rate limiting with Redis

Day 6-7: Monitoring Setup

  • infrastructure/prometheus/prometheus.yml - Prometheus config
  • infrastructure/prometheus/grafana-dashboards/banking-dashboard.json - Grafana dashboard
  • common/errors/handlers.py - Error handlers with Kafka logging

Phase 3: High-Performance gRPC Services (1 week)

Week 5: gRPC Implementation

Day 1-2: gRPC Protocol Definition

  • grpc_service/protos/banking.proto - Service definitions
  • Generate Python stubs from the proto file (see the sketch below)
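
A sketch of the stub-generation step, invoking grpc_tools programmatically (output paths follow the folder structure above):

# generate_stubs.py - a sketch; output paths are assumptions
from grpc_tools import protoc

protoc.main([
    "grpc_tools.protoc",
    "-Igrpc_service/protos",
    "--python_out=grpc_service",
    "--grpc_python_out=grpc_service",
    "grpc_service/protos/banking.proto",
])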

Day 3-4: gRPC Server and Handler

  • grpc_service/server.py - gRPC server implementation
  • grpc_service/handlers/transaction.py - Transaction service handler
  • grpc_service/interceptors/auth.py - Authentication interceptor

Day 5-7: Integration and Testing

  • grpc_service/client.py - Internal gRPC client
  • Update api/services/transaction.py - Add gRPC client integration
  • tests/grpc/test_transactions.py - gRPC service tests
  • infrastructure/docker/Dockerfile.grpc - gRPC service container
  • Update docker-compose.yml - Add gRPC service

Phase 4: Reliability & Monitoring (1 week)

Week 6: Error Handling and Monitoring

Day 1-2: Error Handling

  • common/errors/exceptions.py - Enhanced exceptions
  • Update worker/consumers/transaction.py - Add dead letter queue (see the sketch after this list)
  • Update worker/processors/transaction.py - Add retry mechanisms
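
The dead-letter handling in the consumer could look roughly like the sketch below; topic names follow the Kafka topic diagram near the end of this document, processor.process stands in for dispatching to the right handler, and the producer is the one from Phase 2:

# worker/consumers/transaction.py - sketch of retry + dead-letter handling
MAX_RETRIES = 3

async def handle_message(message: dict, processor, producer) -> None:
    retries = int(message.get("retry_count", 0))
    try:
        await processor.process(message)
    except Exception:
        if retries + 1 >= MAX_RETRIES:
            # Retries exhausted: park the event for manual inspection
            await producer.send_transaction(message, "banking.transactions.dlq")
        else:
            # Re-queue with an incremented retry counter
            message["retry_count"] = retries + 1
            await producer.send_transaction(message, "banking.transactions.failed")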

Day 3-4: Distributed Transactions

  • Update api/services/transaction.py - Two-phase commit with Kafka
  • Update worker/processors/transaction.py - Compensation handling
  • Update common/kafka/producer.py - Add transaction guarantees

Day 5-7: Monitoring and Alerting

  • worker/processors/fraud.py - Basic fraud detection
  • Update Prometheus and Grafana configurations
  • Add metrics collection to all services
  • Implement centralized logging with correlation IDs

Phase 5: Testing & Deployment (2 weeks)

Week 7: Testing

Day 1-3: Unit and Integration Tests

  • tests/conftest.py - Shared test fixtures
  • tests/integration/test_kafka.py - Kafka integration tests
  • tests/integration/test_transaction_flow.py - End-to-end tests

Day 4-7: Load Testing and Optimization

  • scripts/load_test.py - Load testing script (see the sketch after this list)
  • Performance optimization for critical paths
  • Tune PostgreSQL, Kafka, and Redis configurations
  • Implement connection pooling and caching improvements
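
scripts/load_test.py can start as a small asyncio + httpx driver before moving to a dedicated tool such as Locust; the URL, payload, and concurrency below are placeholders:

# scripts/load_test.py - sketch; fires concurrent debits and reports throughput
import asyncio
import time

import httpx

API_URL = "http://localhost:8000/api/v1/transactions/debit"  # placeholder
CONCURRENCY = 50
REQUESTS = 1000

async def worker(client: httpx.AsyncClient, n: int) -> int:
    ok = 0
    for _ in range(n):
        resp = await client.post(API_URL, json={
            "account_id": "00000000-0000-0000-0000-000000000001",
            "amount": "1.00",
            "currency": "USD",
        })
        ok += resp.status_code == 200
    return ok

async def main() -> None:
    per_worker = REQUESTS // CONCURRENCY
    start = time.perf_counter()
    async with httpx.AsyncClient(timeout=10.0) as client:
        results = await asyncio.gather(*(worker(client, per_worker) for _ in range(CONCURRENCY)))
    elapsed = time.perf_counter() - start
    total = per_worker * CONCURRENCY
    print(f"{total} requests in {elapsed:.1f}s ({total / elapsed:.0f} req/s), {sum(results)} succeeded")

if __name__ == "__main__":
    asyncio.run(main())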

Week 8: Kubernetes Deployment

Day 1-3: Kubernetes Manifests

  • infrastructure/kubernetes/api-deployment.yaml - API deployment
  • infrastructure/kubernetes/grpc-deployment.yaml - gRPC deployment
  • infrastructure/kubernetes/worker-deployment.yaml - Worker deployment
  • infrastructure/kubernetes/kafka-statefulset.yaml - Kafka StatefulSet
  • infrastructure/kubernetes/postgres-statefulset.yaml - Database StatefulSet

Day 4-7: CI/CD and Documentation

  • Setup CI/CD pipeline
  • Update README.md with complete documentation
  • Create runbooks for operations
  • Final performance testing and tuning

Key Files Implementation Details

1. api/main.py (Phase 1)

from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.trustedhost import TrustedHostMiddleware

from api.routes import accounts, transactions
from api.middleware import auth, tracing, rate_limiter, security
from api.utils.exceptions import register_exception_handlers

app = FastAPI(title="Banking API", version="1.0.0")

# Middleware
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["*"])
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(tracing.TransactionTracingMiddleware)
app.add_middleware(rate_limiter.RateLimitMiddleware)
app.add_middleware(auth.JWTAuthMiddleware)

# Register routes
app.include_router(accounts.router, prefix="/api/v1/accounts", tags=["accounts"])
app.include_router(transactions.router, prefix="/api/v1/transactions", tags=["transactions"])

# Register exception handlers
register_exception_handlers(app)

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

2. db/models.py (Phase 1)

from sqlalchemy import Column, String, Numeric, Boolean, DateTime, ForeignKey, Integer, Text, func
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import declarative_base, relationship
import uuid
from datetime import datetime

Base = declarative_base()

class Account(Base):
    __tablename__ = "accounts"
    
    account_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    account_number = Column(String, unique=True, nullable=False)
    customer_id = Column(String, nullable=False)
    account_type = Column(String, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    is_active = Column(Boolean, default=True)
    
    # Relationships
    balances = relationship("Balance", back_populates="account")
    transactions = relationship("Transaction", back_populates="account")

class Balance(Base):
    __tablename__ = "balances"
    
    balance_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    account_id = Column(UUID(as_uuid=True), ForeignKey("accounts.account_id"), nullable=False)
    current_balance = Column(Numeric(precision=18, scale=2), nullable=False, default=0)
    available_balance = Column(Numeric(precision=18, scale=2), nullable=False, default=0)
    last_updated = Column(DateTime, default=datetime.utcnow)
    version = Column(Integer, default=1, nullable=False)  # For optimistic locking
    
    # Relationships
    account = relationship("Account", back_populates="balances")

# Additional models for Transaction, TransactionLog, User

3. worker/processors/transaction.py (Phase 2)

from decimal import Decimal
import logging
from datetime import datetime
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import update

from db.models import Transaction, Balance, TransactionLog
from common.kafka.producer import TransactionProducer

logger = logging.getLogger(__name__)

class TransactionProcessor:
    def __init__(self, db_session, kafka_producer: TransactionProducer, redis_client=None):
        self.db = db_session
        self.kafka_producer = kafka_producer
        self.redis_client = redis_client
    
    async def process_debit(self, transaction_data):
        """Process a debit transaction with optimistic locking"""
        account_id = transaction_data["account_id"]
        amount = Decimal(str(transaction_data["amount"]))
        
        max_retries = 3
        retry_count = 0
        
        while retry_count < max_retries:
            try:
                # Get current balance with version
                async with self.db.begin():
                    balance = await self.db.execute(
                        select(Balance).where(Balance.account_id == account_id)
                    )
                    balance = balance.scalar_one()
                    
                    if balance.available_balance < amount:
                        await self._handle_insufficient_funds(transaction_data)
                        return
                    
                    # Update balance with version check
                    result = await self.db.execute(
                        update(Balance)
                        .where(
                            Balance.account_id == account_id,
                            Balance.version == balance.version
                        )
                        .values(
                            current_balance=Balance.current_balance - amount,
                            available_balance=Balance.available_balance - amount,
                            version=Balance.version + 1,
                            last_updated=datetime.utcnow()
                        )
                        .returning(Balance)
                    )
                    
                    updated_balance = result.scalar_one_or_none()
                    
                    if not updated_balance:
                        # Version conflict, retry
                        retry_count += 1
                        continue
                    
                    # Create transaction record
                    transaction = Transaction(
                        transaction_id=transaction_data.get("transaction_id", uuid.uuid4()),
                        account_id=account_id,
                        transaction_type="debit",
                        amount=amount,
                        currency=transaction_data.get("currency", "USD"),
                        description=transaction_data.get("description", ""),
                        reference_id=transaction_data.get("reference_id", ""),
                        status="completed",
                        created_at=datetime.utcnow()
                    )
                    
                    self.db.add(transaction)
                    await self.db.flush()
                    
                    # Log the transaction event. Note: "metadata" is a reserved attribute
                    # on SQLAlchemy declarative models, so the ORM attribute is typically
                    # named e.g. event_metadata and mapped to the "metadata" column.
                    transaction_log = TransactionLog(
                        transaction_id=transaction.transaction_id,
                        event_type="transaction_completed",
                        event_metadata={"balance_after": str(updated_balance.current_balance)},
                        created_at=datetime.utcnow()
                    )
                    
                    self.db.add(transaction_log)
                
                # Invalidate cache if Redis is available
                if self.redis_client:
                    await self.redis_client.delete(f"balance:{account_id}")
                
                # Publish balance update event to Kafka
                await self.kafka_producer.send_transaction(
                    {
                        "account_id": str(account_id),
                        "balance": str(updated_balance.current_balance),
                        "available_balance": str(updated_balance.available_balance),
                        "version": updated_balance.version,
                        "timestamp": updated_balance.last_updated.isoformat()
                    },
                    "balances"
                )
                
                # Return successful transaction
                return {
                    "transaction_id": str(transaction.transaction_id),
                    "status": "completed",
                    "amount": str(amount),
                    "balance": str(updated_balance.current_balance)
                }
                
            except Exception as e:
                logger.error(f"Error processing debit: {e}")
                retry_count += 1
        
        # If we get here, all retries failed
        await self._handle_transaction_failure(transaction_data, "max_retries_exceeded")

4. grpc_service/server.py (Phase 3)

import grpc
import asyncio
from concurrent import futures
import logging

from grpc_service.handlers.transaction import TransactionServicer
from grpc_service.interceptors.auth import AuthInterceptor
import banking_pb2_grpc

logger = logging.getLogger(__name__)

async def serve(db_pool, kafka_producer, redis_client=None):
    """Start the gRPC server with all services registered"""
    server = grpc.aio.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=[AuthInterceptor()]
    )
    
    # Register service handlers
    transaction_servicer = TransactionServicer(db_pool, kafka_producer, redis_client)
    banking_pb2_grpc.add_TransactionServiceServicer_to_server(
        transaction_servicer, server
    )
    
    server.add_insecure_port('[::]:50051')
    logger.info("Starting gRPC server on port 50051")
    
    await server.start()
    
    try:
        await server.wait_for_termination()
    except KeyboardInterrupt:
        await server.stop(0)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # serve() requires the shared db pool and Kafka producer; the real entrypoint
    # constructs them from configuration before starting the server, e.g.:
    # asyncio.run(serve(db_pool, kafka_producer, redis_client))

This implementation plan provides a structured approach to building a scalable banking API with all the required components. The file structure separates concerns to keep the codebase maintainable, and the phased schedule gives systematic development with clear milestones.

Banking API Architecture Diagrams

Here are several Mermaid diagrams that visualize different aspects of the Banking API architecture:

1. Component Architecture

flowchart TB
    Client[Client Applications] --> LB[Load Balancer]
    LB --> API[FastAPI Service]
    LB --> gRPCLB[gRPC Load Balancer]
    gRPCLB --> gRPC[gRPC Service]
    
    subgraph Data Storage
        DB[(PostgreSQL)]
        Redis[(Redis Cache)]
    end
    
    subgraph Messaging
        Kafka[Apache Kafka]
    end
    
    subgraph Processing
        TxWorker[Transaction Worker]
        AuditWorker[Audit Worker]
        FraudWorker[Fraud Detection]
    end
    
    subgraph Monitoring
        Prom[Prometheus]
        Graf[Grafana]
        Logs[Centralized Logging]
    end
    
    API --> DB
    API --> Redis
    API --> Kafka
    API <--> gRPC
    
    gRPC --> DB
    gRPC --> Kafka
    
    Kafka --> TxWorker
    Kafka --> AuditWorker
    Kafka --> FraudWorker
    
    TxWorker --> DB
    AuditWorker --> DB
    FraudWorker --> Kafka
    
    API --> Prom
    gRPC --> Prom
    Kafka --> Prom
    TxWorker --> Prom
    Prom --> Graf
    
    API --> Logs
    gRPC --> Logs
    TxWorker --> Logs
    AuditWorker --> Logs
    FraudWorker --> Logs

2. Transaction Flow

sequenceDiagram
    participant Client
    participant API as FastAPI Service
    participant Redis
    participant gRPC as gRPC Service
    participant Kafka
    participant Worker as Transaction Worker
    participant DB as PostgreSQL
    
    Client->>API: POST /api/v1/transactions/transfer
    
    API->>Redis: Acquire distributed lock
    Redis-->>API: Lock acquired
    
    API->>gRPC: TransferBetweenAccounts RPC
    
    gRPC->>DB: Begin transaction
    gRPC->>DB: Check source balance
    DB-->>gRPC: Balance OK
    
    gRPC->>Kafka: Publish prepare_transfer event
    
    gRPC->>DB: Debit source account
    DB-->>gRPC: Debit OK
    
    gRPC->>DB: Credit destination account
    DB-->>gRPC: Credit OK
    
    gRPC->>DB: Commit transaction
    DB-->>gRPC: Commit OK
    
    gRPC->>Kafka: Publish transfer_completed event
    gRPC-->>API: Transfer completed response
    
    API->>Redis: Release lock
    API-->>Client: 200 OK with transaction details
    
    Kafka-->>Worker: Consume transfer_completed event
    Worker->>DB: Log transaction details
    Worker->>Kafka: Publish balance_updated event

3. Data Flow Diagram

flowchart TD
    Client([Client]) -- HTTP Requests --> API
    InternalSystem([Internal System]) -- gRPC Calls --> gRPC
    
    subgraph API Layer
        API[FastAPI Service]
        gRPC[gRPC Service]
    end
    
    subgraph Data Layer
        DB[(PostgreSQL)]
        Cache[(Redis Cache)]
    end
    
    subgraph Event Processing
        Kafka[Kafka]
        TxProcessor[Transaction Processor]
        FraudDetector[Fraud Detection]
        Auditor[Audit Logger]
        Notifier[Notification Service]
    end
    
    API -- Read/Write --> DB
    API -- Cache Operations --> Cache
    API -- Publish Events --> Kafka
    API -- RPC Calls --> gRPC
    
    gRPC -- Read/Write --> DB
    gRPC -- Publish Events --> Kafka
    
    Kafka -- Transaction Events --> TxProcessor
    Kafka -- Account Activity --> FraudDetector
    Kafka -- All Events --> Auditor
    Kafka -- Status Updates --> Notifier
    
    TxProcessor -- Process Transactions --> DB
    FraudDetector -- Flag Suspicious --> Kafka
    Auditor -- Log Activities --> DB
    
    Notifier -- Send Notifications --> Client

4. Kubernetes Deployment Architecture

flowchart TB
    Internet((Internet)) --> Ingress[Ingress Controller]
    
    subgraph Kubernetes Cluster
        Ingress --> APIService[API Service]
        Ingress --> gRPCService[gRPC Service]
        
        APIService --> APIPod1[API Pod 1]
        APIService --> APIPod2[API Pod 2]
        APIService --> APIPod3[API Pod 3]
        
        gRPCService --> gRPCPod1[gRPC Pod 1]
        gRPCService --> gRPCPod2[gRPC Pod 2]
        
        subgraph Data Tier
            PostgresStatefulSet[Postgres StatefulSet]
            PostgresStatefulSet --> PostgresPrimary[Postgres Primary]
            PostgresStatefulSet --> PostgresReplica1[Postgres Replica 1]
            PostgresStatefulSet --> PostgresReplica2[Postgres Replica 2]
            
            RedisStatefulSet[Redis StatefulSet]
            RedisStatefulSet --> RedisMaster[Redis Master]
            RedisStatefulSet --> RedisSlave1[Redis Slave 1]
        end
        
        subgraph Event Processing
            KafkaStatefulSet[Kafka StatefulSet]
            KafkaStatefulSet --> KafkaBroker1[Kafka Broker 1]
            KafkaStatefulSet --> KafkaBroker2[Kafka Broker 2]
            KafkaStatefulSet --> KafkaBroker3[Kafka Broker 3]
            
            ZKStatefulSet[Zookeeper StatefulSet]
            ZKStatefulSet --> ZK1[Zookeeper 1]
            ZKStatefulSet --> ZK2[Zookeeper 2]
            ZKStatefulSet --> ZK3[Zookeeper 3]
            
            WorkerDeployment[Worker Deployment]
            WorkerDeployment --> WorkerPod1[Worker Pod 1]
            WorkerDeployment --> WorkerPod2[Worker Pod 2]
            WorkerDeployment --> WorkerPod3[Worker Pod 3]
        end
        
        subgraph Monitoring
            PrometheusDeploy[Prometheus]
            GrafanaDeploy[Grafana]
            AlertManagerDeploy[Alert Manager]
        end
    end
    
    APIPod1 --> PostgresStatefulSet
    APIPod1 --> RedisStatefulSet
    APIPod1 --> KafkaStatefulSet
    
    gRPCPod1 --> PostgresStatefulSet
    gRPCPod1 --> KafkaStatefulSet
    
    WorkerPod1 --> PostgresStatefulSet
    WorkerPod1 --> KafkaStatefulSet
    
    KafkaBroker1 --> ZKStatefulSet

5. Database Schema - Enhanced ERD

erDiagram
    ACCOUNTS ||--o{ BALANCES : "has current"
    ACCOUNTS ||--o{ TRANSACTIONS : "has many"
    TRANSACTIONS ||--o{ TRANSACTION_LOGS : "has logs"
    USERS ||--o{ ACCOUNTS : "owns"
    TRANSACTIONS }o--|| TRANSACTIONS : "relates to"
    
    ACCOUNTS {
        uuid account_id PK
        string account_number UK
        string customer_id FK
        string account_type
        timestamp created_at
        timestamp updated_at
        boolean is_active
    }
    
    BALANCES {
        uuid balance_id PK
        uuid account_id FK
        decimal current_balance
        decimal available_balance
        timestamp last_updated
        int64 version
    }
    
    TRANSACTIONS {
        uuid transaction_id PK
        uuid account_id FK
        string transaction_type
        decimal amount
        string currency
        string description
        string reference_id
        timestamp created_at
        string status
        uuid related_transaction_id FK
    }
    
    TRANSACTION_LOGS {
        uuid log_id PK
        uuid transaction_id FK
        string event_type
        jsonb metadata
        timestamp created_at
        string created_by
    }
    
    USERS {
        uuid user_id PK
        string username UK
        string password_hash
        string email UK
        string role
        boolean is_active
        timestamp created_at
        timestamp last_login
    }

6. Kafka Topic Architecture and Event Flow

flowchart LR
    API[API Service] --> SubmittedTopic
    gRPC[gRPC Service] --> SubmittedTopic
    
    subgraph Kafka Topics
        SubmittedTopic[banking.transactions.submitted]
        CompletedTopic[banking.transactions.completed]
        FailedTopic[banking.transactions.failed]
        BalancesTopic[banking.accounts.balances]
        FraudTopic[banking.fraud.alerts]
        NotificationsTopic[banking.notifications]
        DLQTopic[banking.transactions.dlq]
    end
    
    SubmittedTopic --> TxProcessor[Transaction Processor]
    TxProcessor --> CompletedTopic
    TxProcessor --> FailedTopic
    TxProcessor --> BalancesTopic
    
    FailedTopic --> RetryHandler[Retry Handler]
    RetryHandler -->|retry| SubmittedTopic
    RetryHandler -->|max retries| DLQTopic
    
    SubmittedTopic --> FraudDetector[Fraud Detector]
    FraudDetector --> FraudTopic
    
    CompletedTopic --> NotificationService[Notification Service]
    FailedTopic --> NotificationService
    FraudTopic --> NotificationService
    NotificationService --> NotificationsTopic
    
    BalancesTopic --> CacheInvalidator[Cache Invalidator]
    BalancesTopic --> AnalyticsProcessor[Analytics Processor]

7. Transaction Concurrency Management

sequenceDiagram
    participant C1 as Client 1
    participant C2 as Client 2
    participant API as API Service
    participant Redis as Redis
    participant DB as PostgreSQL
    
    C1->>API: Debit Account A
    C2->>API: Debit Account A (concurrent)
    
    API->>Redis: Acquire lock for Account A
    Redis-->>API: Lock granted
    
    C2->>API: Debit Account A
    API->>Redis: Try to acquire lock
    Redis-->>API: Lock denied
    API-->>C2: 429 Too Many Requests (retry)
    
    API->>DB: Get balance with version=5
    DB-->>API: Balance = $1000, version=5
    
    API->>DB: Update balance WHERE version=5
    Note over API,DB: SET balance=$900, version=6
    DB-->>API: Updated 1 row
    
    API->>Redis: Release lock
    Redis-->>API: Lock released
    
    API-->>C1: 200 OK, Transaction Complete
    
    C2->>API: Retry Debit Account A
    API->>Redis: Acquire lock for Account A
    Redis-->>API: Lock granted
    
    API->>DB: Get balance with version=6
    DB-->>API: Balance = $900, version=6
    
    API->>DB: Update balance WHERE version=6
    Note over API,DB: SET balance=$800, version=7
    DB-->>API: Updated 1 row
    
    API->>Redis: Release lock
    Redis-->>API: Lock released
    
    API-->>C2: 200 OK, Transaction Complete
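
A minimal sketch of the per-account lock in the sequence above, based on Redis SET with NX and an expiry so a crashed holder cannot block the account indefinitely (key names and TTL are illustrative):

# Sketch of a best-effort per-account lock backed by Redis
import uuid

import redis.asyncio as redis

class AccountLock:
    def __init__(self, client: redis.Redis, account_id: str, ttl_seconds: int = 5):
        self._redis = client
        self._key = f"lock:account:{account_id}"
        self._token = str(uuid.uuid4())  # identifies this lock holder
        self._ttl = ttl_seconds

    async def acquire(self) -> bool:
        # SET key token NX EX ttl -> succeeds only if the key does not already exist
        return bool(await self._redis.set(self._key, self._token, nx=True, ex=self._ttl))

    async def release(self) -> None:
        value = await self._redis.get(self._key)
        current = value.decode() if isinstance(value, bytes) else value
        if current == self._token:
            # Check-and-delete is not atomic; production code would use a Lua script
            # or the Lock helper that ships with redis-py
            await self._redis.delete(self._key)

Because the lock only fences concurrent requests, the optimistic version check on the balances table remains the final safety net: even if a lock expires mid-operation, the WHERE version = ... update guarantees that only one writer succeeds.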

These diagrams provide a comprehensive view of the Banking API's architecture, data flow, deployment strategy, and transaction handling mechanisms. They visualize how the different components interact and how data moves through the system, making it easier to understand the overall design and implementation.
