gists:
- id: 8b02d3b87d953ce96be1cdfb58e9d5dd url: 'https://gist.github.com/lyminhtanh/8b02d3b87d953ce96be1cdfb58e9d5dd' createdAt: '2025-06-12T05:14:55Z' updatedAt: '2025-06-12T05:14:55Z' filename: Reactive có phải giải pháp duy nhất cho high traffic?.md isPublic: false baseUrl: 'https://api.github.com'
From [[why reactive]] Thông tin bạn cung cấp về reactive programming là khá chính xác và thực tế. Tôi sẽ xác thực và bổ sung một số điểm quan trọng:
1. Về hiệu suất và tài nguyên:
- Con số 10,000 threads cho 10,000 concurrent users là realistic với blocking model
- R2DBC và WebClient thực sự giúp giảm thread usage đáng kể
- Context switching overhead là vấn đề thực tế với nhiều threads
2. Về công nghệ:
- R2DBC cho reactive database access là đúng
- WebClient thay thế RestTemplate cho reactive HTTP client
- Latency 300-500ms cho DB + HTTP call là hợp lý
// Blocking (Traditional)
@RestController
public class ProductController {
// Mỗi request = 1 thread từ Tomcat thread pool (default 200)
// Với 10k concurrent users → thread pool exhausted
@GetMapping("/product/{id}")
public Product getProduct(@PathVariable String id) {
Product product = productRepository.findById(id); // Block thread
Inventory inventory = inventoryClient.getInventory(id); // Block thread
return enrichProduct(product, inventory);
}
}
// Reactive
@RestController
public class ReactiveProductController {
// Threads được release ngay khi gặp I/O, tái sử dụng cho requests khác
@GetMapping("/product/{id}")
public Mono<Product> getProduct(@PathVariable String id) {
return productRepository.findById(id) // Non-blocking
.flatMap(product ->
inventoryClient.getInventory(id) // Non-blocking
.map(inventory -> enrichProduct(product, inventory))
);
}
}
- Blocking: Tomcat default thread pool = 200, có thể tăng lên ~1000-2000
- Reactive: Netty event loop thường chỉ cần 2-4 threads/CPU core
- Với 10k users: Blocking cần hàng nghìn threads, Reactive chỉ cần vài chục
3. Backpressure handling:
// Reactive hỗ trợ backpressure tự động
Flux<Product> products = productService.getProducts()
.onBackpressureBuffer(1000) // Buffer 1000 items
.onBackpressureDrop(); // Drop excess items
4. Error handling phức tạp hơn:
// Blocking - đơn giản
try {
return productService.getProduct(id);
} catch (Exception e) {
return handleError(e);
}
// Reactive - cần reactive error handling
return productService.getProduct(id)
.onErrorReturn(DefaultProduct.empty())
.onErrorMap(DatabaseException.class, ex -> new ServiceException(ex));
5. Testing và debugging:
// Reactive testing cần StepVerifier
@Test
public void testReactiveFlow() {
StepVerifier.create(productService.getProduct("123"))
.expectNextMatches(product -> product.getId().equals("123"))
.verifyComplete();
}
Tiêu chí | Blocking | Reactive | Ghi chú |
---|---|---|---|
Memory usage | Cao (mỗi thread ~1MB stack) | Thấp | 10k threads ≈ 10GB memory |
Latency under load | Tăng exponentially | Ổn định | Blocking có thread pool exhaustion |
Learning curve | Dễ | Khó | Reactive cần tư duy hoàn toàn khác |
Ecosystem maturity | Rất mature | Đang phát triển | Một số libs chưa hỗ trợ reactive |
Error stack traces | Rõ ràng | Phức tạp | Reactive stack traces khó debug |
✅ Nên dùng khi:
- High concurrency (>1000 concurrent users)
- I/O intensive operations
- Microservices với nhiều external calls
- Real-time data streaming
- Mobile apps với limited resources
❌ Không cần thiết khi:
- Ứng dụng nhỏ (<100 concurrent users)
- CPU-intensive tasks
- Team thiếu kinh nghiệm reactive
- Legacy systems với nhiều blocking dependencies
- Đừng reactive hoá toàn bộ - Bắt đầu với các endpoints high-traffic
- Monitoring quan trọng - Reactive metrics khác với blocking
- Test thoroughly - Reactive bugs khó reproduce
- Team training - Đầu tư thời gian học reactive patterns
Nhìn chung, phân tích của bạn rất thực tế và phù hợp với kinh nghiệm production. Bạn có muốn tôi demo code cụ thể cho case study này không?
Backpressure xảy ra khi producer tạo ra data nhanh hơn consumer xử lý. Giống như nước chảy vào bồn nhanh hơn thoát ra → bồn tràn.
// Producer tạo 1000 items/giây
// Consumer chỉ xử lý được 100 items/giây
// → 900 items/giây bị tích tụ → OutOfMemoryError
// 1. BUFFER - Lưu trữ tạm thời
Flux<String> data = dataProducer()
.onBackpressureBuffer(1000) // Buffer tối đa 1000 items
.doOnNext(item -> processSlowly(item));
// 2. DROP - Bỏ qua items mới nhất
Flux<String> data = dataProducer()
.onBackpressureDrop(dropped -> log.warn("Dropped: " + dropped))
.doOnNext(item -> processSlowly(item));
// 3. LATEST - Chỉ giữ item mới nhất
Flux<String> data = dataProducer()
.onBackpressureLatest()
.doOnNext(item -> processSlowly(item));
// 4. ERROR - Throw exception
Flux<String> data = dataProducer()
.onBackpressureError() // Ném BackpressureException
.doOnNext(item -> processSlowly(item));
@Service
public class TradingDataService {
// Market data stream - 10,000 updates/second
public Flux<MarketData> getMarketDataStream() {
return marketDataSource.stream()
.onBackpressureLatest() // Chỉ quan tâm giá mới nhất
.sample(Duration.ofMillis(100)); // Sample mỗi 100ms
}
// Order processing - có thể xử lý 500 orders/second => DO NOT DO THIS
public Flux<OrderResult> processOrders() {
return orderStream
.onBackpressureBuffer(1000, // Buffer 1000 orders
BufferOverflowStrategy.DROP_OLDEST) // Drop orders cũ
.flatMap(order -> processOrder(order), 10); // Max 10 concurrent
}
}
# Multiple instances behind load balancer
Server 1: 1000 users
Server 2: 1000 users
Server 3: 1000 users
→ Total: 3000 users với blocking architecture
// Tối ưu connection pool
@Configuration
public class DatabaseConfig {
@Bean
public DataSource dataSource() {
HikariConfig config = new HikariConfig();
config.setMaximumPoolSize(50); // Tăng pool size
config.setMinimumIdle(10);
config.setConnectionTimeout(5000);
return new HikariDataSource(config);
}
}
// Tối ưu thread pool
server.tomcat.threads.max=500
server.tomcat.threads.min-spare=50
@Service
public class ProductService {
@Cacheable("products")
public Product getProduct(String id) {
// Chỉ hit DB khi cache miss
return productRepository.findById(id);
}
@CacheEvict(value = "products", key = "#product.id")
public Product updateProduct(Product product) {
return productRepository.save(product);
}
}
@Service
public class OrderService {
@Async("taskExecutor")
public CompletableFuture<OrderResult> processOrderAsync(Order order) {
// Xử lý async, không block main thread
OrderResult result = heavyProcessing(order);
return CompletableFuture.completedFuture(result);
}
}
// Producer
@Service
public class OrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void submitOrder(Order order) {
rabbitTemplate.convertAndSend("order.queue", order);
// Immediate response, actual processing happens async
}
}
// Consumer
@RabbitListener(queues = "order.queue")
public void processOrder(Order order) {
// Process in background
orderService.processOrder(order);
}
Giải pháp | Cost | Complexity | Scalability | Time to implement |
---|---|---|---|---|
Horizontal Scaling | Cao | Thấp | Tốt | Nhanh |
Connection Tuning | Thấp | Thấp | Trung bình | Rất nhanh |
Caching | Thấp | Trung bình | Tốt | Nhanh |
Async Processing | Thấp | Trung bình | Tốt | Trung bình |
Message Queue | Trung bình | Trung bình | Rất tốt | Trung bình |
Reactive | Thấp | Cao | Rất tốt | Lâu |
- Budget không hạn chế
- Cần giải pháp nhanh
- Team thiếu kinh nghiệm về optimization
- Data ít thay đổi
- Read-heavy workload
- Có pattern truy cập lặp lại
- Có thể process async
- Cần reliability (retry, dead letter queue)
- Các services cần decouple
- I/O intensive
- Cần optimize resource usage
- Team có kinh nghiệm reactive
- Long-term investment
Hybrid approach thường là tốt nhất:
@RestController
public class OptimizedController {
// 1. Cache layer
@Cacheable("products")
@GetMapping("/product/{id}")
public ResponseEntity<Product> getProduct(@PathVariable String id) {
// 2. Async processing for heavy operations
CompletableFuture<Product> productFuture = productService.getProductAsync(id);
CompletableFuture<Inventory> inventoryFuture = inventoryService.getInventoryAsync(id);
// 3. Combine results
return ResponseEntity.ok(
productFuture.thenCombine(inventoryFuture, this::combineProductAndInventory).get()
);
}
}
Kết luận: Reactive là một trong nhiều giải pháp. Lựa chọn phụ thuộc vào context cụ thể, team capability, và business requirements. Thường thì combination của nhiều approaches sẽ cho kết quả tốt nhất.
public Flux<OrderResult> processOrders() {
return orderStream // 1. Stream orders đầu vào
.onBackpressureBuffer(1000, // 2. Buffer tối đa 1000 orders
BufferOverflowStrategy.DROP_OLDEST) // 3. Khi buffer đầy → xóa order cũ nhất
.flatMap(order -> processOrder(order), 10); // 4. Xử lý max 10 orders đồng thời
}
1. orderStream
: Nguồn orders đầu vào (có thể 10,000/giây) 2. onBackpressureBuffer(1000)
: Tạo buffer chứa 1000 orders chờ xử lý 3. DROP_OLDEST
: Khi buffer đầy → xóa order cũ nhất để nhận order mới 4. flatMap(..., 10)
: Chỉ xử lý tối đa 10 orders cùng lúc
Incoming: 10,000,000 orders trong 1 giờ
= 2,778 orders/giây
Processing capacity: 500 orders/giây
// T=0: Hệ thống start
Buffer: [empty] (0/1000)
Processing: 0/10 concurrent slots
Queue outside buffer: 2,778 orders/giây đang đến
// T=1 giây
Buffer: [Order1...Order1000] (1000/1000) - FULL!
Processing: [Order1, Order2...Order10] (10/10) - FULL!
Dropped: 1,778 orders (2,778 - 1000 buffer slots)
// T=2 giây
Buffer: [Order1001...Order2000] (1000/1000)
Processing: [Order11, Order12...Order20] (10/10)
Completed: 10 orders từ batch đầu
Dropped: 1,778 orders nữa
// Pattern lặp lại...
// Tổng orders processed
Processed = 500 orders/giây × 3600 giây = 1,800,000 orders
// Orders bị drop
Dropped = 10,000,000 - 1,800,000 = 8,200,000 orders (82%!)
// Orders trong buffer cuối cùng
Buffer = 1,000 orders (chờ xử lý)
// 82% orders bị mất!
// Trong trading system → mất 8.2 triệu orders = thảm họa
// Orders cũ bị drop → orders mới được xử lý trước
// Order từ 10:00 AM bị drop
// Order từ 10:30 AM được xử lý
@Service
public class ImprovedOrderService {
// Dùng Redis/RabbitMQ thay vì memory buffer
@RabbitListener(queues = "order.queue", concurrency = "50-100")
public void processOrder(Order order) {
try {
orderProcessor.process(order);
} catch (Exception e) {
// Retry mechanism
retryTemplate.execute(() -> orderProcessor.process(order));
}
}
// Producer không bị block
public void submitOrder(Order order) {
rabbitTemplate.convertAndSend("order.queue", order);
// Immediate response
}
}
@Service
public class DatabaseQueueService {
// Lưu tất cả orders vào DB
public void submitOrder(Order order) {
order.setStatus(OrderStatus.PENDING);
orderRepository.save(order); // Không mất data
}
// Background job xử lý batch
@Scheduled(fixedDelay = 1000)
public void processPendingOrders() {
List<Order> pendingOrders = orderRepository
.findTop500ByStatusOrderByCreatedAt(OrderStatus.PENDING);
pendingOrders.parallelStream()
.forEach(this::processOrder);
}
}
@Service
public class ReactiveOrderService {
public Mono<Void> submitOrder(Order order) {
return redisTemplate.opsForList()
.leftPush("order.queue", order) // Persist in Redis
.then(); // Non-blocking
}
// Separate consumer
public Flux<OrderResult> processOrdersFromQueue() {
return Flux.interval(Duration.ofMillis(10))
.flatMap(tick ->
redisTemplate.opsForList().rightPop("order.queue"))
.filter(Objects::nonNull)
.flatMap(order -> processOrder(order), 50) // 50 concurrent
.onErrorContinue((error, order) -> {
log.error("Failed to process order: " + order, error);
// Dead letter queue
redisTemplate.opsForList().leftPush("order.dlq", order);
});
}
}
Approach | Throughput | Data Loss | Complexity | Cost |
---|---|---|---|---|
Original Code | 500/s | 82% | Thấp | Thấp |
RabbitMQ | 5,000/s | 0% | Trung bình | Trung bình |
Database Queue | 2,000/s | 0% | Thấp | Thấp |
Redis + Reactive | 10,000/s | ~0% | Cao | Trung bình |
// ❌ Không bao giờ làm thế này với 10M requests
.onBackpressureBuffer(1000, DROP_OLDEST)
// ✅ Dùng persistent queue
.publishOn(Schedulers.boundedElastic())
.flatMap(order -> persistentQueue.enqueue(order))
// Input rate: 2,778/s
// Processing rate: 500/s
// Deficit: 2,278/s
// → Cần scale processing hoặc reduce input
@Component
public class OrderMetrics {
private final MeterRegistry meterRegistry;
public void recordDroppedOrders(int count) {
meterRegistry.counter("orders.dropped").increment(count);
if (count > 100) {
alertService.sendAlert("High order drop rate: " + count);
}
}
}
Kết luận: Code gốc phù hợp cho demo/test nhưng không thể dùng production với 10M requests. Cần persistent queue + proper scaling strategy.