Skip to content

Instantly share code, notes, and snippets.

@SF-300
Last active August 29, 2015 14:19
Show Gist options
  • Select an option

  • Save SF-300/2aa30abb934b05729bda to your computer and use it in GitHub Desktop.

Select an option

Save SF-300/2aa30abb934b05729bda to your computer and use it in GitHub Desktop.
Akka reactive-streams ByteString comparator junction for an arbitrary number of inputs. Emits a stream of boolean true values until the first mismatch occurs or the main input terminates, then emits a single false value and terminates.
import akka.util.ByteString
import akka.stream._
import FanInShape.Name
import FanInShape.Init
/**
 * Port layout used by the binary comparator junction: a uniform fan-in shape over
 * `ByteString` inputs with a `Boolean` output, extended by one dedicated `src` inlet.
 *
 * @param n     forwarded to `UniformFanInShape` as its port count
 * @param _init shape initializer; defaults to a shape named "BinaryComparator"
 */
class BinaryComparatorPorts(n: Int, _init: Init[Boolean] = Name("BinaryComparator"))
  extends UniformFanInShape[ByteString, Boolean](n, _init) {

  /** The comparison target; the remaining ports carry the possible duplicates. */
  val src: Inlet[ByteString] = newInlet[ByteString]("src")

  /** Creates another shape of the same kind and port count from the given initializer. */
  protected override def construct(init: Init[Boolean]): BinaryComparatorPorts =
    new BinaryComparatorPorts(n, init)
}
/**
 * Fan-in junction that reads one element from every still-active inlet at a time and compares
 * the element that arrived on `src` with the elements that arrived on the other inlets.
 *
 * Per round:
 *  - every inlet whose element differs from the `src` element is cancelled and no longer read;
 *  - if at least one other inlet delivered an equal element, `true` is emitted and the next
 *    round reads the remaining active inlets;
 *  - if no other inlet delivered an equal element, `false` is emitted and the junction finishes.
 *
 * The junction also finishes (without emitting) when `src` completes or when only a single
 * active inlet is left after an upstream completion. An upstream failure fails the junction.
 *
 * @param n forwarded to [[BinaryComparatorPorts]] as the port count of the uniform fan-in shape
 */
class BinaryComparator(n: Int) extends FlexiMerge[Boolean, BinaryComparatorPorts](new BinaryComparatorPorts(n), OperationAttributes.name("BinaryComparator")) {
  import scala.collection.mutable
  import akka.stream.scaladsl.FlexiMerge._

  override def createMergeLogic(p: PortT) = new MergeLogic[Boolean] {
    // Inlets that are still being read; shrinks as inlets mismatch or complete.
    val activePorts = mutable.ArrayBuffer() ++= p.inlets

    /**
     * Handles one complete round of inputs.
     *
     * @param ctx    merge context used to cancel inlets, emit the verdict and finish
     * @param inPort unused; required by the state-handler signature
     * @param inputs the elements read from all currently active inlets
     * @return the next read-all state over the remaining inlets, or `SameState` after finishing
     */
    def handler(ctx: MergeLogicContext, inPort: InPort, inputs: ReadAllInputs): State[ReadAllInputs] = {
      val compareTo: ByteString = inputs(p.src)
      // `activePorts - p.src` is a copy, so removing from `activePorts` below is safe.
      val (matching, mismatching) =
        (activePorts - p.src).partition(port => inputs(port) == compareTo)

      // Inlets that diverged from the comparison target are dropped for good.
      mismatching.foreach { port =>
        ctx.cancel(port)
        activePorts -= port
      }

      if (matching.isEmpty) {
        // Nothing equals the comparison target anymore: report the mismatch and stop.
        ctx.emit(false)
        ctx.finish()
        SameState
      } else {
        ctx.emit(true)
        State(ReadAll(activePorts: _*))(handler)
      }
    }

    override def initialState = State(ReadAll(activePorts: _*))(handler)

    override def initialCompletionHandling = CompletionHandling(
      onUpstreamFinish = (ctx, port) => {
        // NOTE(review): the cast mirrors the original code; it presumably holds because every
        // port of this shape is created as a ByteString inlet - confirm against the shape.
        activePorts -= port.asInstanceOf[Inlet[ByteString]]
        if (port == p.src || activePorts.length == 1) {
          // Nothing to compare anymore. Once finished, keep the current state instead of
          // installing a new read-all condition over a depleted port set.
          ctx.finish()
          SameState
        } else {
          State(ReadAll(activePorts: _*))(handler)
        }
      },
      onUpstreamFailure = (ctx, port, cause) => {
        ctx.fail(cause)
        SameState
      })
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment