Skip to content

Instantly share code, notes, and snippets.

@Xorlev
Created October 24, 2014 18:27
Show Gist options
  • Save Xorlev/1ac3642828e40f7e59aa to your computer and use it in GitHub Desktop.
Save Xorlev/1ac3642828e40f7e59aa to your computer and use it in GitHub Desktop.
A rate limited spout wrapper using Archaius dynamic properties to control overall rate limit. Interrogates Storm for total number of spouts to divide rate limit evenly between spout instances.
package com.fullcontact.storm.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import com.google.common.util.concurrent.RateLimiter;
import com.netflix.config.DynamicDoubleProperty;
import com.netflix.config.DynamicPropertyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Rate limits a spout using rate information from spout.:namespace:.rate.limit. This limit is a total limit,
 * the spout introspects how many instances it's running to determine its per-spout limit.
 *
 * Optionally takes a warmup time to "work up" to the rate limit over a period of time.
 *
 * <p>The limit is read from an Archaius dynamic property (default {@code 300} when unset) and is re-applied
 * whenever that property changes. Only {@link #nextTuple()} is throttled; every other call is forwarded
 * unchanged to the wrapped delegate.
 *
 * @author Michael Rose <[email protected]>
 */
public class RateLimitingForwardingSpout implements IRichSpout {
    private static final Logger log = LoggerFactory.getLogger(RateLimitingForwardingSpout.class);

    /** Overall limit (permits per second, shared by all instances) used when the property is not set. */
    private static final double DEFAULT_TOTAL_LIMIT = 300;

    /** Longest time, in milliseconds, that {@link #nextTuple()} waits for a permit before giving up. */
    private static final long ACQUIRE_TIMEOUT_MILLIS = 5;

    private final String namespace;
    private final long warmupPeriod;
    private final TimeUnit warmupUnit;
    private final IRichSpout delegate;

    // Runtime state created in open(); neither type is meant to travel with a serialized spout.
    private transient RateLimiter rateLimiter;
    private transient DynamicDoubleProperty property;

    /**
     * Creates a rate limited wrapper without a warmup period.
     *
     * @param namespace middle segment of the property name {@code spout.<namespace>.rate.limit}
     * @param delegate  spout that receives every forwarded call
     * @throws NullPointerException if {@code namespace} or {@code delegate} is null
     */
    public RateLimitingForwardingSpout(String namespace, IRichSpout delegate) {
        this(namespace, delegate, 0, TimeUnit.SECONDS);
    }

    /**
     * Creates a rate limited wrapper that ramps up to its limit over the given warmup period.
     *
     * @param namespace    middle segment of the property name {@code spout.<namespace>.rate.limit}
     * @param delegate     spout that receives every forwarded call
     * @param warmupPeriod warmup duration; a value of zero or less disables warmup
     * @param warmupUnit   unit of {@code warmupPeriod}; only used when warmup is enabled
     * @throws NullPointerException if {@code namespace} or {@code delegate} is null, or if warmup is
     *                              enabled and {@code warmupUnit} is null
     */
    public RateLimitingForwardingSpout(String namespace, IRichSpout delegate, long warmupPeriod, TimeUnit warmupUnit) {
        // Fail at construction time instead of with an anonymous NPE once the topology is running.
        if (namespace == null) {
            throw new NullPointerException("namespace");
        }
        if (delegate == null) {
            throw new NullPointerException("delegate");
        }
        if (warmupPeriod > 0 && warmupUnit == null) {
            throw new NullPointerException("warmupUnit");
        }
        this.namespace = namespace;
        this.delegate = delegate;
        this.warmupPeriod = warmupPeriod;
        this.warmupUnit = warmupUnit;
    }

    /** Forwards the output field declaration to the delegate. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        delegate.declareOutputFields(outputFieldsDeclarer);
    }

    /** Returns the delegate's component configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return delegate.getComponentConfiguration();
    }

    /**
     * Builds the rate limiter for this instance, subscribes to limit changes and opens the delegate.
     *
     * <p>The per-instance rate is the overall limit divided by the number of tasks Storm reports for
     * this component.
     *
     * @throws IllegalArgumentException if the configured limit does not yield a positive per-instance rate
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        final String propertyName = "spout." + namespace + ".rate.limit";
        final DynamicDoubleProperty limitProperty =
                DynamicPropertyFactory.getInstance().getDoubleProperty(propertyName, DEFAULT_TOTAL_LIMIT);
        final String componentId = topologyContext.getThisComponentId();
        final double totalLimit = limitProperty.get();
        final int totalSpouts = topologyContext.getComponentTasks(componentId).size();
        final double limit = totalLimit / totalSpouts;
        final String delegateName = delegate.getClass().getSimpleName();

        // RateLimiter rejects such rates anyway; fail with a message that names the offending property.
        if (!isUsableRate(limit)) {
            throw new IllegalArgumentException(
                    "Property " + propertyName + " must be a positive rate, but was " + totalLimit);
        }

        final RateLimiter limiter;
        if (warmupPeriod > 0) {
            limiter = RateLimiter.create(limit, warmupPeriod, warmupUnit);
            log.info("Initializing {} spout with overall limit of {}/s, per instance of {}/s. Warmup over {} {}",
                    delegateName, totalLimit, limit, warmupPeriod, warmupUnit);
        } else {
            limiter = RateLimiter.create(limit);
            log.info("Initializing {} spout with overall limit of {}/s, per instance of {}/s.",
                    delegateName, totalLimit, limit);
        }
        rateLimiter = limiter;
        property = limitProperty;

        limitProperty.addCallback(new Runnable() {
            @Override
            public void run() {
                // Read the property once so the logged value is the one that is actually applied.
                final double newTotal = limitProperty.get();
                final double newLimit = newTotal / totalSpouts;
                if (!isUsableRate(newLimit)) {
                    // setRate() would throw; keep the previous rate rather than fail inside the callback.
                    log.warn("Ignoring invalid overall limit of {}/s for {} spout, keeping per instance rate of {}/s.",
                            newTotal, delegateName, limiter.getRate());
                    return;
                }
                log.info("Setting {} spout to overall limit of {}/s, per instance of {}/s.",
                        delegateName, newTotal, newLimit);
                limiter.setRate(newLimit);
            }
        });

        delegate.open(map, topologyContext, spoutOutputCollector);
    }

    /**
     * Unsubscribes from limit changes and closes the delegate.
     *
     * <p>Without the unsubscribe the property would keep referencing this spout (and its delegate)
     * through the callback registered in {@link #open}.
     */
    @Override
    public void close() {
        try {
            // property is still null when close() is reached without a prior open().
            if (property != null) {
                // NOTE(review): assumes removeAllCallbacks() only drops callbacks added through this
                // wrapper instance - confirm against the Archaius version in use.
                property.removeAllCallbacks();
            }
        } finally {
            delegate.close();
        }
    }

    /** Forwards activation to the delegate. */
    @Override
    public void activate() {
        delegate.activate();
    }

    /** Forwards deactivation to the delegate. */
    @Override
    public void deactivate() {
        delegate.deactivate();
    }

    /**
     * Asks the delegate for the next tuple if a permit becomes available within a short timeout;
     * otherwise returns without calling the delegate.
     */
    @Override
    public void nextTuple() {
        if (rateLimiter.tryAcquire(ACQUIRE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            delegate.nextTuple();
        }
    }

    /** Forwards the ack to the delegate. */
    @Override
    public void ack(Object o) {
        delegate.ack(o);
    }

    /** Forwards the failure to the delegate. */
    @Override
    public void fail(Object o) {
        delegate.fail(o);
    }

    /** Returns true when the rate is strictly positive; false for zero, negative values and NaN. */
    private static boolean isUsableRate(double permitsPerSecond) {
        return permitsPerSecond > 0;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment