Skip to content

Instantly share code, notes, and snippets.

@searler
searler / PeekCodec.markdown
Created March 21, 2015 19:30
Codec to extract value w/o consuming source BitVector

The scodec discriminated.by(tag).typecase(n,message) does not suffice to match all message structures:

  1. tag is at start of structure
  2. value of tag is not required by the message codec.

The PeekCodec uses the target codec to extract a value, but returns the original BitVector so the tag value can be re-decoded by a subsequent codec.

@searler
searler / RangeDiscriminatorCodec.scala
Created March 21, 2015 19:39
Scodec Discrimination by tag value range
class RangeDiscriminatorCodec[T](start: Int, end: Int, tag: Codec[Int], child: Codec[Int ~ T]) extends Codec[Int ~ T] {
override def sizeBound = tag.sizeBound | child.sizeBound
override def encode(value: Int ~ T) = value match {
case t ~ _ if t >= start && t <= end => child.encode(value)
case _ => Attempt.failure(Err("out of range"))
}
override def decode(bits: BitVector) = tag.decode(bits) match {
case Attempt.Successful(DecodeResult(value, remainder)) =>
if (value >= start && value <= end)
child.decode(remainder)
@searler
searler / Iterating.markdown
Created March 24, 2015 03:05
Wrap a Codec in an Iterator that decodes InputStream

This code allows the usage of an scodec Codec with the Apache Spark StreamingContext.socketStream.

Note how the head of bitvector is not retained, allowing GC of the consumed BitVector pieces.

@searler
searler / SocketReactive.scala
Last active August 29, 2015 14:19
Create RxJava Observable from Socket
import rx.lang.scala._
import java.net.Socket
import scala.util.Try
import scala.io.Source
object SocketReactive extends App {
val s= Observable.using[Char,Socket](new Socket("localhost",1234))(
socket => Observable.from[Char](Source.fromInputStream(socket.getInputStream).toIterable),
socket=>Try{socket.close})
@searler
searler / GroupScan.md
Created April 25, 2015 14:48
RxJava groupBy and scan

This code is the RxJava equivalent of the Spark updateStateByKey operation

@searler
searler / Unfolder.scala
Created June 3, 2015 22:26
Simple example for scodec BitVector unfold
import scodec.bits._
object Unfolder extends App {
case class S(size:Int) {
def next:Option[(BitVector,S)] =
if (size > 0)
Some((BitVector(size), S(size-1)))
else
None
@searler
searler / Framed.scala
Created June 3, 2015 22:28
Extracting length delimited ByteStrings from Akka reactive stream
object Framed extends App {
def parseFrames =
() => new StatefulStage[ByteString, ByteString] {
private var buffer = ByteString.empty
def initial = new State {
override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
buffer ++= chunk
emit(doFraming(Vector.empty).iterator, ctx)
@searler
searler / Decoded.scala
Created June 3, 2015 22:29
Apply scodec Decoder to an Akka reactive stream
import akka.stream.stage.StatefulStage
import akka.util.ByteString
import akka.stream.stage.Context
import akka.stream.stage.SyncDirective
import scala.annotation.tailrec
import akka.stream.ActorFlowMaterializer
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Sink
import scala.concurrent.Await
@searler
searler / Encoded.scala
Created June 4, 2015 21:44
Apply scodec encoder to Akka reactive stream
package scodec
import scala.Vector
import scala.collection.immutable.Seq
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Source
import akka.stream.stage.Context
import akka.stream.stage.PushStage
@searler
searler / CentralFlowed.scala
Created June 4, 2015 21:45
Attempt to redirect Akka reactive stream Flow via actors
package actors
import scala.collection.immutable.Seq
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.stream.ActorFlowMaterializer