This code is the RxJava equivalent of the Spark updateStateByKey
operation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package interop; | |
public interface JavaData { | |
String getValue(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import scala.collection.immutable.Seq | |
import akka.actor.Actor | |
import akka.actor.ActorLogging | |
import akka.actor.ActorRef | |
import akka.actor.ActorSystem | |
import akka.actor.Props | |
import akka.actor.actorRef2Scala | |
import akka.stream.ActorFlowMaterializer |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scodec | |
import scala.Vector | |
import scala.collection.immutable.Seq | |
import akka.actor.ActorSystem | |
import akka.stream.ActorFlowMaterializer | |
import akka.stream.scaladsl.Source | |
import akka.stream.stage.Context | |
import akka.stream.stage.PushStage |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream.stage.StatefulStage | |
import akka.util.ByteString | |
import akka.stream.stage.Context | |
import akka.stream.stage.SyncDirective | |
import scala.annotation.tailrec | |
import akka.stream.ActorFlowMaterializer | |
import akka.actor.ActorSystem | |
import akka.stream.scaladsl.Source | |
import akka.stream.scaladsl.Sink | |
import scala.concurrent.Await |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object Framed extends App { | |
def parseFrames = | |
() => new StatefulStage[ByteString, ByteString] { | |
private var buffer = ByteString.empty | |
def initial = new State { | |
override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = { | |
buffer ++= chunk | |
emit(doFraming(Vector.empty).iterator, ctx) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scodec.bits._ | |
object Unfolder extends App { | |
case class S(size:Int) { | |
def next:Option[(BitVector,S)] = | |
if (size > 0) | |
Some((BitVector(size), S(size-1))) | |
else | |
None |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.lang.scala._ | |
import java.net.Socket | |
import scala.util.Try | |
import scala.io.Source | |
object SocketReactive extends App { | |
val s= Observable.using[Char,Socket](new Socket("localhost",1234))( | |
socket => Observable.from[Char](Source.fromInputStream(socket.getInputStream).toIterable), | |
socket=>Try{socket.close}) |
This code allows the usage of an scodec Codec with the Apache Spark StreamingContext.socketStream.
Note how the head of bitvector is not retained, allowing GC of the consumed BitVector pieces.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class RangeDiscriminatorCodec[T](start: Int, end: Int, tag: Codec[Int], child: Codec[Int ~ T]) extends Codec[Int ~ T] { | |
override def sizeBound = tag.sizeBound | child.sizeBound | |
override def encode(value: Int ~ T) = value match { | |
case t ~ _ if t >= start && t <= end => child.encode(value) | |
case _ => Attempt.failure(Err("out of range")) | |
} | |
override def decode(bits: BitVector) = tag.decode(bits) match { | |
case Attempt.Successful(DecodeResult(value, remainder)) => | |
if (value >= start && value <= end) | |
child.decode(remainder) |