Last active
November 17, 2017 18:49
-
-
Save regis-leray/fa42d4885a21376acc7088f0b26ef875 to your computer and use it in GitHub Desktop.
WriteChannelSink
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Variant 1: as a custom GraphStage
/** Companion factory so callers can write `WriteChannelSource(channel)` without `new`. */
private object WriteChannelSource {

  /**
   * Builds a sink stage bound to the given channel.
   *
   * @param channel the channel the resulting stage writes to
   * @return a fresh `WriteChannelSource` wrapping `channel`
   */
  def apply(channel: WriteChannel): WriteChannelSource = {
    val stage = new WriteChannelSource(channel)
    stage
  }
}
/**
 * Sink stage that writes every incoming `ByteString` to `channel` and closes the
 * channel when the stage stops.
 *
 * The channel is closed in `postStop`, so it is released on every termination path:
 * normal upstream completion, upstream failure, an exception thrown by a write, or
 * an abrupt stop of the stage.
 *
 * @param channel destination channel; owned by this stage once materialized and
 *                closed by it on termination
 */
private class WriteChannelSource(channel: WriteChannel) extends GraphStage[SinkShape[ByteString]] {
  val in: Inlet[ByteString] = Inlet.create[ByteString]("WriteChannelSource.in")
  val shape: SinkShape[ByteString] = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new WriteChannelStageLogic()

  private class WriteChannelStageLogic extends GraphStageLogic(shape) with InHandler {
    setHandler(in, this)

    // Initiate the flow of data by issuing a first pull on materialization.
    override def preStart(): Unit = pull(in)

    override def onPush(): Unit = {
      val buffer = grab(in).toByteBuffer
      // A single write call may consume only part of the buffer; keep writing until
      // it is drained so no bytes are silently dropped.
      // NOTE(review): assumes WriteChannel follows java.nio WritableByteChannel
      // semantics (write advances the buffer position) — confirm.
      while (buffer.hasRemaining) channel.write(buffer)
      pull(in)
    }

    // The default onUpstreamFinish / onUpstreamFailure handlers complete or fail the
    // stage; the channel is released here instead so that a failure raised inside
    // onPush (which bypasses those callbacks) cannot leak it.
    override def postStop(): Unit = channel.close()
  }
}
// Variant 2: the same sink built from the Flow/Sink DSL
// Destination channel; placeholder to be supplied by the caller.
val writeChannel: WritableByteChannel = ???

// Sink that closes the channel when the stream terminates (successfully or not).
// Its materialized Future is completed with the stream's outcome only after the
// channel has been closed, so callers can actually wait for the write to finish
// and observe failures. The backing promise is single-use, which matches the
// channel itself: it is closed after the first run and cannot be reused.
val sink = {
  val finished = scala.concurrent.Promise[Done]()
  Sink
    .onComplete { outcome =>
      try writeChannel.close()
      finally finished.tryComplete(outcome)
    }
    .mapMaterializedValue(_ => finished.future)
}

Flow.fromFunction[ByteString, Unit] { data =>
  val buffer = data.toByteBuffer
  // A single write call may consume only part of the buffer; drain it fully.
  while (buffer.hasRemaining) writeChannel.write(buffer)
}
  .toMat(sink)(Keep.right)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment