Skip to content

Instantly share code, notes, and snippets.

@kell18
Last active February 12, 2018 14:06
Show Gist options
  • Save kell18/2cc8f8ae7b530299c172a4011e2050e6 to your computer and use it in GitHub Desktop.
Save kell18/2cc8f8ae7b530299c172a4011e2050e6 to your computer and use it in GitHub Desktop.
package ru.itclover.streammachine.http
import akka.actor.{Actor, ActorSystem, PoisonPill, Props, Terminated}
import akka.stream.ActorMaterializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor}
import scala.io.StdIn
import akka.stream.ActorMaterializer
import org.apache.flink.api.scala._
import scala.concurrent.{Await, ExecutionContextExecutor, Future}
import org.apache.flink.api.common.functions._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.java.io.CollectionInputFormat
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, InputFormatSourceFunction, RichSourceFunction, SourceFunction}
// format
/**
  * Source function that runs several [[InputFormatSourceFunction]]s one after another
  * against the same source context, in the order given by `sources`.
  *
  * All lifecycle calls (`setRuntimeContext`, `open`, `cancel`, `close`) are forwarded to
  * every wrapped source.
  *
  * NOTE(review): when wrapped like this the job has been seen to log
  * "No InputSplitAssigner for vertex ID ..." - presumably the wrapped
  * InputFormatSourceFunctions are not handed any input splits by the runtime, in which
  * case they would emit nothing. Confirm, and if so read the formats directly instead.
  *
  * @param sources the sources to drain, in order
  * @tparam T type of the emitted elements
  */
case class SequentialSources[T](sources: Seq[InputFormatSourceFunction[T]]) extends RichSourceFunction[T] {

  /**
    * Runs each wrapped source in turn.
    *
    * `run` of a source is a blocking call, so no extra waiting is needed between two
    * sources. The previous busy-wait on `src.getFormat.reachedEnd()` was removed: it
    * polled the format after its source had already finished and threw a
    * NullPointerException (see the original inline note), besides spinning a CPU core.
    *
    * @param ctx context the wrapped sources emit their elements to
    */
  override def run(ctx: SourceFunction.SourceContext[T]): Unit =
    sources.foreach(_.run(ctx))

  /** Cancels every wrapped source. */
  override def cancel(): Unit =
    sources.foreach(_.cancel())

  /** Sets the runtime context on this function and on every wrapped source. */
  override def setRuntimeContext(t: RuntimeContext): Unit = {
    super.setRuntimeContext(t)
    sources.foreach(_.setRuntimeContext(t))
  }

  /** Closes this function first, then every wrapped source. */
  override def close(): Unit = {
    super.close()
    sources.foreach(_.close())
  }

  /** Opens this function first, then every wrapped source with the same configuration. */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    sources.foreach(_.open(parameters))
  }
}
/**
  * Demo entry point: builds two collection-backed sources, chains them with
  * [[SequentialSources]] and prints every element produced by a local Flink job.
  */
object Launcher extends App {
  implicit val system: ActorSystem = ActorSystem("my-system")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val streamEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
  // Keep the whole pipeline single-threaded so elements are printed in the order the
  // source emits them. NOTE(review): the original note on the print step reported
  // shuffled output ("X=4, 5, 1, 2, 3"), presumably caused by the default local
  // parallelism - confirm.
  streamEnvironment.setParallelism(1)

  // Explicit converters instead of the deprecated implicit JavaConversions.
  import scala.collection.JavaConverters._

  val coll1: java.util.Collection[Int] = Seq(1, 2, 3, 4, 5).asJava
  // Element type information is derived from the first element of the first collection
  // and reused for both sources.
  val intCollType = TypeExtractor.getForObject(coll1.iterator().next())
  val srcFmt1 = new CollectionInputFormat(coll1, intCollType.createSerializer(streamEnvironment.getConfig))

  val coll2: java.util.Collection[Int] = Seq(20, 21).asJava
  val srcFmt2 = new CollectionInputFormat(coll2, intCollType.createSerializer(streamEnvironment.getConfig))

  val src1 = new InputFormatSourceFunction[Int](srcFmt1, intCollType)
  val src2 = new InputFormatSourceFunction[Int](srcFmt2, intCollType)

  private val sequentialSource = SequentialSources(Seq(src1, src2))
  val a = streamEnvironment.addSource(sequentialSource)

  // Printing is a pure side effect, so attach it as a sink rather than a map whose
  // Unit results would be discarded.
  a.addSink((x: Int) => println(s"X=$x"))

  try {
    streamEnvironment.execute()
    StdIn.readLine()
  } finally {
    // Shut the actor system down so that it does not outlive the demo.
    system.terminate()
  }
}
@kell18
Copy link
Author

kell18 commented Feb 12, 2018

Error:
16:53:22 [flink-akka.actor.default-dispatcher-2] ERROR o.a.f.runtime.jobmanager.JobManager - No InputSplitAssigner for vertex ID bc764cd8ddf7a0cff126f51c16239658.
16:53:22 [flink-akka.actor.default-dispatcher-2] ERROR o.a.f.runtime.jobmanager.JobManager - No InputSplitAssigner for vertex ID bc764cd8ddf7a0cff126f51c16239658.
16:53:22 [Source: Custom Source (1/1)] INFO o.a.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (50b5c08b97e186346c1d629d00a13a5a) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at org.apache.flink.api.java.io.CollectionInputFormat.reachedEnd(CollectionInputFormat.java:64)
at ru.itclover.streammachine.http.SequentialSources$$anonfun$run$1.apply(Launcher.scala:26)
at ru.itclover.streammachine.http.SequentialSources$$anonfun$run$1.apply(Launcher.scala:23)
at scala.collection.immutable.List.foreach(List.scala:381)
at ru.itclover.streammachine.http.SequentialSources.run(Launcher.scala:23)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment