Created
December 8, 2022 20:47
-
-
Save mohnishkodnani/2668285229b928228f3b48f3f4a08e75 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.symphony.framework.akkastreams.v1; | |
import akka.stream.Attributes; | |
import akka.stream.FlowShape; | |
import akka.stream.Inlet; | |
import akka.stream.Outlet; | |
import akka.stream.stage.GraphStage; | |
import akka.stream.stage.GraphStageLogic; | |
import akka.stream.stage.InHandler; | |
import akka.stream.stage.OutHandler; | |
import akka.stream.stage.TimerGraphStageLogic; | |
import com.symphony.framework.akkastreams.v1.helper.MetricUtils; | |
import com.symphony.framework.akkastreams.v1.helper.PlatformProviderRegistry; | |
import io.micrometer.core.instrument.Counter; | |
import io.micrometer.core.instrument.Tag; | |
import io.micrometer.core.instrument.Timer; | |
import java.time.Duration; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* | |
* | |
* <pre> | |
* Task 1 Instrumented Stage Task 2 | |
* | |
* +------------------------+ +-------------------------------+ +---------------------+ | |
* | | | | | | | |
* | | | | | | | |
* +-----+ | +----+ | +----+ | | |
* | | | | | | Push | | | | |
* | IN <------Pull----+ | Push | IN <---------Pull---------+ | +----------> IN <----Pull-----+ | | |
* | | | | +----------> | | | | | | | | | |
* +--+--+ | | | +--+-+ | | | +--+-+ | | | |
* | | | | | | | | | | | | | | | |
* | | | | | | | upstream = | | | | | | | | |
* | | | | | | | now - last pull | | | | | | | | |
* | | | | | | | | | | | | | | | |
* | | | | | | | downstream = | | | | | | | | |
* | | | | | | | now - last push | | | | | | | | |
* | | +--+---+ | | | +--+---+ | | | +--+--+ | |
* | | Push | | | | | Push | | | | | Push | | | |
* | +--------------> OUT +-----------+ | +---------------------> OUT +-----------+ | +------------> OUT | | |
* | | | | | | | | | | |
* | +------+ | +------+ | +-----+ | |
* | | | | | | | |
* +------------------------+ +-------------------------------+ +---------------------+ | |
* </pre> | |
*/ | |
// @spotless:on | |
public class InstrumentedStage<In> extends GraphStage<FlowShape<In, In>> { | |
private static final Logger LOGGER = LoggerFactory.getLogger(InstrumentedStage.class); | |
private final Inlet<In> in = Inlet.create("InstrumentedStage.in"); | |
private final Outlet<In> out = Outlet.create("InstrumentedStage.out"); | |
private final FlowShape<In, In> shape = FlowShape.of(in, out); | |
private final String workflowName; | |
private final String workflowVersion; | |
private final String taskName; | |
private static final String LATENCY_METRIC_NAME = "stage_latency"; | |
private static final String OP_NAME = "op"; | |
private static final String OP_NAME_UPSTREAM = "up"; | |
private static final String OP_NAME_DOWNSTREAM = "down"; | |
private static final String WORKFLOW_NAME_TAG = "workflow"; | |
private static final String WORKFLOW_VER_TAG = "workflow_version"; | |
private static final String TASK_REF_TAG = "task_reference_id"; | |
public InstrumentedStage(final String workflowName, String workflowVersion, String taskName) { | |
this.workflowName = workflowName; | |
this.workflowVersion = workflowVersion; | |
this.taskName = taskName; | |
} | |
@Override | |
public FlowShape<In, In> shape() { | |
return shape; | |
} | |
@Override | |
public GraphStageLogic createLogic(Attributes inheritedAttributes) { | |
LOGGER.error("createLogic called. " + taskName); | |
return new TimerGraphStageLogic(shape) { | |
long lastPulledTime = 0L; | |
long lastPushedTime = 0L; | |
private final Timer upstreamLatency = | |
createTimerMetrics( | |
OP_NAME_UPSTREAM, | |
"Measures the upstream latency for a given stage in akka workflow graph", | |
MetricUtils.INSTANCE.getBuckets()); | |
private final Timer downstreamLatency = | |
createTimerMetrics( | |
OP_NAME_DOWNSTREAM, | |
"Measures the downstream latency for a given stage in akka workflow graph", | |
MetricUtils.INSTANCE.getBuckets()); | |
private final AtomicInteger isBackpressured = | |
PlatformProviderRegistry.getMetricRegistry() | |
.gauge( | |
"backpressured", | |
List.of( | |
Tag.of(WORKFLOW_NAME_TAG, workflowName), | |
Tag.of(WORKFLOW_VER_TAG, workflowVersion), | |
Tag.of(TASK_REF_TAG, taskName)), | |
new AtomicInteger(1)); | |
private final Counter throughput = | |
Counter.builder("throughput") | |
.tag(WORKFLOW_NAME_TAG, workflowName) | |
.tag(WORKFLOW_VER_TAG, workflowVersion) | |
.tag(TASK_REF_TAG, taskName) | |
.register(PlatformProviderRegistry.getMetricRegistry()); | |
public void preStart() { | |
LOGGER.error("Prestart Called. " + taskName); | |
lastPulledTime = System.nanoTime(); | |
lastPushedTime = lastPulledTime; | |
scheduleWithFixedDelay( | |
"update_backpressure", Duration.ofMillis(100), Duration.ofMillis(100)); | |
} | |
{ | |
setHandler( | |
in, | |
new InHandler() { | |
@Override | |
public void onPush() { | |
// The time between last pull to onPush will be the upstream latency for this stage. | |
// The picture above this section | |
// https://doc.akka.io/docs/akka/current/stream/stream-customize.html#completion | |
// Downstream latency > upstream latency for this stage then there is back pressure | |
// for this stage. | |
var now = System.nanoTime(); | |
var upstreamTime = Duration.ofNanos(now - lastPulledTime); | |
LOGGER.debug("{} now= {}ns, upstream latency = {}ns", taskName, now, upstreamTime); | |
upstreamLatency.record(upstreamTime); | |
push(out, grab(in)); | |
throughput.increment(); | |
lastPushedTime = System.nanoTime(); | |
} | |
}); | |
setHandler( | |
out, | |
new OutHandler() { | |
@Override | |
public void onPull() { | |
// The time between this call and the next call to onPush will be the upstream | |
// latency for this stage. | |
// The time from last push to receive another onPull should be the downstream | |
// latency. | |
// The picture above this section | |
// https://doc.akka.io/docs/akka/current/stream/stream-customize.html#completion | |
var now = System.nanoTime(); | |
var downstreamTime = Duration.ofNanos(now - lastPushedTime); | |
LOGGER.debug( | |
"{} now = {}ns, downstream latency = {}ns", taskName, now, downstreamTime); | |
downstreamLatency.record(downstreamTime); | |
pull(in); | |
lastPulledTime = System.nanoTime(); | |
} | |
}); | |
} | |
@Override | |
public void onTimer(Object timerKey) { | |
if (downstreamLatency.mean(TimeUnit.NANOSECONDS) | |
> upstreamLatency.mean(TimeUnit.NANOSECONDS)) { | |
isBackpressured.set(1); | |
} else { | |
isBackpressured.set(0); | |
} | |
} | |
}; | |
} | |
private Timer createTimerMetrics(String opNameDownstream, String description, Duration... slos) { | |
return Timer.builder(LATENCY_METRIC_NAME) | |
.description(description) | |
.tag(OP_NAME, opNameDownstream) | |
.tag(WORKFLOW_NAME_TAG, workflowName) | |
.tag(WORKFLOW_VER_TAG, workflowVersion) | |
.tag(TASK_REF_TAG, taskName) | |
.serviceLevelObjectives(slos) | |
.register(PlatformProviderRegistry.getMetricRegistry()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment