Skip to content

Instantly share code, notes, and snippets.

@danehammer
Created August 1, 2014 15:49
Show Gist options
  • Save danehammer/b64c94b404680c485143 to your computer and use it in GitHub Desktop.
Save danehammer/b64c94b404680c485143 to your computer and use it in GitHub Desktop.
example storm hook for reporting storm metrics to codahale metrics
import backtype.storm.generated.Grouping;
import backtype.storm.hooks.BaseTaskHook;
import backtype.storm.hooks.info.BoltAckInfo;
import backtype.storm.hooks.info.EmitInfo;
import backtype.storm.task.TopologyContext;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
/**
* A simple task hook that reports task metrics via the <a href="http://metrics.codahale.com/">Codehale Metrics API</a>.
*/
public class MetricsTaskHook extends BaseTaskHook {
private Meter ackMeter;
private Timer boltProcessLatencyTimer;
private Class<?> klass;
private ImmutableMap<Integer, Meter> taskToMeter;
/**
* Zero-arg constructor used when topology.auto.task.hooks is set in the Storm configuration. Metrics will be
* saved with the group "storm" and type "taskInfo".
*/
public MetricsTaskHook() { }
public MetricsTaskHook(Class<?> klass) {
this.klass = klass;
}
@Override
public void prepare(Map conf, TopologyContext context) {
ackMeter = Metrics.newMeter(getMetricName(klass, "acks", context), "acked", TimeUnit.SECONDS);
boltProcessLatencyTimer = Metrics.newTimer(getMetricName(klass, "boltProcessLatency", context), TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
ImmutableMap.Builder<Integer, Meter> taskToMeterBuilder = ImmutableMap.builder();
ListMultimap<String, Integer> componentToTasks = Multimaps.invertFrom(Multimaps.forMap(context.getTaskToComponent()),
ArrayListMultimap.<String, Integer>create());
//Map from stream id to component id to the Grouping used.
Map<String, Map<String, Grouping>> targets = context.getThisTargets();
for (Map.Entry<String, Map<String, Grouping>> entry : targets.entrySet()) {
for (String componentId : entry.getValue().keySet()) {
Meter meter = Metrics.newMeter(getMetricName(klass, componentId + "Emits", context), componentId + "Emitted", TimeUnit.SECONDS);
for (Integer task : componentToTasks.get(componentId)) {
taskToMeterBuilder.put(task, meter);
}
}
}
taskToMeter = taskToMeterBuilder.build();
}
@Override
public void boltAck(BoltAckInfo info) {
ackMeter.mark();
if (info.processLatencyMs != null) {
boltProcessLatencyTimer.update(info.processLatencyMs, TimeUnit.MILLISECONDS);
}
}
@Override
public void emit(EmitInfo info) {
for (Integer task : info.outTasks) {
Meter meter = taskToMeter.get(task);
if (meter != null) {
meter.mark();
}
}
}
protected MetricName getMetricName(Class<?> klass, String name, TopologyContext context) {
if (klass == null) {
return new MetricName("storm", "taskInfo", name, MetricsStormUtil.getTimerScope(context));
}
return new MetricName(klass, name, MetricsStormUtil.getTimerScope(context));
}
}
@vvarma
Copy link

vvarma commented Jun 30, 2015

Very useful. Thanks.!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment