Skip to content

Instantly share code, notes, and snippets.

@ochafik
Last active August 29, 2015 14:20
Show Gist options
  • Select an option

  • Save ochafik/841b746615c0aede2145 to your computer and use it in GitHub Desktop.

Select an option

Save ochafik/841b746615c0aede2145 to your computer and use it in GitHub Desktop.
// Runtime dependencies. Scala artifacts use `%%` so the Scala binary suffix is
// derived from `scalaVersion` instead of being repeated in the artifact name.
libraryDependencies ++= Seq(
  "io.reactivex" %% "rxscala" % "0.24.1",
  "com.lmax" % "disruptor" % "3.3.2",
  "org.reactivestreams" % "reactive-streams" % "1.0.0.RC4",
  "com.nativelibs4java" %% "scalaxy-reified" % "0.4-SNAPSHOT",
  "com.nativelibs4java" %% "scalaxy-generic" % "0.4-SNAPSHOT"
)

// Test-only dependencies (previously hard-coded to the `_2.11` artifacts).
libraryDependencies ++= Seq(
  "org.scalatest" %% "scalatest" % "2.2.3" % "test",
  "org.scalamock" %% "scalamock-scalatest-support" % "3.1.2" % "test"
)

// The scalaxy artifacts above are SNAPSHOT versions, hence the snapshots
// repository and the local repository.
resolvers ++= Seq(
  Resolver.sonatypeRepo("snapshots"),
  Resolver.defaultLocal
)

scalaVersion := "2.11.6"

fork := true
// lazy val test = (project in file(".")).
// dependsOn(reified, generic).
// aggregate(reified, generic)
// lazy val reified = (project in file("scalaxy-reified")).
// dependsOn(generic)
// lazy val generic = (project in file("scalaxy-generic"))
package example
import rx.lang.scala.Observable
import scala.reflect.runtime.universe.TypeTag
import scalaxy.generic._
import scalaxy.reified._
/**
 * Describes how a sequence of items of type `A` is folded into a single
 * result of type `R`.
 *
 * Both members are `Reified` values rather than plain functions; in this file
 * they are invoked like ordinary functions (`op.initialResult()`,
 * `op.combine(result, item)`) from inside a `reified { ... }` block.
 *
 * @tparam A type of the items being reduced
 * @tparam R type of the accumulated result
 */
trait Reduction[A, R] {
/** Produces the accumulator value a reduction starts from; it is called again each time the accumulator is reset. */
val initialResult: Reified[() => R]
/** Combines the current accumulator (first argument) with a new item (second argument) and returns the new accumulator. */
val combine: Reified[(R, A) => R]
}
object Reduction {
  /**
   * Builds a reified `Observable[A] => Observable[R]` transformation that
   * reduces each run of consecutive items sharing the same group into a
   * single result.
   *
   * Items are expected to arrive ordered by group. Every time the group of
   * the incoming item differs from the group of the previous item, the
   * result accumulated so far is emitted downstream and the accumulator is
   * reset to `op.initialResult()`. The last pending result is emitted when
   * the source completes.
   *
   * If an item's group is smaller than the previous group (according to the
   * `Ordering[G]`), the item handler fails via `sys.error`.
   * NOTE(review): how that exception reaches the subscriber depends on
   * RxScala's `foreach` error handling - confirm.
   *
   * @param getGroup extracts the group key of an item
   * @param op       the reduction applied within each group
   * @tparam A item type
   * @tparam G group key type
   * @tparam R result type
   * @return the reified stream transformation
   */
  def byGroup[A : TypeTag,
              G : Ordering : TypeTag,
              R : TypeTag]
      (getGroup: Reified[A => G],
       op: Reduction[A, R])
      : Reified[Observable[A] => Observable[R]] =
    reified { o =>
      Observable[R] { subscriber =>
        var result: R = op.initialResult()
        // TODO(ochafik): Check these are implemented as fields in same class as local defs (otherwise, create that class explicitly)
        // True when `result` holds at least one combined item that has not been emitted yet.
        var gotPendingValues = false
        // False until the first item is seen; `lastGroup` is meaningless before that.
        var gotGroup = false
        var lastGroup: G = null.asInstanceOf[G]

        // Emits the pending result (if any) and resets the accumulator.
        def flushGroup(): Unit = {
          if (gotPendingValues) {
            subscriber.onNext(result)
            result = op.initialResult()
            gotPendingValues = false
          }
        }

        def onNext(item: A): Unit = {
          val group = getGroup(item)
          if (gotGroup && group != lastGroup) {
            // TODO(ochafik): Elide this check for performance (assert?).
            if (lastGroup > group) {
              sys.error(s"Group are not ordered: last group was $lastGroup, new group for item $item is $group")
            }
            flushGroup()
          }
          // Always track the current group. Previously this only happened when
          // the group had NOT changed, so after the first boundary every item
          // was compared against a stale group and flushed on its own.
          lastGroup = group
          gotGroup = true
          result = op.combine(result, item)
          gotPendingValues = true
        }

        def onComplete(): Unit = {
          flushGroup()
          subscriber.onCompleted()
        }

        o.foreach(
          onNext = onNext,
          onError = subscriber.onError,
          onComplete = onComplete
        )
      }
    }
}
/**
 * Reduction that adds up the values extracted from the items.
 *
 * @param getValue extracts the value to add from an item
 * @tparam A item type
 * @tparam R value and result type
 */
case class Sum[A : TypeTag, R : TypeTag : Generic](getValue: Reified[A => R])
    extends Reduction[A, R] {

  /** The running total starts at `zero[R]`. */
  override val initialResult = reified { () => zero[R] }

  /** Adds the value extracted from `item` to the running total. */
  override val combine = reified { (total: R, item: A) =>
    val addend = getValue(item)
    total + addend
  }
}
/**
 * Reduction that keeps the smallest value extracted from the items.
 *
 * @param getValue extracts the value to compare from an item
 * @tparam A item type
 * @tparam R value and result type
 */
case class Min[A : TypeTag, R : TypeTag : Generic](getValue: Reified[A => R])
    extends Reduction[A, R] {

  /** Starts at `maxValue[R]`, the accumulator value used before any item has been seen. */
  override val initialResult = reified { () => maxValue[R] }

  /** Keeps the accumulator when it is strictly smaller than the candidate, otherwise takes the candidate. */
  override val combine = reified { (smallest: R, item: A) =>
    val candidate = getValue(item)
    if (smallest < candidate) smallest else candidate
  }
}
/**
 * Reduction that keeps the largest value extracted from the items.
 *
 * @param getValue extracts the value to compare from an item
 * @tparam A item type
 * @tparam R value and result type
 */
case class Max[A : TypeTag, R : TypeTag : Generic](getValue: Reified[A => R])
    extends Reduction[A, R] {

  /** Starts at `minValue[R]`, the accumulator value used before any item has been seen. */
  override val initialResult = reified { () => minValue[R] }

  /** Keeps the accumulator when it is strictly larger than the candidate, otherwise takes the candidate. */
  override val combine = reified { (largest: R, item: A) =>
    val candidate = getValue(item)
    if (largest > candidate) largest else candidate
  }
}
package example
package test
import java.util.Date
import org.scalatest.{ FlatSpecLike, Matchers }
import rx.lang.scala.Observable
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBoxError
import scalaxy.reified._
import scalaxy.generic._
import scala.concurrent.duration._
/**
 * Exercises `Reduction.byGroup` both through direct application of the
 * reified value and through its compiled form, and checks that both paths
 * produce the same output.
 */
class Test extends FlatSpecLike with Matchers {
  behavior of "rxtest"

  /** Untyped row of values with positional, cast-based accessors. */
  class Row(values: Any*) {
    def getDate(index: Int): Date = values(index).asInstanceOf[Date]
    def getDouble(index: Int): Double = values(index).asInstanceOf[Double]
    def getInt(index: Int): Int = values(index).asInstanceOf[Int]
    def getString(index: Int): String = values(index).asInstanceOf[String]
  }

  //abstract class CanSerialize[T](implicit val typeTag: TypeTag[T]) {
  /**
   * Reads a `T` out of a [[Row]] at a given column index.
   *
   * @param fromRow reified function taking the row and the column index
   */
  case class CanSerialize
      [T : TypeTag]
      (fromRow: Reified[(Row, Int) => T])
  {
    /** Fixes the column index, yielding a reified `Row => T` getter. */
    def atIndex(index: Int): Reified[Row => T] =
      (row: Row) => this.fromRow(row, index)
  }

  // Reads the column as a String and parses it as an Int.
  val intDeserializer = new CanSerialize[Int](
    fromRow = (row: Row, index: Int) => row.getString(index).toInt
  )
  val doubleDeserializer = new CanSerialize[Double](
    fromRow = (row: Row, index: Int) => row.getDouble(index)
  )
  val dateDeserializer = new CanSerialize[Date](
    fromRow = (row: Row, index: Int) => row.getDate(index)
  )
  val directDeserializer = new CanSerialize[String](
    fromRow = (row: Row, index: Int) => row.getString(index)
  )

  it should "work" in {
    // def reduceKeyValue
    // [K : TypeTag, G : TypeTag, V : Generic : TypeTag]
    // (keyGetter: CanSerialize[K],
    // grouper: Reified[K => G],
    // valueSerializer: CanSerialize[V])
    // : Reified[Observable[Row] => Observable[V]] = {
    // Reduction.byGroup[Row, G, V](
    // getGroup = reified { (row: Row) => grouper(keySerializer.fromRow(row, 0)) },
    // op = new Sum[Row, V]((row: Row) => valueSerializer.fromRow(row, 1)))
    // }

    // Sums the values of consecutive rows whose keys map to the same group.
    def reduceKeyValue
        [K : TypeTag, G : TypeTag, V : Generic : TypeTag]
        (keyGetter: Reified[Row => K],
         valueGetter: Reified[Row => V],
         groupGetter: Reified[K => G])
        : Reified[Observable[Row] => Observable[V]] = {
      Reduction.byGroup[Row, G, V](
        getGroup = (row: Row) => groupGetter(keyGetter(row)),
        op = new Sum[Row, V](valueGetter)
      )
    }

    val reducer = reduceKeyValue(
      keyGetter = dateDeserializer.atIndex(0),
      valueGetter = doubleDeserializer.atIndex(1),
      groupGetter = (_: Date).getMonth
    )
    val compiledReducer = reducer.compile()()

    // A `def`, so each reducer gets its own freshly built Observable.
    def input: Observable[Row] = Observable.from(List(
      new Row(new Date(1), 1.1),
      new Row(new Date(2), 2.2),
      new Row(new Date(3), 3.3)
    ))

    val uncompiledResult = reducer(input).toBlocking.toList
    val compiledResult = compiledReducer(input).toBlocking.toList

    // The labels used to be swapped: the direct (uncompiled) application was
    // printed as "compiled" and vice versa.
    println(s"""
      |compiled: $compiledResult
      |uncompiled: $uncompiledResult
      |""".stripMargin)

    // Both execution paths must agree; previously the test asserted nothing.
    compiledResult shouldBe uncompiledResult
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment