Skip to content

Instantly share code, notes, and snippets.

@davideanastasia
Last active June 12, 2018 21:33
Show Gist options
  • Save davideanastasia/23290c918987457871380a7d6c12764b to your computer and use it in GitHub Desktop.
Save davideanastasia/23290c918987457871380a7d6c12764b to your computer and use it in GitHub Desktop.
/**
 * Pipeline-level test for {@code StopWordRemoveFn}.
 *
 * <p>Runs the DoFn inside a {@link TestPipeline} over four keyed words and checks that the
 * output holds only the entries this test expects to survive.
 */
public class StopWordRemoveFnTest {

  /** Test pipeline managed by JUnit as a rule; each test applies transforms to it and runs it. */
  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  /**
   * Feeds "be", "is", "night" and "dream" (all keyed by {@code Empty.EMPTY}) through
   * {@code StopWordRemoveFn} and asserts that exactly "night" and "dream" come out,
   * in any order.
   */
  @Test
  public void testDoFn_TestPipeline() throws Exception {
    // Coder for the KV elements, set explicitly on the Create transform:
    // Avro for the key, UTF-8 for the word.
    KvCoder<Empty, String> keyedWordCoder =
        KvCoder.of(AvroCoder.of(Empty.class), StringUtf8Coder.of());

    PCollection<KV<Empty, String>> keyedWords =
        pipeline.apply(
            Create.of(
                    KV.of(Empty.EMPTY, "be"),
                    KV.of(Empty.EMPTY, "is"),
                    KV.of(Empty.EMPTY, "night"),
                    KV.of(Empty.EMPTY, "dream"))
                .withCoder(keyedWordCoder));

    PCollection<KV<Empty, String>> filtered =
        keyedWords.apply(ParDo.of(new StopWordRemoveFn<>()));

    // Expected result: "be" and "is" are absent, the other two entries pass through unchanged.
    PAssert.that(filtered)
        .containsInAnyOrder(KV.of(Empty.EMPTY, "night"), KV.of(Empty.EMPTY, "dream"));

    // Execute the pipeline so the assertion registered above is evaluated.
    pipeline.run();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment