Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save lyminhtanh/8b02d3b87d953ce96be1cdfb58e9d5dd to your computer and use it in GitHub Desktop.
Save lyminhtanh/8b02d3b87d953ce96be1cdfb58e9d5dd to your computer and use it in GitHub Desktop.
Reactive có phải giải pháp duy nhất cho high traffic

gists:


From [[why reactive]] Thông tin bạn cung cấp về reactive programming là khá chính xác và thực tế. Tôi sẽ xác thực và bổ sung một số điểm quan trọng:

Những điểm chính xác

1. Về hiệu suất và tài nguyên:

  • Con số 10,000 threads cho 10,000 concurrent users là realistic với blocking model
  • R2DBC và WebClient thực sự giúp giảm thread usage đáng kể
  • Context switching overhead là vấn đề thực tế với nhiều threads

2. Về công nghệ:

  • R2DBC cho reactive database access là đúng
  • WebClient thay thế RestTemplate cho reactive HTTP client
  • Latency 300-500ms cho DB + HTTP call là hợp lý

🔧 Bổ sung và làm rõ

Thread Pool thực tế:

// Blocking (Traditional)
@RestController
public class ProductController {
    // Mỗi request = 1 thread từ Tomcat thread pool (default 200)
    // Với 10k concurrent users → thread pool exhausted
    
    @GetMapping("/product/{id}")
    public Product getProduct(@PathVariable String id) {
        Product product = productRepository.findById(id); // Block thread
        Inventory inventory = inventoryClient.getInventory(id); // Block thread
        return enrichProduct(product, inventory);
    }
}

// Reactive
@RestController  
public class ReactiveProductController {
    // Threads được release ngay khi gặp I/O, tái sử dụng cho requests khác
    
    @GetMapping("/product/{id}")
    public Mono<Product> getProduct(@PathVariable String id) {
        return productRepository.findById(id) // Non-blocking
            .flatMap(product -> 
                inventoryClient.getInventory(id) // Non-blocking
                    .map(inventory -> enrichProduct(product, inventory))
            );
    }
}

Thực tế về Thread Usage:

  • Blocking: Tomcat default thread pool = 200, có thể tăng lên ~1000-2000
  • Reactive: Netty event loop thường chỉ cần 2-4 threads/CPU core
  • Với 10k users: Blocking cần hàng nghìn threads, Reactive chỉ cần vài chục

Bổ sung quan trọng:

3. Backpressure handling:

// Reactive hỗ trợ backpressure tự động
Flux<Product> products = productService.getProducts()
    .onBackpressureBuffer(1000) // Buffer 1000 items
    .onBackpressureDrop(); // Drop excess items

4. Error handling phức tạp hơn:

// Blocking - đơn giản
try {
    return productService.getProduct(id);
} catch (Exception e) {
    return handleError(e);
}

// Reactive - cần reactive error handling
return productService.getProduct(id)
    .onErrorReturn(DefaultProduct.empty())
    .onErrorMap(DatabaseException.class, ex -> new ServiceException(ex));

5. Testing và debugging:

// Reactive testing cần StepVerifier
@Test
public void testReactiveFlow() {
    StepVerifier.create(productService.getProduct("123"))
        .expectNextMatches(product -> product.getId().equals("123"))
        .verifyComplete();
}

📊 Bổ sung bảng so sánh:

Tiêu chí Blocking Reactive Ghi chú
Memory usage Cao (mỗi thread ~1MB stack) Thấp 10k threads ≈ 10GB memory
Latency under load Tăng exponentially Ổn định Blocking có thread pool exhaustion
Learning curve Dễ Khó Reactive cần tư duy hoàn toàn khác
Ecosystem maturity Rất mature Đang phát triển Một số libs chưa hỗ trợ reactive
Error stack traces Rõ ràng Phức tạp Reactive stack traces khó debug

🎯 Khi nào nên dùng Reactive:

✅ Nên dùng khi:

  • High concurrency (>1000 concurrent users)
  • I/O intensive operations
  • Microservices với nhiều external calls
  • Real-time data streaming
  • Mobile apps với limited resources

❌ Không cần thiết khi:

  • Ứng dụng nhỏ (<100 concurrent users)
  • CPU-intensive tasks
  • Team thiếu kinh nghiệm reactive
  • Legacy systems với nhiều blocking dependencies

💡 Lời khuyên thực tế:

  1. Đừng reactive hoá toàn bộ - Bắt đầu với các endpoints high-traffic
  2. Monitoring quan trọng - Reactive metrics khác với blocking
  3. Test thoroughly - Reactive bugs khó reproduce
  4. Team training - Đầu tư thời gian học reactive patterns

Nhìn chung, phân tích của bạn rất thực tế và phù hợp với kinh nghiệm production. Bạn có muốn tôi demo code cụ thể cho case study này không?

🌊 Backpressure Handling - Giải thích chi tiết

Backpressure là gì?

Backpressure xảy ra khi producer tạo ra data nhanh hơn consumer xử lý. Giống như nước chảy vào bồn nhanh hơn thoát ra → bồn tràn.

Ví dụ thực tế:

// Producer tạo 1000 items/giây
// Consumer chỉ xử lý được 100 items/giây
// → 900 items/giây bị tích tụ → OutOfMemoryError

Backpressure Strategies trong Reactive:

// 1. BUFFER - Lưu trữ tạm thời
Flux<String> data = dataProducer()
    .onBackpressureBuffer(1000) // Buffer tối đa 1000 items
    .doOnNext(item -> processSlowly(item));

// 2. DROP - Bỏ qua items mới nhất
Flux<String> data = dataProducer()
    .onBackpressureDrop(dropped -> log.warn("Dropped: " + dropped))
    .doOnNext(item -> processSlowly(item));

// 3. LATEST - Chỉ giữ item mới nhất
Flux<String> data = dataProducer()
    .onBackpressureLatest()
    .doOnNext(item -> processSlowly(item));

// 4. ERROR - Throw exception
Flux<String> data = dataProducer()
    .onBackpressureError() // Ném BackpressureException
    .doOnNext(item -> processSlowly(item));

Ví dụ thực tế - Live Trading System:

@Service
public class TradingDataService {
    
    // Market data stream - 10,000 updates/second
    public Flux<MarketData> getMarketDataStream() {
        return marketDataSource.stream()
            .onBackpressureLatest() // Chỉ quan tâm giá mới nhất
            .sample(Duration.ofMillis(100)); // Sample mỗi 100ms
    }
    
    // Order processing - có thể xử lý 500 orders/second => DO NOT DO THIS
    public Flux<OrderResult> processOrders() {
        return orderStream
            .onBackpressureBuffer(1000, // Buffer 1000 orders
                BufferOverflowStrategy.DROP_OLDEST) // Drop orders cũ
            .flatMap(order -> processOrder(order), 10); // Max 10 concurrent
    }
}

🚀 Reactive có phải giải pháp duy nhất cho high traffic?

KHÔNG! Có nhiều giải pháp khác:

1. Load Balancing + Horizontal Scaling

# Multiple instances behind load balancer
Server 1: 1000 users
Server 2: 1000 users  
Server 3: 1000 users
→ Total: 3000 users với blocking architecture

2. Connection Pooling + Tuning

// Tối ưu connection pool
@Configuration
public class DatabaseConfig {
    @Bean
    public DataSource dataSource() {
        HikariConfig config = new HikariConfig();
        config.setMaximumPoolSize(50); // Tăng pool size
        config.setMinimumIdle(10);
        config.setConnectionTimeout(5000);
        return new HikariDataSource(config);
    }
}

// Tối ưu thread pool
server.tomcat.threads.max=500
server.tomcat.threads.min-spare=50

3. Caching Strategy

@Service
public class ProductService {
    
    @Cacheable("products")
    public Product getProduct(String id) {
        // Chỉ hit DB khi cache miss
        return productRepository.findById(id);
    }
    
    @CacheEvict(value = "products", key = "#product.id")
    public Product updateProduct(Product product) {
        return productRepository.save(product);
    }
}

4. Async Processing

@Service
public class OrderService {
    
    @Async("taskExecutor")
    public CompletableFuture<OrderResult> processOrderAsync(Order order) {
        // Xử lý async, không block main thread
        OrderResult result = heavyProcessing(order);
        return CompletableFuture.completedFuture(result);
    }
}

5. Message Queue + Event Driven

// Producer
@Service
public class OrderProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void submitOrder(Order order) {
        rabbitTemplate.convertAndSend("order.queue", order);
        // Immediate response, actual processing happens async
    }
}

// Consumer
@RabbitListener(queues = "order.queue")
public void processOrder(Order order) {
    // Process in background
    orderService.processOrder(order);
}

📊 So sánh các giải pháp:

Giải pháp Cost Complexity Scalability Time to implement
Horizontal Scaling Cao Thấp Tốt Nhanh
Connection Tuning Thấp Thấp Trung bình Rất nhanh
Caching Thấp Trung bình Tốt Nhanh
Async Processing Thấp Trung bình Tốt Trung bình
Message Queue Trung bình Trung bình Rất tốt Trung bình
Reactive Thấp Cao Rất tốt Lâu

🎯 Khi nào chọn giải pháp nào:

Chọn Horizontal Scaling khi:

  • Budget không hạn chế
  • Cần giải pháp nhanh
  • Team thiếu kinh nghiệm về optimization

Chọn Caching khi:

  • Data ít thay đổi
  • Read-heavy workload
  • Có pattern truy cập lặp lại

Chọn Message Queue khi:

  • Có thể process async
  • Cần reliability (retry, dead letter queue)
  • Các services cần decouple

Chọn Reactive khi:

  • I/O intensive
  • Cần optimize resource usage
  • Team có kinh nghiệm reactive
  • Long-term investment

💡 Thực tế trong Production:

Hybrid approach thường là tốt nhất:

@RestController
public class OptimizedController {
    
    // 1. Cache layer
    @Cacheable("products")
    @GetMapping("/product/{id}")
    public ResponseEntity<Product> getProduct(@PathVariable String id) {
        
        // 2. Async processing for heavy operations  
        CompletableFuture<Product> productFuture = productService.getProductAsync(id);
        CompletableFuture<Inventory> inventoryFuture = inventoryService.getInventoryAsync(id);
        
        // 3. Combine results
        return ResponseEntity.ok(
            productFuture.thenCombine(inventoryFuture, this::combineProductAndInventory).get()
        );
    }
}

Kết luận: Reactive là một trong nhiều giải pháp. Lựa chọn phụ thuộc vào context cụ thể, team capability, và business requirements. Thường thì combination của nhiều approaches sẽ cho kết quả tốt nhất.

🔍 Phân tích chi tiết đoạn code:

Ý nghĩa từng dòng:

public Flux<OrderResult> processOrders() {
    return orderStream                           // 1. Stream orders đầu vào
        .onBackpressureBuffer(1000,             // 2. Buffer tối đa 1000 orders
            BufferOverflowStrategy.DROP_OLDEST) // 3. Khi buffer đầy → xóa order cũ nhất
        .flatMap(order -> processOrder(order), 10); // 4. Xử lý max 10 orders đồng thời
}

Chi tiết hoạt động:

1. orderStream: Nguồn orders đầu vào (có thể 10,000/giây) 2. onBackpressureBuffer(1000): Tạo buffer chứa 1000 orders chờ xử lý 3. DROP_OLDEST: Khi buffer đầy → xóa order cũ nhất để nhận order mới 4. flatMap(..., 10): Chỉ xử lý tối đa 10 orders cùng lúc


📊 Ví dụ với 10 triệu requests:

Scenario thực tế:

Incoming: 10,000,000 orders trong 1 giờ
= 2,778 orders/giây
Processing capacity: 500 orders/giây

Timeline hoạt động:

// T=0: Hệ thống start
Buffer: [empty] (0/1000)
Processing: 0/10 concurrent slots
Queue outside buffer: 2,778 orders/giây đang đến

// T=1 giây
Buffer: [Order1...Order1000] (1000/1000) - FULL!
Processing: [Order1, Order2...Order10] (10/10) - FULL!
Dropped: 1,778 orders (2,778 - 1000 buffer slots)

// T=2 giây  
Buffer: [Order1001...Order2000] (1000/1000)
Processing: [Order11, Order12...Order20] (10/10)
Completed: 10 orders từ batch đầu
Dropped: 1,778 orders nữa

// Pattern lặp lại...

Kết quả sau 1 giờ:

// Tổng orders processed
Processed = 500 orders/giây × 3600 giây = 1,800,000 orders

// Orders bị drop
Dropped = 10,000,000 - 1,800,000 = 8,200,000 orders (82%!)

// Orders trong buffer cuối cùng
Buffer = 1,000 orders (chờ xử )

⚠️ Vấn đề nghiêm trọng:

1. Mất dữ liệu khủng khiếp:

// 82% orders bị mất!
// Trong trading system → mất 8.2 triệu orders = thảm họa

2. FIFO không được đảm bảo:

// Orders cũ bị drop → orders mới được xử lý trước
// Order từ 10:00 AM bị drop
// Order từ 10:30 AM được xử lý

🛠️ Giải pháp tốt hơn:

Option 1: Persistent Queue + Multiple Workers

@Service
public class ImprovedOrderService {
    
    // Dùng Redis/RabbitMQ thay vì memory buffer
    @RabbitListener(queues = "order.queue", concurrency = "50-100")
    public void processOrder(Order order) {
        try {
            orderProcessor.process(order);
        } catch (Exception e) {
            // Retry mechanism
            retryTemplate.execute(() -> orderProcessor.process(order));
        }
    }
    
    // Producer không bị block
    public void submitOrder(Order order) {
        rabbitTemplate.convertAndSend("order.queue", order);
        // Immediate response
    }
}

Option 2: Database Queue + Batch Processing

@Service 
public class DatabaseQueueService {
    
    // Lưu tất cả orders vào DB
    public void submitOrder(Order order) {
        order.setStatus(OrderStatus.PENDING);
        orderRepository.save(order); // Không mất data
    }
    
    // Background job xử lý batch
    @Scheduled(fixedDelay = 1000)
    public void processPendingOrders() {
        List<Order> pendingOrders = orderRepository
            .findTop500ByStatusOrderByCreatedAt(OrderStatus.PENDING);
            
        pendingOrders.parallelStream()
            .forEach(this::processOrder);
    }
}

Option 3: Reactive với External Queue

@Service
public class ReactiveOrderService {
    
    public Mono<Void> submitOrder(Order order) {
        return redisTemplate.opsForList()
            .leftPush("order.queue", order) // Persist in Redis
            .then(); // Non-blocking
    }
    
    // Separate consumer
    public Flux<OrderResult> processOrdersFromQueue() {
        return Flux.interval(Duration.ofMillis(10))
            .flatMap(tick -> 
                redisTemplate.opsForList().rightPop("order.queue"))
            .filter(Objects::nonNull)
            .flatMap(order -> processOrder(order), 50) // 50 concurrent
            .onErrorContinue((error, order) -> {
                log.error("Failed to process order: " + order, error);
                // Dead letter queue
                redisTemplate.opsForList().leftPush("order.dlq", order);
            });
    }
}

📈 So sánh hiệu suất:

Approach Throughput Data Loss Complexity Cost
Original Code 500/s 82% Thấp Thấp
RabbitMQ 5,000/s 0% Trung bình Trung bình
Database Queue 2,000/s 0% Thấp Thấp
Redis + Reactive 10,000/s ~0% Cao Trung bình

💡 Bài học:

1. In-memory buffer không phù hợp cho high volume:

// ❌ Không bao giờ làm thế này với 10M requests
.onBackpressureBuffer(1000, DROP_OLDEST)

// ✅ Dùng persistent queue
.publishOn(Schedulers.boundedElastic())
.flatMap(order -> persistentQueue.enqueue(order))

2. Luôn tính toán capacity:

// Input rate: 2,778/s
// Processing rate: 500/s  
// Deficit: 2,278/s
// → Cần scale processing hoặc reduce input

3. Monitor và alert:

@Component
public class OrderMetrics {
    private final MeterRegistry meterRegistry;
    
    public void recordDroppedOrders(int count) {
        meterRegistry.counter("orders.dropped").increment(count);
        
        if (count > 100) {
            alertService.sendAlert("High order drop rate: " + count);
        }
    }
}

Kết luận: Code gốc phù hợp cho demo/test nhưng không thể dùng production với 10M requests. Cần persistent queue + proper scaling strategy.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment