Created
May 24, 2023 21:26
-
-
Save egalpin/50edc34c7ca7f1c222fd5c4244a14e59 to your computer and use it in GitHub Desktop.
Tests for RateLimit.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import static org.junit.Assert.assertTrue; | |
import org.apache.beam.sdk.testing.NeedsRunner; | |
import org.apache.beam.sdk.testing.PAssert; | |
import org.apache.beam.sdk.transforms.Count; | |
import org.apache.beam.sdk.transforms.Create; | |
import org.apache.beam.sdk.transforms.MapElements; | |
import org.apache.beam.sdk.transforms.PeriodicImpulse; | |
import org.apache.beam.sdk.transforms.SerializableFunction; | |
import org.apache.beam.sdk.transforms.Values; | |
import org.apache.beam.sdk.transforms.WithKeys; | |
import org.apache.beam.sdk.transforms.windowing.AfterWatermark; | |
import org.apache.beam.sdk.transforms.windowing.GlobalWindows; | |
import org.apache.beam.sdk.transforms.windowing.Window; | |
import org.apache.beam.sdk.values.PCollection; | |
import org.apache.beam.sdk.values.TypeDescriptors; | |
import org.joda.time.Duration; | |
import org.joda.time.Instant; | |
import org.junit.Test; | |
import org.junit.experimental.categories.Category; | |
public class RateLimitTest extends BasePipelineTest { | |
@Test | |
@Category({NeedsRunner.class}) | |
public void testRateLimitWindowExpiration() throws Exception { | |
Instant start = Instant.now(); | |
PCollection<String> rawOutputs = | |
pipeline | |
.apply(Create.of("bar", "baz", "123")) | |
.apply(WithKeys.of("foo")) | |
.apply(RateLimit.<String, String>ofSize(1).per(Duration.standardSeconds(1))) | |
.apply(Values.create()); | |
PAssert.that(rawOutputs).containsInAnyOrder("bar", "baz", "123"); | |
pipeline.run(); | |
assertTrue(Instant.now().minus(start.getMillis()).getMillis() > 3000); | |
assertTrue(Instant.now().minus(start.getMillis()).getMillis() < 5000); | |
} | |
static MapElements<Instant, String> foo() { | |
return MapElements.into(TypeDescriptors.strings()) | |
.via((SerializableFunction<Instant, String>) input -> "foo"); | |
} | |
@Test | |
@Category({NeedsRunner.class}) | |
public void testRateLimitIntervalExpiration() throws Exception { | |
Duration rateLimitInterval = Duration.millis(100); | |
Instant watermark = Instant.now(); | |
PCollection<String> rawOutputs = | |
pipeline | |
.apply( | |
PeriodicImpulse.create() | |
.withInterval(rateLimitInterval.dividedBy(2)) | |
.startAt(watermark) | |
.stopAt(watermark.plus(rateLimitInterval.multipliedBy(5)))) | |
.apply(foo()) | |
.apply(WithKeys.of("foo")) | |
.apply(RateLimit.<String, String>ofSize(1).per(rateLimitInterval)) | |
.apply(Values.create()); | |
PCollection<Long> numElems = | |
rawOutputs | |
.apply( | |
Window.<String>into(new GlobalWindows()) | |
.triggering(AfterWatermark.pastEndOfWindow()) | |
.withAllowedLateness(Duration.ZERO) | |
.discardingFiredPanes()) | |
.apply(Count.globally()); | |
PAssert.that(rawOutputs) | |
.containsInAnyOrder( | |
"foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo"); | |
PAssert.that(numElems).containsInAnyOrder(11L); | |
pipeline.run(); | |
} | |
@Test | |
@Category({NeedsRunner.class}) | |
public void testRateLimitBytesIntervalExpiration() throws Exception { | |
Duration rateLimitInterval = Duration.millis(100); | |
Instant watermark = Instant.now(); | |
PCollection<String> rawOutputs = | |
pipeline | |
.apply( | |
PeriodicImpulse.create() | |
.withInterval(rateLimitInterval.dividedBy(2)) | |
.startAt(watermark) | |
.stopAt(watermark.plus(rateLimitInterval.multipliedBy(5)))) | |
.apply(foo()) | |
.apply(WithKeys.of("foo")) | |
.apply(RateLimit.<String, String>ofByteSize(1).per(rateLimitInterval)) | |
.apply(Values.create()); | |
PCollection<Long> numElems = | |
rawOutputs | |
.apply( | |
Window.<String>into(new GlobalWindows()) | |
.triggering(AfterWatermark.pastEndOfWindow()) | |
.withAllowedLateness(Duration.ZERO) | |
.discardingFiredPanes()) | |
.apply(Count.globally()); | |
PAssert.that(rawOutputs) | |
.containsInAnyOrder( | |
"foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo", "foo"); | |
PAssert.that(numElems).containsInAnyOrder(11L); | |
pipeline.run(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment