Last active
February 12, 2018 14:06
-
-
Save kell18/2cc8f8ae7b530299c172a4011e2050e6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ru.itclover.streammachine.http | |
import akka.actor.{Actor, ActorSystem, PoisonPill, Props, Terminated} | |
import akka.stream.ActorMaterializer | |
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment | |
import scala.concurrent.duration._ | |
import scala.concurrent.{Await, ExecutionContextExecutor} | |
import scala.io.StdIn | |
import akka.stream.ActorMaterializer | |
import org.apache.flink.api.scala._ | |
import scala.concurrent.{Await, ExecutionContextExecutor, Future} | |
import org.apache.flink.api.common.functions._ | |
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment | |
import org.apache.flink.api.java.io.CollectionInputFormat | |
import org.apache.flink.api.java.typeutils.TypeExtractor | |
import org.apache.flink.configuration.Configuration | |
import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, InputFormatSourceFunction, RichSourceFunction, SourceFunction} | |
// format | |
case class SequentialSources[T](sources: Seq[InputFormatSourceFunction[T]]) extends RichSourceFunction[T] { | |
override def run(ctx: SourceFunction.SourceContext[T]): Unit = { | |
sources.foreach { src => | |
ctx.getCheckpointLock | |
src.run(ctx) | |
while (!src.getFormat.reachedEnd()) {} // ... NullPointerException getFormat == null | |
} | |
} | |
override def cancel(): Unit = { | |
sources.foreach(_.cancel()) | |
} | |
override def setRuntimeContext(t: RuntimeContext) = { | |
super.setRuntimeContext(t) | |
sources.foreach(_.setRuntimeContext(t)) | |
} | |
override def close() = { | |
super.close() | |
sources.foreach(_.close()) | |
} | |
override def open(parameters: Configuration) = { | |
super.open(parameters) | |
sources.foreach(_.open(parameters)) | |
} | |
} | |
object Launcher extends App { | |
implicit val system: ActorSystem = ActorSystem("my-system") | |
implicit val materializer: ActorMaterializer = ActorMaterializer() | |
implicit val executionContext: ExecutionContextExecutor = system.dispatcher | |
val streamEnvironment = StreamExecutionEnvironment.createLocalEnvironment() | |
import scala.collection.JavaConversions._ | |
val coll1: java.util.Collection[Int] = Seq(1, 2, 3, 4, 5) | |
val intCollType = TypeExtractor.getForObject(coll1.iterator().next()) | |
val srcFmt1 = new CollectionInputFormat(coll1, intCollType.createSerializer(streamEnvironment.getConfig)) | |
val coll2: java.util.Collection[Int] = Seq(20, 21) | |
val srcFmt2 = new CollectionInputFormat(coll2, intCollType.createSerializer(streamEnvironment.getConfig)) | |
val src1 = new InputFormatSourceFunction[Int](srcFmt1, intCollType) | |
val src2 = new InputFormatSourceFunction[Int](srcFmt2, intCollType) | |
private val value = SequentialSources(Seq(src1, src2)) | |
val a = streamEnvironment.addSource(value) | |
a.map(x => println(s"X=$x")) // X=4, 5, 1, 2, 3 or something like that | |
streamEnvironment.execute() | |
StdIn.readLine() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Error:
16:53:22 [flink-akka.actor.default-dispatcher-2] ERROR o.a.f.runtime.jobmanager.JobManager - No InputSplitAssigner for vertex ID bc764cd8ddf7a0cff126f51c16239658.
16:53:22 [flink-akka.actor.default-dispatcher-2] ERROR o.a.f.runtime.jobmanager.JobManager - No InputSplitAssigner for vertex ID bc764cd8ddf7a0cff126f51c16239658.
16:53:22 [Source: Custom Source (1/1)] INFO o.a.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (50b5c08b97e186346c1d629d00a13a5a) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at org.apache.flink.api.java.io.CollectionInputFormat.reachedEnd(CollectionInputFormat.java:64)
at ru.itclover.streammachine.http.SequentialSources$$anonfun$run$1.apply(Launcher.scala:26)
at ru.itclover.streammachine.http.SequentialSources$$anonfun$run$1.apply(Launcher.scala:23)
at scala.collection.immutable.List.foreach(List.scala:381)
at ru.itclover.streammachine.http.SequentialSources.run(Launcher.scala:23)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)