Created
August 1, 2014 15:49
-
-
Save danehammer/b64c94b404680c485143 to your computer and use it in GitHub Desktop.
example storm hook for reporting storm metrics to codahale metrics
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import backtype.storm.generated.Grouping; | |
import backtype.storm.hooks.BaseTaskHook; | |
import backtype.storm.hooks.info.BoltAckInfo; | |
import backtype.storm.hooks.info.EmitInfo; | |
import backtype.storm.task.TopologyContext; | |
import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
import com.google.common.collect.ArrayListMultimap; | |
import com.google.common.collect.ImmutableMap; | |
import com.google.common.collect.ListMultimap; | |
import com.google.common.collect.Multimaps; | |
import com.yammer.metrics.Metrics; | |
import com.yammer.metrics.core.Meter; | |
import com.yammer.metrics.core.MetricName; | |
import com.yammer.metrics.core.Timer; | |
/** | |
* A simple task hook that reports task metrics via the <a href="http://metrics.codahale.com/">Codehale Metrics API</a>. | |
*/ | |
public class MetricsTaskHook extends BaseTaskHook { | |
private Meter ackMeter; | |
private Timer boltProcessLatencyTimer; | |
private Class<?> klass; | |
private ImmutableMap<Integer, Meter> taskToMeter; | |
/** | |
* Zero-arg constructor used when topology.auto.task.hooks is set in the Storm configuration. Metrics will be | |
* saved with the group "storm" and type "taskInfo". | |
*/ | |
public MetricsTaskHook() { } | |
public MetricsTaskHook(Class<?> klass) { | |
this.klass = klass; | |
} | |
@Override | |
public void prepare(Map conf, TopologyContext context) { | |
ackMeter = Metrics.newMeter(getMetricName(klass, "acks", context), "acked", TimeUnit.SECONDS); | |
boltProcessLatencyTimer = Metrics.newTimer(getMetricName(klass, "boltProcessLatency", context), TimeUnit.MILLISECONDS, TimeUnit.SECONDS); | |
ImmutableMap.Builder<Integer, Meter> taskToMeterBuilder = ImmutableMap.builder(); | |
ListMultimap<String, Integer> componentToTasks = Multimaps.invertFrom(Multimaps.forMap(context.getTaskToComponent()), | |
ArrayListMultimap.<String, Integer>create()); | |
//Map from stream id to component id to the Grouping used. | |
Map<String, Map<String, Grouping>> targets = context.getThisTargets(); | |
for (Map.Entry<String, Map<String, Grouping>> entry : targets.entrySet()) { | |
for (String componentId : entry.getValue().keySet()) { | |
Meter meter = Metrics.newMeter(getMetricName(klass, componentId + "Emits", context), componentId + "Emitted", TimeUnit.SECONDS); | |
for (Integer task : componentToTasks.get(componentId)) { | |
taskToMeterBuilder.put(task, meter); | |
} | |
} | |
} | |
taskToMeter = taskToMeterBuilder.build(); | |
} | |
@Override | |
public void boltAck(BoltAckInfo info) { | |
ackMeter.mark(); | |
if (info.processLatencyMs != null) { | |
boltProcessLatencyTimer.update(info.processLatencyMs, TimeUnit.MILLISECONDS); | |
} | |
} | |
@Override | |
public void emit(EmitInfo info) { | |
for (Integer task : info.outTasks) { | |
Meter meter = taskToMeter.get(task); | |
if (meter != null) { | |
meter.mark(); | |
} | |
} | |
} | |
protected MetricName getMetricName(Class<?> klass, String name, TopologyContext context) { | |
if (klass == null) { | |
return new MetricName("storm", "taskInfo", name, MetricsStormUtil.getTimerScope(context)); | |
} | |
return new MetricName(klass, name, MetricsStormUtil.getTimerScope(context)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Very useful. Thanks.!