Skip to content

Instantly share code, notes, and snippets.

@damondouglas
Created August 23, 2023 23:26
Show Gist options
  • Save damondouglas/5deb0a2efa585b614a26b7891698fdc2 to your computer and use it in GitHub Desktop.
Save damondouglas/5deb0a2efa585b614a26b7891698fdc2 to your computer and use it in GitHub Desktop.
Can Beam slow down a PCollection<String>?
.gradle
build
.idea
.vscode
*.iml
.DS_Store
gradlew*
gradle*
.gitattributes
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
plugins {
    java
    id("com.diffplug.spotless") version "6.8.0"
}

// Dependency versions used in the dependencies block below.
val beamVersion = "2.49.0"
// 4.13.1+ fixes CVE-2020-15250 (TemporaryFolder could create world-readable temp files).
val junitVersion = "4.13.2"
val hamcrestVersion = "2.1"
val slf4jVersion = "1.7.36"

// Normalizes import order, removes unused imports and applies google-java-format to Java sources.
spotless {
    java {
        importOrder()
        removeUnusedImports()
        googleJavaFormat()
    }
}

// Build with a Java 11 toolchain regardless of the JDK running Gradle.
java {
    toolchain {
        languageVersion.set(JavaLanguageVersion.of(11))
    }
}

repositories {
    mavenCentral()
    maven {
        // Required for Beam to resolve confluent dependency error
        url = uri("https://packages.confluent.io/maven/")
    }
}

dependencies {
    // The Beam BOM supplies the version for Beam artifacts declared without one (the direct runner).
    implementation(platform("org.apache.beam:beam-sdks-java-bom:$beamVersion"))
    implementation("org.apache.beam:beam-runners-direct-java")
    implementation("org.slf4j:slf4j-simple:$slf4jVersion")
    testImplementation("junit:junit:$junitVersion")
    testImplementation("org.hamcrest:hamcrest:$hamcrestVersion")
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.throttle;
import org.apache.beam.sdk.values.PCollection;
/** Holds the {@link #throttle} helper for slowing down a {@link PCollection} of strings. */
public class Throttle {

  /**
   * Returns a collection intended to deliver the elements of {@code input} more slowly.
   *
   * <p>Not implemented yet: the collection passed in is handed back unchanged.
   *
   * @param input the collection whose processing should be slowed down
   * @return {@code input} itself, unmodified
   */
  public static PCollection<String> throttle(PCollection<String> input) {
    // TODO: refactor to slow down the processing of elements from input.
    PCollection<String> passThrough = input;
    return passThrough;
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.throttle;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.SingleOutput;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
/**
 * Checks whether {@link Throttle#throttle} widens the gaps between consecutive element timestamps.
 */
public class ThrottleTest {
  @Rule public transient TestPipeline p = TestPipeline.create();

  /**
   * Compares, per fixed window, the mean millisecond gap between element timestamps after
   * {@link Throttle#throttle} against the same measure taken before it.
   */
  @Test
  public void throttle() {
    // Generates a single PeriodicImpulse Instant: stop is one millisecond short of a full
    // interval after the start, so only the impulse at the start instant is produced.
    Duration interval = Duration.standardSeconds(1L);
    Instant instant = Instant.now();
    Instant stop = instant.plus(interval.minus(Duration.millis(1L)));
    PCollection<String> elements =
        p.apply(
                "impulse",
                PeriodicImpulse.create().startAt(instant).stopAt(stop).withInterval(interval))
            .apply("generateElements", generateElements(1000L))
            .apply("preThrottle/applyTimestamps", applyTimestamps());

    PCollection<String> throttled = Throttle.throttle(elements);

    PCollection<Double> preThrottleIntervals =
        computeWindowedMeanTimestampIntervals("preThrottle", interval, elements);
    PCollection<Double> postThrottleIntervals =
        computeWindowedMeanTimestampIntervals("postThrottle", interval, throttled);

    // The pre-throttle mean is consumed as a side input by the comparison step.
    PCollectionView<Double> preThrottleIntervalView =
        preThrottleIntervals.apply("preThrottle/asView", View.asSingleton());

    // Collapse repeated comparison results before asserting.
    PCollection<Boolean> postGreaterThanPre =
        postThrottleIntervals
            .apply("postGreaterThanPre", compareToView(preThrottleIntervalView, (a, b) -> a > b))
            .apply("distinct", Distinct.create());

    // TODO: change expected to true after refactoring Throttle.throttle.
    boolean expected = false;
    PAssert.that(postGreaterThanPre).containsInAnyOrder(expected);

    p.run();
  }

  /**
   * Returns a transform that, for every input {@link Instant}, emits the decimal strings {@code
   * "1"} through {@code String.valueOf(size)} in ascending order.
   *
   * @param size how many elements to emit per input
   */
  private static SingleOutput<Instant, String> generateElements(long size) {
    return ParDo.of(
        new DoFn<Instant, String>() {
          @ProcessElement
          public void process(@Element Instant instant, OutputReceiver<String> receiver) {
            // rangeClosed is inclusive of size, so exactly `size` values (1..size) are emitted.
            LongStream.rangeClosed(1, size)
                .mapToObj(String::valueOf)
                .forEachOrdered(receiver::output);
          }
        });
  }

  /** Returns a transform that re-emits each element stamped with the current wall-clock time. */
  private static SingleOutput<String, String> applyTimestamps() {
    return ParDo.of(
        new DoFn<String, String>() {
          @ProcessElement
          public void process(@Element String element, OutputReceiver<String> receiver) {
            receiver.outputWithTimestamp(element, Instant.now());
          }
        });
  }

  /**
   * Computes, for each fixed window of length {@code interval}, the mean gap in milliseconds
   * between consecutive element timestamps.
   *
   * @param stepName prefix for every step name, keeping names distinct when applied repeatedly
   * @param interval window length
   * @param elements the timestamped elements to measure
   * @return one mean per window, as produced by {@code Mean.globally().withoutDefaults()}
   */
  private static PCollection<Double> computeWindowedMeanTimestampIntervals(
      String stepName, Duration interval, PCollection<String> elements) {
    return elements
        .apply(stepName + "/extractTimestamps", extractTimestamps())
        .apply(stepName + "/window", window(interval))
        .apply(stepName + "/assignKey", assignBoundedWindowTimestampAsKey())
        .apply(stepName + "/groupByKey", GroupByKey.create())
        .apply(stepName + "/intervals", measureIntervalsPerWindow())
        .apply(stepName + "/mean", Mean.globally().withoutDefaults());
  }

  /**
   * Returns a transform that sorts each group of timestamps and emits the millisecond gap between
   * every consecutive pair. A group holding a single timestamp emits nothing.
   *
   * <p>The output type stays {@link Number} so that {@code Mean.globally()}, whose element type
   * is inferred as {@code Number}, can be applied directly.
   */
  private static SingleOutput<KV<Instant, Iterable<Instant>>, Number> measureIntervalsPerWindow() {
    return ParDo.of(
        new DoFn<KV<Instant, Iterable<Instant>>, Number>() {
          @ProcessElement
          public void process(
              @Element KV<Instant, Iterable<Instant>> element, OutputReceiver<Number> receiver) {
            Spliterator<Instant> spliterator = checkStateNotNull(element.getValue()).spliterator();
            List<Instant> timestamps =
                StreamSupport.stream(spliterator, false).sorted().collect(Collectors.toList());
            checkState(!timestamps.isEmpty());
            Instant prev = timestamps.get(0);
            for (int i = 1; i < timestamps.size(); i++) {
              Instant current = timestamps.get(i);
              // Primitive arithmetic; the value is boxed to Double only when emitted.
              long diff = current.getMillis() - prev.getMillis();
              receiver.output((double) diff);
              prev = current;
            }
          }
        });
  }

  /** Returns a transform that replaces each element with its own event timestamp. */
  private static SingleOutput<String, Instant> extractTimestamps() {
    return ParDo.of(
        new DoFn<String, Instant>() {
          @ProcessElement
          public void process(@Timestamp Instant timestamp, OutputReceiver<Instant> receiver) {
            receiver.output(timestamp);
          }
        });
  }

  /**
   * Returns a windowing transform into fixed windows of length {@code interval}, using the default
   * trigger and discarding fired panes.
   */
  private static <T> Window<T> window(Duration interval) {
    return Window.<T>into(FixedWindows.of(interval))
        .triggering(DefaultTrigger.of())
        .discardingFiredPanes();
  }

  /**
   * Returns a transform that keys each instant by its window's max timestamp, so a subsequent
   * {@link GroupByKey} gathers all instants of one window under a single key.
   */
  private static SingleOutput<Instant, KV<Instant, Instant>> assignBoundedWindowTimestampAsKey() {
    return ParDo.of(
        new DoFn<Instant, KV<Instant, Instant>>() {
          @ProcessElement
          public void process(
              @Element Instant instant,
              BoundedWindow window,
              OutputReceiver<KV<Instant, Instant>> receiver) {
            receiver.output(KV.of(window.maxTimestamp(), instant));
          }
        });
  }

  /**
   * Returns a transform that emits {@code compareFn.apply(element, viewValue)} for every element,
   * where {@code viewValue} is read from {@code view} as the side input named {@code "view"}.
   */
  private static SingleOutput<Double, Boolean> compareToView(
      PCollectionView<Double> view, SerializableBiFunction<Double, Double, Boolean> compareFn) {
    return ParDo.of(
            new DoFn<Double, Boolean>() {
              @ProcessElement
              public void process(
                  @Element Double value,
                  @SideInput("view") Double viewValue,
                  OutputReceiver<Boolean> receiver) {
                receiver.output(compareFn.apply(value, viewValue));
              }
            })
        .withSideInput("view", view);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment