Last active
July 13, 2021 21:00
-
-
Save marcoslin/e1e19afdbacac9757f6974592cfd8d7f to your computer and use it in GitHub Desktop.
Kotlin Apache Beam and Iterable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.fp8.beam | |
import org.apache.beam.sdk.Pipeline | |
import org.apache.beam.sdk.transforms.Create | |
import org.apache.beam.sdk.transforms.DoFn | |
import org.apache.beam.sdk.transforms.ParDo | |
import org.apache.beam.sdk.values.KV | |
import org.junit.jupiter.api.Test | |
import org.junit.jupiter.api.Assertions as assert | |
class PrintString: DoFn<KV<String, Iterable<String>>, String>() { | |
// @Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String> | |
@ProcessElement | |
fun processElement( | |
c: ProcessContext | |
) { | |
val input = c.element() | |
val output = input.key + "|" + input.value.toString() | |
println("output: $output") | |
c.output(output) | |
} | |
} | |
class TestIterableProcessor { | |
@Test | |
fun `Test Iterable`() { | |
val pipe = Pipeline.create() | |
pipe | |
.apply( | |
Create.of<KV<String, Iterable<String>>>( | |
KV.of("A", listOf("one", "um").asIterable()), | |
KV.of("A", listOf("two", "dois").asIterable()) | |
) | |
) | |
.apply( | |
ParDo.of(PrintString()) | |
) | |
pipe.run().waitUntilFinish() | |
} | |
} | |
// USING: | |
/* | |
java.lang.IllegalArgumentException: app.fp8.beam.PrintString, @ProcessElement processElement(ProcessContext), @ProcessElement processElement(ProcessContext), parameter of type DoFn<KV<String, Iterable<String>>, String>.ProcessContext at index 0: ProcessContext argument must have type DoFn<KV<String, Iterable<? extends String>>, String>.ProcessContext | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.throwIllegalArgument(DoFnSignatures.java:1501) | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.checkArgument(DoFnSignatures.java:1506) | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.analyzeExtraParameter(DoFnSignatures.java:908) | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.analyzeProcessElementMethod(DoFnSignatures.java:823) | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.parseSignature(DoFnSignatures.java:394) | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.lambda$getSignature$0(DoFnSignatures.java:139) | |
at java.util.HashMap.computeIfAbsent(HashMap.java:1127) | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getSignature(DoFnSignatures.java:139) | |
at org.apache.beam.sdk.transforms.ParDo.validate(ParDo.java:546) | |
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:396) | |
at app.fp8.beam.TestIterableProcessor.Test Iterable(TestIterableProcessor.kt:40) | |
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) | |
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) | |
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) | |
at java.lang.reflect.Method.invoke(Method.java:498) | |
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:628) | |
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:117) | |
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:184) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:180) | |
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:127) | |
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125) | |
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80) | |
at java.util.ArrayList.forEach(ArrayList.java:1257) | |
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125) | |
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80) | |
at java.util.ArrayList.forEach(ArrayList.java:1257) | |
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125) | |
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80) | |
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) | |
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) | |
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51) | |
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:229) | |
at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:197) | |
at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:211) | |
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:191) | |
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128) | |
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:74) | |
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) | |
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) | |
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) | |
*/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.beam.sdk.Pipeline | |
import org.apache.beam.sdk.transforms.Create | |
import org.apache.beam.sdk.transforms.DoFn | |
import org.apache.beam.sdk.transforms.ParDo | |
import org.apache.beam.sdk.values.KV | |
import org.junit.jupiter.api.Test | |
class PrintString: DoFn<KV<String, Iterable<String>>, String>() { | |
@ProcessElement | |
fun processElement(@Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>) { | |
val output = input.key + "|" + input.value.toString() | |
println("output: $output") | |
receiver.output(output) | |
} | |
} | |
class TestIterableProcessor { | |
@Test | |
fun `Test Iterable`() { | |
val pipe = Pipeline.create() | |
pipe | |
.apply( | |
Create.of( | |
KV.of("A", listOf("one", "um") as Iterable<String>), | |
KV.of("A", listOf("two", "dois") as Iterable<String>) | |
) | |
) | |
.apply( | |
ParDo.of(PrintString()) | |
) | |
pipe.run().waitUntilFinish() | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
kotlin("jvm") version "1.3.21" | |
implementation("org.apache.beam:beam-sdks-java-core:2.12.0") | |
implementation("org.apache.beam:beam-runners-direct-java:2.12.0") | |
implementation("org.apache.beam:beam-runners-google-cloud-dataflow-java:2.12.0") | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
java.lang.IllegalArgumentException: app.fp8.phi.managed.datafeed.procs.PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver): @Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>> | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.throwIllegalArgument(DoFnSignatures.java:1526) | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.checkArgument(DoFnSignatures.java:1531) | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.analyzeExtraParameter(DoFnSignatures.java:900) | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.analyzeProcessElementMethod(DoFnSignatures.java:825) | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.parseSignature(DoFnSignatures.java:395) | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.lambda$getSignature$0(DoFnSignatures.java:140) | |
at java.util.HashMap.computeIfAbsent(HashMap.java:1127) | |
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getSignature(DoFnSignatures.java:140) | |
at org.apache.beam.sdk.transforms.ParDo.validate(ParDo.java:549) | |
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:396) | |
at app.fp8.phi.managed.datafeed.procs.TestIterableProcessor.Test Iterable(TestIterableProcessor.kt:39) | |
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) | |
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) | |
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) | |
at java.lang.reflect.Method.invoke(Method.java:498) | |
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:628) | |
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:117) | |
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:184) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:180) | |
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:127) | |
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125) | |
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80) | |
at java.util.ArrayList.forEach(ArrayList.java:1257) | |
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125) | |
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80) | |
at java.util.ArrayList.forEach(ArrayList.java:1257) | |
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125) | |
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123) | |
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122) | |
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80) | |
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) | |
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) | |
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51) | |
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:229) | |
at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:197) | |
at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:211) | |
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:191) | |
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128) | |
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:74) | |
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) | |
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) | |
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.beam.sdk.Pipeline | |
import org.apache.beam.sdk.transforms.Create | |
import org.apache.beam.sdk.transforms.DoFn | |
import org.apache.beam.sdk.transforms.ParDo | |
import org.apache.beam.sdk.values.KV | |
import org.junit.jupiter.api.Test | |
import org.junit.jupiter.api.Assertions as assert | |
import java.lang.Iterable as JavaIterable | |
class PrintString: DoFn<KV<String, JavaIterable<String>>, String>() { | |
@ProcessElement | |
fun processElement(@Element input: KV<String, JavaIterable<String>>, receiver: OutputReceiver<String>) { | |
val output = input.key + "|" + input.value.toString() | |
println("output: $output") | |
receiver.output(output) | |
} | |
} | |
class TestIterableProcessor { | |
@Test | |
fun `Test Iterable`() { | |
val pipe = Pipeline.create() | |
pipe | |
.apply( | |
Create.of( | |
KV.of("A", listOf("one", "um") as JavaIterable<String>), | |
KV.of("A", listOf("two", "dois") as JavaIterable<String>) | |
) | |
) | |
.apply( | |
ParDo.of(PrintString()) | |
) | |
pipe.run().waitUntilFinish() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I just ran into the same problem.