Last active
November 19, 2019 02:27
-
-
Save vitojeng/6c75aa1c89f1532de51d749f5c829c23 to your computer and use it in GitHub Desktop.
[WIP] Learning the Akka Streams Source API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.stream.Attributes.name | |
import akka.stream.scaladsl._ | |
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler} | |
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape} | |
/**
 * Common base for the runnable examples in this file.
 *
 * Supplies the implicit [[ActorSystem]] and [[ActorMaterializer]] that the
 * examples rely on when they call `runForeach` on a source.
 *
 * The implicit members carry explicit type annotations: un-annotated implicits
 * are rejected by Scala 3 and can be silently skipped by implicit search in
 * Scala 2 when they are referenced before their definition.
 *
 * Nothing in this class shuts the actor system down; an example that extends
 * it is responsible for terminating `system` once its stream has finished.
 */
class BaseStreamApp extends App {
  /** Actor system, named "stream-test", that hosts the running streams. */
  implicit val system: ActorSystem = ActorSystem("stream-test")

  /** Materializer created from the implicit `system` above. */
  implicit val materializer: ActorMaterializer = ActorMaterializer()
}
/**
 * Example: a single-element source built from the hand-written
 * [[_SingleSource]] stage instead of the built-in `Source.single`.
 *
 * Prints the element and then terminates the actor system so the JVM can exit.
 */
object SourceSingle extends BaseStreamApp {
  val data = 100
  // Built-in equivalent: val source = Source.single(data)
  val source = Source.fromGraph(new _SingleSource(data))

  // runForeach returns a Future that completes when the stream finishes.
  // Without terminating the actor system at that point its threads keep the
  // process alive forever, so shut it down on completion (success or failure).
  source
    .runForeach(println)
    .onComplete(_ => system.terminate())(system.dispatcher)
}
/**
 * Example: an iterable-backed source spelled out by hand instead of using the
 * built-in `Source(data)`.
 *
 * The whole range is emitted as one element by `Source.single`, then
 * `mapConcat` with the identity function flattens it into its individual
 * elements. Each element is printed, after which the actor system is
 * terminated so the JVM can exit.
 */
object SourceApply extends BaseStreamApp {
  val data = 1 to 10
  // Built-in equivalent: val source = Source(data)
  val source = Source
    .single(data)
    .mapConcat(_ConstantFun.scalaIdentityFunction)
    .withAttributes(_DefaultAttributes.iterableSource)

  // runForeach returns a Future that completes when the stream finishes.
  // Without terminating the actor system at that point its threads keep the
  // process alive forever, so shut it down on completion (success or failure).
  source
    .runForeach(println)
    .onComplete(_ => system.terminate())(system.dispatcher)
}
/**
 * A source stage that emits exactly one element and then completes.
 *
 * @param elem the element to emit; must not be `null`
 * @tparam T   type of the emitted element
 * @throws NullPointerException if `elem` is `null`
 */
final class _SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] {
  // Stand-in for ReactiveStreamsCompliance.requireNonNullElement(elem), which
  // is internal to Akka and cannot be called from here. Reactive Streams
  // rule 2.13 forbids null elements, so fail fast at construction time rather
  // than somewhere downstream once the stream is running.
  if (elem == null) throw new NullPointerException("Element must not be null, rule 2.13")

  override def initialAttributes: Attributes = _DefaultAttributes.singleSource

  /** The only port of this stage: the outlet the element is pushed to. */
  val out: Outlet[T] = Outlet[T]("single.out")

  override val shape: SourceShape[T] = SourceShape(out)

  /**
   * Builds the per-materialization logic: on the first pull from downstream
   * the element is pushed and the stage completes immediately afterwards.
   */
  override def createLogic(attr: Attributes): GraphStageLogic with OutHandler =
    new GraphStageLogic(shape) with OutHandler {
      def onPull(): Unit = {
        push(out, elem)
        completeStage()
      }

      // The logic object doubles as the handler for `out`.
      setHandler(out, this)
    }

  override def toString: String = "SingleSource"
}
/** Named attribute sets attached to the sources defined in this file. */
object _DefaultAttributes {
  /** Attributes carrying the name "singleSource". */
  val singleSource: Attributes = Attributes.name("singleSource")

  /** Attributes carrying the name "iterableSource". */
  val iterableSource: Attributes = Attributes.name("iterableSource")
}
/** Reusable constant functions. */
object _ConstantFun {
  // One shared identity instance; handed out under any requested type below.
  private val identityAny: Any => Any = value => value

  /**
   * Returns the identity function typed as `T => T`.
   *
   * Every call returns the same underlying function object, merely cast to
   * the requested type, so no new function is allocated per call.
   */
  def scalaIdentityFunction[T]: T => T = identityAny.asInstanceOf[T => T]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment