Skip to content

Instantly share code, notes, and snippets.

@danehammer
Created August 1, 2014 15:49
Show Gist options
  • Select an option

  • Save danehammer/b64c94b404680c485143 to your computer and use it in GitHub Desktop.

Select an option

Save danehammer/b64c94b404680c485143 to your computer and use it in GitHub Desktop.
example storm hook for reporting storm metrics to codahale metrics
import backtype.storm.generated.Grouping;
import backtype.storm.hooks.BaseTaskHook;
import backtype.storm.hooks.info.BoltAckInfo;
import backtype.storm.hooks.info.EmitInfo;
import backtype.storm.task.TopologyContext;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
/**
* A simple task hook that reports task metrics via the <a href="http://metrics.codahale.com/">Codehale Metrics API</a>.
*/
public class MetricsTaskHook extends BaseTaskHook {
private Meter ackMeter;
private Timer boltProcessLatencyTimer;
private Class<?> klass;
private ImmutableMap<Integer, Meter> taskToMeter;
/**
* Zero-arg constructor used when topology.auto.task.hooks is set in the Storm configuration. Metrics will be
* saved with the group "storm" and type "taskInfo".
*/
public MetricsTaskHook() { }
public MetricsTaskHook(Class<?> klass) {
this.klass = klass;
}
@Override
public void prepare(Map conf, TopologyContext context) {
ackMeter = Metrics.newMeter(getMetricName(klass, "acks", context), "acked", TimeUnit.SECONDS);
boltProcessLatencyTimer = Metrics.newTimer(getMetricName(klass, "boltProcessLatency", context), TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
ImmutableMap.Builder<Integer, Meter> taskToMeterBuilder = ImmutableMap.builder();
ListMultimap<String, Integer> componentToTasks = Multimaps.invertFrom(Multimaps.forMap(context.getTaskToComponent()),
ArrayListMultimap.<String, Integer>create());
//Map from stream id to component id to the Grouping used.
Map<String, Map<String, Grouping>> targets = context.getThisTargets();
for (Map.Entry<String, Map<String, Grouping>> entry : targets.entrySet()) {
for (String componentId : entry.getValue().keySet()) {
Meter meter = Metrics.newMeter(getMetricName(klass, componentId + "Emits", context), componentId + "Emitted", TimeUnit.SECONDS);
for (Integer task : componentToTasks.get(componentId)) {
taskToMeterBuilder.put(task, meter);
}
}
}
taskToMeter = taskToMeterBuilder.build();
}
@Override
public void boltAck(BoltAckInfo info) {
ackMeter.mark();
if (info.processLatencyMs != null) {
boltProcessLatencyTimer.update(info.processLatencyMs, TimeUnit.MILLISECONDS);
}
}
@Override
public void emit(EmitInfo info) {
for (Integer task : info.outTasks) {
Meter meter = taskToMeter.get(task);
if (meter != null) {
meter.mark();
}
}
}
protected MetricName getMetricName(Class<?> klass, String name, TopologyContext context) {
if (klass == null) {
return new MetricName("storm", "taskInfo", name, MetricsStormUtil.getTimerScope(context));
}
return new MetricName(klass, name, MetricsStormUtil.getTimerScope(context));
}
}
@vvarma
Copy link
Copy Markdown

vvarma commented Jun 30, 2015

Very useful. Thanks.!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment