Created
February 24, 2017 02:16
-
-
Save manuzhang/1fe3a12b6ae557d79d277eb34639e914 to your computer and use it in GitHub Desktop.
Patch that adds metrics to WindowOperator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Index: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java | |
IDEA additional info: | |
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
<+>UTF-8 | |
=================================================================== | |
--- flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java (date 1487603477000) | |
+++ flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java (revision ) | |
@@ -38,6 +38,8 @@ | |
import org.apache.flink.core.fs.FSDataInputStream; | |
import org.apache.flink.core.memory.DataInputView; | |
import org.apache.flink.core.memory.DataInputViewStreamWrapper; | |
+import org.apache.flink.metrics.Counter; | |
+import org.apache.flink.metrics.Gauge; | |
import org.apache.flink.metrics.MetricGroup; | |
import org.apache.flink.runtime.state.ArrayListSerializer; | |
import org.apache.flink.runtime.state.VoidNamespace; | |
@@ -69,8 +71,11 @@ | |
import java.io.Serializable; | |
import java.util.Collection; | |
import java.util.Comparator; | |
+import java.util.HashSet; | |
+import java.util.LinkedList; | |
import java.util.List; | |
import java.util.PriorityQueue; | |
+import java.util.Set; | |
import static org.apache.flink.util.Preconditions.checkArgument; | |
import static org.apache.flink.util.Preconditions.checkNotNull; | |
@@ -189,6 +194,18 @@ | |
*/ | |
private transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers; | |
+ /** | |
+  * (window, key) pairs recorded while elements are assigned to windows; its size backs the "Window Count" gauge. | |
+  * NOTE(review): this patch only ever adds entries and no removal on window cleanup is visible, | |
+  * so the set may grow without bound -- confirm entries are purged elsewhere. | |
+  */ | |
+ private transient Set<Tuple2<W, K>> windows; | |
+ | |
+ /** Gauge reporting the size of {@link #windows}; typed as {@code Gauge<Integer>} rather than the raw {@code Gauge}. */ | |
+ private transient Gauge<Integer> windowCount; | |
+ | |
+ /** NOTE(review): neither assigned nor read in any visible hunk of this patch -- confirm it is needed. */ | |
+ private transient List<W> emittedWindows; | |
+ | |
+ /** Incremented once per call to {@code emitWindowContents}. */ | |
+ private transient Counter emittedWindowCount; | |
+ | |
+ /** Incremented each time an element is dropped for a window that is already late. */ | |
+ private transient Counter lateWindowCount; | |
+ | |
+ | |
+ | |
/** | |
* Creates a new {@code WindowOperator} based on the given policies and user functions. | |
*/ | |
@@ -300,6 +317,10 @@ | |
} | |
registerRestoredLegacyStateState(); | |
+ windows = new HashSet<>(); | |
+ windowCount = getMetricGroup().gauge("Window Count", new WindowCountGauge<>(windows)); | |
+ emittedWindowCount = getMetricGroup().counter("Emitted Window Count"); | |
+ lateWindowCount = getMetricGroup().counter("Late Window Count"); | |
} | |
@Override | |
@@ -354,8 +375,13 @@ | |
} | |
}); | |
+ if (actualWindow == window) { | |
+ windows.add(Tuple2.of(window, key)); | |
+ } | |
+ | |
// drop if the window is already late | |
if (isLate(actualWindow)) { | |
+ lateWindowCount.inc(); | |
mergingWindows.retireWindow(actualWindow); | |
continue; | |
} | |
@@ -392,8 +418,10 @@ | |
} else { | |
for (W window: elementWindows) { | |
+ windows.add(Tuple2.of(window, key)); | |
// drop if the window is already late | |
if (isLate(window)) { | |
+ lateWindowCount.inc(); | |
continue; | |
} | |
@@ -541,6 +569,7 @@ | |
*/ | |
@SuppressWarnings("unchecked") | |
private void emitWindowContents(W window, ACC contents) throws Exception { | |
+ emittedWindowCount.inc(); /* counted before the user function runs, so a call whose apply() throws is still counted */ | |
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); | |
userFunction.apply(context.key, context.window, contents, timestampedCollector); | |
} | |
@@ -770,6 +799,20 @@ | |
} | |
} | |
+ /** | |
+  * Gauge whose value is the current number of entries in a collection. | |
+  * | |
+  * <p>The collection is held by reference and never copied, so every call to | |
+  * {@link #getValue()} reports the size at the moment of the call. | |
+  * NOTE(review): if the metrics reporter polls from another thread while the operator | |
+  * mutates the collection, the reported size may be stale -- confirm this is acceptable. | |
+  * | |
+  * @param <E> element type of the observed collection | |
+  */ | |
+ private static class WindowCountGauge<E> implements Gauge<Integer> { | |
+ | |
+     /** Live view of the observed collection. */ | |
+     private final Collection<E> tracked; | |
+ | |
+     /** | |
+      * @param windows collection whose size is reported; stored as-is, not copied | |
+      */ | |
+     public WindowCountGauge(Collection<E> windows) { | |
+         tracked = windows; | |
+     } | |
+ | |
+     /** Returns the number of entries currently in the observed collection. */ | |
+     @Override | |
+     public Integer getValue() { | |
+         return Integer.valueOf(tracked.size()); | |
+     } | |
+ } | |
+ | |
/** | |
* Internal class for keeping track of in-flight timers. | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment