Created
July 22, 2015 09:01
-
-
Save mosser/a7c40bec78a7c5b4fed0 to your computer and use it in GitHub Desktop.
Statically Typed Pipeline Model
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Structural type model for sources, sinks and processors
// Pipeline := Source ->- Processors* ->- Sink
/**
 * Behavior shared by every pipeline element: it can be flushed to signal the end of the
 * data handling in the pipeline.
 */
trait Flushable {

  /**
   * Flushes the pipeline and ends the data processing.
   *
   * Declared with an explicit `Unit` result type: the bare `def flush()` procedure syntax
   * is deprecated in recent Scala 2 releases and no longer accepted by Scala 3.
   */
  def flush(): Unit
}
/**
 * A flushable pipeline element that produces data and is connected to another element
 * consuming what it produces.
 *
 * @tparam T the type of data this provider feeds into the pipeline
 */
trait Provider[T] extends Flushable {

  /** The downstream element of the pipeline, i.e. the consumer of the provided data. */
  val next: Consumer[T]
}
/**
 * A flushable pipeline element that needs data to be pushed to it in order to proceed.
 *
 * @tparam T the type of accepted data. Contravariant: when T' is a subtype of T,
 *           Consumer[T] is a subtype of Consumer[T'].
 */
trait Consumer[-T] extends Flushable {

  /**
   * Pushes one piece of data to this consumer.
   *
   * @param t the data to consume
   */
  def push(t: T): Unit
}
/**
 * In the "pipe'n filter" pattern, the initial provider is named a "Source". It acts as an
 * iterator over the data: `canRead` tells whether an element is available, `read()` returns it.
 *
 * @tparam T the type of data read from the source and pushed to the next element
 */
trait Source[T] extends Provider[T] {

  /**
   * Returns the next element available in the source and moves forward in the data set.
   *
   * @return a data
   */
  def read(): T

  /**
   * Checks whether one can still read from the data set.
   *
   * @return true if data is still available, false otherwise
   */
  def canRead: Boolean

  /**
   * Fires the pipeline: pushes everything that can be read from the source to the next
   * element, then flushes once nothing is left to read.
   */
  final def fire(): Unit = {
    while (canRead) {
      next.push(read())
    }
    flush()
  }

  /**
   * Propagates the flush to the next element (it will eventually reach a Sink).
   *
   * The result type and `override` modifier are spelled out, like on `Processor.flush`,
   * instead of relying on inference for a public member.
   */
  override final def flush(): Unit = next.flush()
}
/**
 * In the "pipe'n filter" pattern, the final consumer is named a "Sink".
 * It adds no member of its own on top of Consumer.
 *
 * @tparam T the type of data the sink accepts
 */
trait Sink[T] extends Consumer[T]
/**
 * In the "pipe'n filter" pattern, intermediary elements are processors: they consume TIn,
 * transform it, and provide TOut to the next element.
 *
 * @tparam TIn  the required type to work with
 * @tparam TOut the produced type given to the next consumer
 */
trait Processor[TIn, TOut] extends Consumer[TIn] with Provider[TOut] {

  /**
   * The transformation this processor applies to the data feed.
   *
   * @param data the data to process, typed as TIn
   * @return the processed data wrapped in Some, or None when nothing must be forwarded
   */
  def transform(data: TIn): Option[TOut]

  /**
   * Pushing a data to a processor pushes its transformed form to the next element in the
   * pipeline. When the transformation yields None, nothing is forwarded.
   *
   * @param t the data to be pushed
   */
  override final def push(t: TIn): Unit = {
    val produced = transform(t)
    produced.foreach(out => next.push(out))
  }

  /**
   * Flushing a processor propagates the flush to the next element (and eventually reaches a sink).
   */
  override final def flush(): Unit = {
    next.flush()
  }
}
/**********************************
 ** Example of pipeline elements **
 **********************************/
/**
 * Source providing the integers in [start, ..., end], both bounds included.
 *
 * `start` doubles as the read cursor: it is advanced by one on every `read()`.
 *
 * NOTE(review): the cursor has to move one past `end` to stop, so `end` is expected to be
 * strictly below Int.MaxValue (otherwise the cursor wraps around and `canRead` stays true).
 *
 * @param start the first value to emit, then the next value to be emitted
 * @param end   the last value to emit
 * @param next  the consumer receiving the integers
 */
case class IntSource(var start: Int = 0, var end: Int, override val next: Consumer[Int]) extends Source[Int] {

  /**
   * Returns the current value and then moves the cursor forward. Returning before
   * advancing is what makes `start` itself part of the emitted range.
   */
  override def read(): Int = {
    val current = start
    start += 1
    current
  }

  /** Data is available as long as the cursor has not moved past `end` (inclusive bound). */
  override def canRead: Boolean = start <= end
}
/**
 * Sink accepting any value and printing it with `println`.
 * Flushing the printer writes "done".
 */
case class Printer() extends Sink[Any] {

  /** Prints the received value on its own line. */
  override def push(t: Any): Unit = {
    println(t)
  }

  /** Prints the end-of-pipeline marker. */
  override def flush(): Unit = {
    println("done")
  }
}
/**
 * Processor transforming an int to a float, with a given offset added to it.
 *
 * @param offset the value added to each converted integer
 * @param next   the consumer receiving the resulting floats
 */
case class ToFloat(offset: Float, override val next: Consumer[Float]) extends Processor[Int,Float] {

  /** Converts `data` to a Float and adds `offset`; a value is always produced. */
  override def transform(data: Int): Option[Float] = {
    val shifted: Float = data.toFloat + offset
    Some(shifted)
  }
}
/**
 * Processor transforming any value to a string, surrounded by a given prefix and suffix.
 *
 * @param prefix the text put before the value
 * @param suffix the text put after the value
 * @param next   the consumer receiving the resulting strings
 */
case class ToString(prefix: String, suffix: String, override val next: Consumer[String]) extends Processor[Any,String] {

  /** Renders `data` between `prefix` and `suffix`; a value is always produced. */
  override def transform(data: Any): Option[String] = {
    val rendered = s"${prefix}${data}${suffix}"
    Some(rendered)
  }
}
/**
 * Processor acting as a filter: even integers go through unchanged, odd ones are dropped.
 *
 * @param next the consumer receiving the even integers
 */
case class EvenOnly(override val next: Consumer[Int]) extends Processor[Int,Int] {

  /**
   * Keeps the data only when it is even.
   *
   * @param data the integer to check
   * @return Some(data) when data is divisible by 2, None otherwise
   */
  override def transform(data: Int): Option[Int] =
    Some(data).filter(candidate => candidate % 2 == 0)
}
/**
 * Demo entry point: builds the pipeline
 * IntSource ->- EvenOnly ->- ToFloat ->- ToString ->- Printer and fires it.
 */
object Foo extends App {
  // The pipeline is assembled from the sink backwards: each element receives its successor.
  val pipeline =
    IntSource(
      start = 0,
      end = 5,
      next = EvenOnly(ToFloat(2, ToString("**", "**", Printer())))
    )

  pipeline.fire()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
From what I understand, this solution would not fit because the state that "moves" through the pipeline has to be of the same pre-defined type during the whole process (?)
(This looks similar to data streams structures, like observables or iteratees, doesn't it?)
I tried again to write down what I have in mind.
For once I believe this can be understood by someone else: https://gist.github.com/dsferruzza/f53134b075d374a504f9
What do you think?