Skip to content

Instantly share code, notes, and snippets.

@mohnishkodnani
Created December 8, 2022 20:47
Show Gist options
  • Save mohnishkodnani/2668285229b928228f3b48f3f4a08e75 to your computer and use it in GitHub Desktop.
Save mohnishkodnani/2668285229b928228f3b48f3f4a08e75 to your computer and use it in GitHub Desktop.
package com.symphony.framework.akkastreams.v1;
import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.stream.stage.TimerGraphStageLogic;
import com.symphony.framework.akkastreams.v1.helper.MetricUtils;
import com.symphony.framework.akkastreams.v1.helper.PlatformProviderRegistry;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*
* <pre>
* Task 1 Instrumented Stage Task 2
*
* +------------------------+ +-------------------------------+ +---------------------+
* | | | | | |
* | | | | | |
* +-----+ | +----+ | +----+ |
* | | | | | | Push | | |
* | IN <------Pull----+ | Push | IN <---------Pull---------+ | +----------> IN <----Pull-----+ |
* | | | | +----------> | | | | | | | |
* +--+--+ | | | +--+-+ | | | +--+-+ | |
* | | | | | | | | | | | | | |
* | | | | | | | upstream = | | | | | | |
* | | | | | | | now - last pull | | | | | | |
* | | | | | | | | | | | | | |
* | | | | | | | downstream = | | | | | | |
* | | | | | | | now - last push | | | | | | |
* | | +--+---+ | | | +--+---+ | | | +--+--+
* | | Push | | | | | Push | | | | | Push | |
* | +--------------> OUT +-----------+ | +---------------------> OUT +-----------+ | +------------> OUT |
* | | | | | | | | |
* | +------+ | +------+ | +-----+
* | | | | | |
* +------------------------+ +-------------------------------+ +---------------------+
* </pre>
*/
// @spotless:on
public class InstrumentedStage<In> extends GraphStage<FlowShape<In, In>> {
private static final Logger LOGGER = LoggerFactory.getLogger(InstrumentedStage.class);
private final Inlet<In> in = Inlet.create("InstrumentedStage.in");
private final Outlet<In> out = Outlet.create("InstrumentedStage.out");
private final FlowShape<In, In> shape = FlowShape.of(in, out);
private final String workflowName;
private final String workflowVersion;
private final String taskName;
private static final String LATENCY_METRIC_NAME = "stage_latency";
private static final String OP_NAME = "op";
private static final String OP_NAME_UPSTREAM = "up";
private static final String OP_NAME_DOWNSTREAM = "down";
private static final String WORKFLOW_NAME_TAG = "workflow";
private static final String WORKFLOW_VER_TAG = "workflow_version";
private static final String TASK_REF_TAG = "task_reference_id";
public InstrumentedStage(final String workflowName, String workflowVersion, String taskName) {
this.workflowName = workflowName;
this.workflowVersion = workflowVersion;
this.taskName = taskName;
}
@Override
public FlowShape<In, In> shape() {
return shape;
}
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
LOGGER.error("createLogic called. " + taskName);
return new TimerGraphStageLogic(shape) {
long lastPulledTime = 0L;
long lastPushedTime = 0L;
private final Timer upstreamLatency =
createTimerMetrics(
OP_NAME_UPSTREAM,
"Measures the upstream latency for a given stage in akka workflow graph",
MetricUtils.INSTANCE.getBuckets());
private final Timer downstreamLatency =
createTimerMetrics(
OP_NAME_DOWNSTREAM,
"Measures the downstream latency for a given stage in akka workflow graph",
MetricUtils.INSTANCE.getBuckets());
private final AtomicInteger isBackpressured =
PlatformProviderRegistry.getMetricRegistry()
.gauge(
"backpressured",
List.of(
Tag.of(WORKFLOW_NAME_TAG, workflowName),
Tag.of(WORKFLOW_VER_TAG, workflowVersion),
Tag.of(TASK_REF_TAG, taskName)),
new AtomicInteger(1));
private final Counter throughput =
Counter.builder("throughput")
.tag(WORKFLOW_NAME_TAG, workflowName)
.tag(WORKFLOW_VER_TAG, workflowVersion)
.tag(TASK_REF_TAG, taskName)
.register(PlatformProviderRegistry.getMetricRegistry());
public void preStart() {
LOGGER.error("Prestart Called. " + taskName);
lastPulledTime = System.nanoTime();
lastPushedTime = lastPulledTime;
scheduleWithFixedDelay(
"update_backpressure", Duration.ofMillis(100), Duration.ofMillis(100));
}
{
setHandler(
in,
new InHandler() {
@Override
public void onPush() {
// The time between last pull to onPush will be the upstream latency for this stage.
// The picture above this section
// https://doc.akka.io/docs/akka/current/stream/stream-customize.html#completion
// Downstream latency > upstream latency for this stage then there is back pressure
// for this stage.
var now = System.nanoTime();
var upstreamTime = Duration.ofNanos(now - lastPulledTime);
LOGGER.debug("{} now= {}ns, upstream latency = {}ns", taskName, now, upstreamTime);
upstreamLatency.record(upstreamTime);
push(out, grab(in));
throughput.increment();
lastPushedTime = System.nanoTime();
}
});
setHandler(
out,
new OutHandler() {
@Override
public void onPull() {
// The time between this call and the next call to onPush will be the upstream
// latency for this stage.
// The time from last push to receive another onPull should be the downstream
// latency.
// The picture above this section
// https://doc.akka.io/docs/akka/current/stream/stream-customize.html#completion
var now = System.nanoTime();
var downstreamTime = Duration.ofNanos(now - lastPushedTime);
LOGGER.debug(
"{} now = {}ns, downstream latency = {}ns", taskName, now, downstreamTime);
downstreamLatency.record(downstreamTime);
pull(in);
lastPulledTime = System.nanoTime();
}
});
}
@Override
public void onTimer(Object timerKey) {
if (downstreamLatency.mean(TimeUnit.NANOSECONDS)
> upstreamLatency.mean(TimeUnit.NANOSECONDS)) {
isBackpressured.set(1);
} else {
isBackpressured.set(0);
}
}
};
}
private Timer createTimerMetrics(String opNameDownstream, String description, Duration... slos) {
return Timer.builder(LATENCY_METRIC_NAME)
.description(description)
.tag(OP_NAME, opNameDownstream)
.tag(WORKFLOW_NAME_TAG, workflowName)
.tag(WORKFLOW_VER_TAG, workflowVersion)
.tag(TASK_REF_TAG, taskName)
.serviceLevelObjectives(slos)
.register(PlatformProviderRegistry.getMetricRegistry());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment