Skip to content

Instantly share code, notes, and snippets.

@paulosuzart
Last active August 25, 2018 15:57
Show Gist options
  • Save paulosuzart/d0f5756dfe75c05658d419c32246e27e to your computer and use it in GitHub Desktop.
Save paulosuzart/d0f5756dfe75c05658d419c32246e27e to your computer and use it in GitHub Desktop.
package com.example.demo.controllers;
import com.example.demo.rating.Limiter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController("/sample")
public class LimitedController {
@GetMapping
public Mono<String> hello() {
return Mono.just("Big World");
}
}
package com.example.demo.controllers.filters;
import com.example.demo.rating.Limiter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
@Component
public class RateLimiterFilter implements WebFilter {
@Autowired
Limiter limiter;
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
//TODO: This will blow if no {@code apikey} is present
String apiKey = exchange.getRequest().getQueryParams().getFirst("apikey");
return limiter.allowRequest(apiKey).flatMap(count -> {
exchange.getResponse().getHeaders().add("rx-limiter-count", String.valueOf(count));
return chain.filter(exchange);
});
}
}
package com.example.demo.rating;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@Component
public class Limiter {
private ConcurrentHashMap<String, ArrayDeque<Long>> limitBuckets = new ConcurrentHashMap<>();
private static int LIMIT = 50;
private static long WINDOW_SIZE = 60000;
@PostConstruct
void init() {
Schedulers.newSingle("sample", false).schedulePeriodically(() -> {
KeySetView<String, ArrayDeque<Long>> keySet = limitBuckets.keySet();
long timestamp = System.currentTimeMillis();
long windowStart = timestamp - WINDOW_SIZE * 3;
keySet.removeIf(key -> limitBuckets.get(key).peekLast() < windowStart);
System.out.println("Cleaning Map " + limitBuckets);
}, 0, 1, TimeUnit.SECONDS);
}
public Mono<Integer> allowRequest(String userToken) {
return Mono.create(sink -> {
limitBuckets.compute(userToken, (token, window) -> {
if (window == null) {
window = new ArrayDeque<>();
}
long timestamp = System.currentTimeMillis();
long windowStart = timestamp - WINDOW_SIZE;
window.add(timestamp);
while (window.peek() < windowStart) {
window.poll();
}
if (window.size() <= LIMIT) {
sink.success(window.size());
} else {
sink.error(new RateLimitExceededException(window.size()));
}
return window;
});
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment