Skip to content

Instantly share code, notes, and snippets.

@vitojeng
Last active November 19, 2019 02:27
Show Gist options
  • Save vitojeng/6c75aa1c89f1532de51d749f5c829c23 to your computer and use it in GitHub Desktop.
Save vitojeng/6c75aa1c89f1532de51d749f5c829c23 to your computer and use it in GitHub Desktop.
[WIP]Learning Akka Stream Source API
import akka.actor.ActorSystem
import akka.stream.Attributes.name
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
class BaseStreamApp extends App {
implicit val system = ActorSystem("stream-test")
implicit val materializer = ActorMaterializer()
}
object SourceSingle extends BaseStreamApp {
val data = 100
// val source = Source.single(data)
val source = Source.fromGraph(new _SingleSource(data))
source.runForeach(println)
}
object SourceApply extends BaseStreamApp {
val data = 1 to 10
// val source = Source(data)
val source = Source.single(data)
.mapConcat(_ConstantFun.scalaIdentityFunction)
.withAttributes(_DefaultAttributes.iterableSource)
source.runForeach(println)
}
final class _SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] {
override def initialAttributes: Attributes = _DefaultAttributes.singleSource
//ReactiveStreamsCompliance.requireNonNullElement(elem)
val out = Outlet[T]("single.out")
val shape = SourceShape(out)
def createLogic(attr: Attributes) =
new GraphStageLogic(shape) with OutHandler {
def onPull(): Unit = {
push(out, elem)
completeStage()
}
setHandler(out, this)
}
override def toString: String = "SingleSource"
}
object _DefaultAttributes {
val iterableSource = name("iterableSource")
val singleSource = name("singleSource")
}
object _ConstantFun {
private val conforms = (a: Any) => a
def scalaIdentityFunction[T]: T => T = conforms.asInstanceOf[Function[T, T]]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment