@mosser
Created July 22, 2015 09:01
Statically Typed Pipeline Model
// Structural type model for sources, sinks and processors
// Pipeline := Source ->- Processors* ->- Sink
/**
* Flushable behavior is needed to signal the end of data handling in the pipeline
*/
trait Flushable {
/**
* Method to flush the pipeline and end the data processing
*/
def flush(): Unit
}
/**
* A (flushable) data provider, connected to another element in the pipeline that consumes the provided data
* @tparam T the type of data this provider can feed the pipeline with
*/
trait Provider[T] extends Flushable {
/**
* the next element in the pipeline
*/
val next: Consumer[T]
}
/**
* A pipeline element that requires data to proceed.
* @tparam T the type of data (contravariant: T' isSubClassOf T => Consumer[T] isSubClassOf Consumer[T'])
*/
trait Consumer[-T] extends Flushable {
/**
* The method used to push a piece of data to this consumer
*/
def push(t: T): Unit
}
/**
* In the "pipe'n filter" pattern, the initial provider is named a "Source", and acts as an iterator over the data
*/
trait Source[T] extends Provider[T] {
/**
* The read method returns the next element available in the source, and moves forward in the data set
* @return the next data element
*/
def read(): T
/**
* check if one can read from the data set
* @return true if data is still available, false otherwise
*/
def canRead: Boolean
/**
* This method fires the pipeline: it pushes everything that can be read from the source, then flushes the pipeline
*/
final def fire(): Unit = {
while (canRead) { next.push(read()) }
flush()
}
/**
* We propagate the flush to the next element (it will eventually reach a Sink)
*/
final def flush(): Unit = next.flush()
}
/**
* In the "pipe'n filter" pattern, the final consumer is named a "Sink"
*/
trait Sink[T] extends Consumer[T] { }
/**
* In the "pipe'n filter" pattern, intermediary elements are processors, transforming TIn to TOut
* @tparam TIn the required type to work with
* @tparam TOut the produced type given to the next consumer
*/
trait Processor[TIn, TOut] extends Consumer[TIn] with Provider[TOut] {
/**
* A processor applies a transformation of TIn to TOut to the data feed.
* @param data the data to process, typed as TIn
* @return the processed data, as TOut
*/
def transform(data: TIn): Option[TOut]
/**
* Pushing data to a processor pushes the transformed data to the next element in the pipeline; nothing is pushed when transform returns None
* @param t the data to be pushed
*/
override final def push(t: TIn): Unit = transform(t) match {
case Some(out) => next.push(out)
case None => // filtered out: nothing is pushed downstream
}
/**
* Flushing the processor means to propagate the flush to the next element (and eventually reach a sink)
*/
override final def flush(): Unit = next.flush()
}
/**********************************
** Example of pipeline elements **
**********************************/
// Source providing integers in [start + 1, ..., end] (read() increments start before returning it).
case class IntSource(var start: Int = 0, var end: Int, override val next: Consumer[Int]) extends Source[Int] {
override def read(): Int = { start += 1; start }
override def canRead: Boolean = start < end
}
// Prints any value on the screen; flushing the printer writes "done"
case class Printer() extends Sink[Any] {
override def push(t: Any): Unit = println(t)
override def flush(): Unit = println("done")
}
// Transforming an int to a float, with a given offset
case class ToFloat(offset: Float, override val next: Consumer[Float]) extends Processor[Int,Float] {
override def transform(data: Int): Option[Float] = Some(data.toFloat + offset)
}
// Transforming any value to a string, wrapped in a given prefix and suffix
case class ToString(prefix: String, suffix: String, override val next: Consumer[String]) extends Processor[Any,String] {
override def transform(data: Any): Option[String] = Some(s"$prefix$data$suffix")
}
case class EvenOnly(override val next: Consumer[Int]) extends Processor[Int,Int] {
/**
* Keeps even integers and drops odd ones.
* @param data the integer to test
* @return Some(data) if data is even, None otherwise (nothing is pushed downstream)
*/
override def transform(data: Int): Option[Int] = if( data % 2 == 0) Some(data) else None
}
object Foo extends App {
// Reads 1..5, keeps 2 and 4, adds the 2.0 offset, and prints **4.0**, **6.0**, then done
val pipeline = IntSource(0,5,EvenOnly(ToFloat(2,ToString("**","**",Printer()))))
pipeline.fire()
}
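
A minimal, self-contained sketch of how the element type changes along the chain (Int to Float to String) and how contravariance lets a Consumer[Any] close a pipeline whose last processor emits String. The traits are restated compactly from the model above. Collector and PipelineDemo are hypothetical names introduced here so the result can be inspected instead of printed, and the values are pushed by hand rather than through a Source.

```scala
import scala.collection.mutable.ListBuffer

// Compact restatement of the model, so that this sketch compiles on its own.
trait Flushable { def flush(): Unit }
trait Consumer[-T] extends Flushable { def push(t: T): Unit }
trait Provider[T] extends Flushable { val next: Consumer[T] }

trait Processor[TIn, TOut] extends Consumer[TIn] with Provider[TOut] {
  def transform(data: TIn): Option[TOut]
  // None means "filtered out": nothing reaches the next element.
  final def push(t: TIn): Unit = transform(t).foreach(next.push)
  final def flush(): Unit = next.flush()
}

// Hypothetical sink (not part of the gist): records what it receives instead of printing.
final class Collector extends Consumer[Any] {
  private val buffer = ListBuffer.empty[String]
  def push(t: Any): Unit = { buffer += t.toString; () }
  def flush(): Unit = { buffer += "done"; () }
  def result: List[String] = buffer.toList
}

case class EvenOnly(next: Consumer[Int]) extends Processor[Int, Int] {
  def transform(data: Int): Option[Int] = if (data % 2 == 0) Some(data) else None
}
case class ToFloat(offset: Float, next: Consumer[Float]) extends Processor[Int, Float] {
  def transform(data: Int): Option[Float] = Some(data.toFloat + offset)
}
case class ToString(prefix: String, suffix: String, next: Consumer[String]) extends Processor[Any, String] {
  def transform(data: Any): Option[String] = Some(s"$prefix$data$suffix")
}

object PipelineDemo {
  def run(): List[String] = {
    val sink = new Collector
    // Types along the chain: Int -> Int -> Float -> String.
    // sink is a Consumer[Any]; contravariance lets it stand in for the
    // Consumer[String] that ToString expects, and ToString (a Consumer[Any])
    // stands in for the Consumer[Float] that ToFloat expects.
    val head: Consumer[Int] = EvenOnly(ToFloat(2f, ToString("**", "**", sink)))
    (1 to 5).foreach(head.push)
    head.flush()
    sink.result
  }
}
```

On the JVM, PipelineDemo.run() should return List("**4.0**", "**6.0**", "done"), which matches what Foo prints above. Swapping the order of two processors so that their types no longer line up (for example ToString before ToFloat) is rejected at compile time.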
@dsferruzza

From what I understand, this solution would not fit, because the state that "moves" through the pipeline has to keep the same pre-defined type during the whole process (?)

(This looks similar to data stream structures, like observables or iteratees, doesn't it?)

I tried again to write down what I have in mind.
For once I believe this can be understood by someone else: https://gist.github.com/dsferruzza/f53134b075d374a504f9

What do you think?
