Created
October 24, 2014 18:27
-
-
Save Xorlev/1ac3642828e40f7e59aa to your computer and use it in GitHub Desktop.
A rate-limited spout wrapper that uses Archaius dynamic properties to control the overall rate limit. It interrogates Storm for the total number of spout instances and divides the rate limit evenly between them.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.fullcontact.storm.spout; | |
import backtype.storm.spout.SpoutOutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.IRichSpout; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import com.google.common.util.concurrent.RateLimiter; | |
import com.netflix.config.DynamicDoubleProperty; | |
import com.netflix.config.DynamicPropertyFactory; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* Rate limits a spout using rate information from spout.:namespace:.rate.limit. This limit is a total limit, | |
* the spout introspects how many instances it's running to determine its per-spout limit. | |
* | |
* Optionally takes a warmup time to "work up" to the rate limit over a period of time. | |
* | |
* @author Michael Rose <[email protected]> | |
*/ | |
public class RateLimitingForwardingSpout implements IRichSpout { | |
private static final Logger log = LoggerFactory.getLogger(RateLimitingForwardingSpout.class); | |
private final String namespace; | |
private final long warmupPeriod; | |
private final TimeUnit warmupUnit; | |
private final IRichSpout delegate; | |
private RateLimiter rateLimiter; | |
private DynamicDoubleProperty property; | |
public RateLimitingForwardingSpout(String namespace, IRichSpout delegate) { | |
this(namespace, delegate, 0, TimeUnit.SECONDS); | |
} | |
public RateLimitingForwardingSpout(String namespace, IRichSpout delegate, long warmupPeriod, TimeUnit warmupUnit) { | |
this.namespace = namespace; | |
this.delegate = delegate; | |
this.warmupPeriod = warmupPeriod; | |
this.warmupUnit = warmupUnit; | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { | |
delegate.declareOutputFields(outputFieldsDeclarer); | |
} | |
@Override | |
public Map<String, Object> getComponentConfiguration() { | |
return delegate.getComponentConfiguration(); | |
} | |
@Override | |
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { | |
property = DynamicPropertyFactory.getInstance().getDoubleProperty("spout."+namespace+".rate.limit", 300); | |
String componentId = topologyContext.getThisComponentId(); | |
final double totalLimit = property.get(); | |
final int totalSpouts = topologyContext.getComponentTasks(componentId).size(); | |
final double limit = totalLimit/totalSpouts; | |
if (warmupPeriod > 0) { | |
rateLimiter = RateLimiter.create(limit, warmupPeriod, warmupUnit); | |
log.info("Initializing {} spout with overall limit of {}/s, per instance of {}/s. Warmup over {} {}", | |
delegate.getClass().getSimpleName(), totalLimit, limit, warmupPeriod, warmupUnit); | |
} else { | |
rateLimiter = RateLimiter.create(limit); | |
log.info("Initializing {} spout with overall limit of {}/s, per instance of {}/s.", | |
delegate.getClass().getSimpleName(), totalLimit, limit); | |
} | |
property.addCallback(new Runnable() { | |
@Override | |
public void run() { | |
double newLimit = property.get()/totalSpouts; | |
log.info("Setting {} spout to overall limit of {}/s, per instance of {}/s.", | |
delegate.getClass().getSimpleName(), property.get(), newLimit); | |
rateLimiter.setRate(newLimit); | |
} | |
}); | |
delegate.open(map, topologyContext, spoutOutputCollector); | |
} | |
@Override | |
public void close() { | |
delegate.close(); | |
} | |
@Override | |
public void activate() { | |
delegate.activate(); | |
} | |
@Override | |
public void deactivate() { | |
delegate.deactivate(); | |
} | |
@Override | |
public void nextTuple() { | |
if (rateLimiter.tryAcquire(5, TimeUnit.MILLISECONDS)) { | |
delegate.nextTuple(); | |
} | |
} | |
@Override | |
public void ack(Object o) { | |
delegate.ack(o); | |
} | |
@Override | |
public void fail(Object o) { | |
delegate.fail(o); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment