-
-
Save ochafik/841b746615c0aede2145 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| libraryDependencies += "io.reactivex" %% "rxscala" % "0.24.1" | |
| libraryDependencies += "com.lmax" % "disruptor" % "3.3.2" | |
| libraryDependencies += "org.reactivestreams" % "reactive-streams" % "1.0.0.RC4" | |
| libraryDependencies += "com.nativelibs4java" %% "scalaxy-reified" % "0.4-SNAPSHOT" | |
| libraryDependencies += "com.nativelibs4java" %% "scalaxy-generic" % "0.4-SNAPSHOT" | |
| libraryDependencies += "org.scalatest" % "scalatest_2.11" % "2.2.3" % "test" | |
| libraryDependencies += "org.scalamock" % "scalamock-scalatest-support_2.11" % "3.1.2" % "test" | |
| resolvers += Resolver.sonatypeRepo("snapshots") | |
| resolvers += Resolver.defaultLocal | |
| scalaVersion := "2.11.6" | |
| fork := true | |
| // lazy val test = (project in file(".")). | |
| // dependsOn(reified, generic). | |
| // aggregate(reified, generic) | |
| // lazy val reified = (project in file("scalaxy-reified")). | |
| // dependsOn(generic) | |
| // lazy val generic = (project in file("scalaxy-generic")) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package example | |
| import rx.lang.scala.Observable | |
| import scala.reflect.runtime.universe.TypeTag | |
| import scalaxy.generic._ | |
| import scalaxy.reified._ | |
| trait Reduction[A, R] { | |
| /** Initial result accumulator value. */ | |
| val initialResult: Reified[() => R] | |
| /** Result + new value combination function. */ | |
| val combine: Reified[(R, A) => R] | |
| } | |
| object Reduction { | |
| def byGroup[A : TypeTag, | |
| G : Ordering : TypeTag, | |
| R : TypeTag] | |
| (getGroup: Reified[A => G], | |
| op: Reduction[A, R]) | |
| : Reified[Observable[A] => Observable[R]] = | |
| reified { o => | |
| Observable[R] { subscriber => | |
| var result: R = op.initialResult() | |
| // TODO(ochafik): Check these are implemented as fields in same class as local defs (otherwise, create that class explicitly) | |
| var gotPendingValues = false | |
| var gotGroup = false | |
| var lastGroup: G = null.asInstanceOf[G] | |
| def flushGroup() { | |
| if (gotPendingValues) { | |
| subscriber.onNext(result) | |
| result = op.initialResult() | |
| gotPendingValues = false | |
| } | |
| } | |
| def onNext(item: A) { | |
| val group = getGroup(item) | |
| if (gotGroup && group != lastGroup) { | |
| // TODO(ochafik): Elide this check for performance (assert?). | |
| if (lastGroup > group) { | |
| sys.error(s"Group are not ordered: last group was $lastGroup, new group for item $item is $group") | |
| } | |
| flushGroup() | |
| } else { | |
| lastGroup = group | |
| gotGroup = true | |
| } | |
| result = op.combine(result, item) | |
| gotPendingValues = true | |
| } | |
| def onComplete() = { | |
| flushGroup() | |
| subscriber.onCompleted() | |
| } | |
| o.foreach( | |
| onNext = onNext, | |
| onError = subscriber.onError, | |
| onComplete = onComplete | |
| ) | |
| } | |
| } | |
| } | |
| case class Sum | |
| [A : TypeTag, R : TypeTag : Generic] | |
| (getValue: Reified[A => R]) | |
| extends Reduction[A, R] | |
| { | |
| override val initialResult = reified { () => zero[R] } | |
| override val combine = reified { | |
| (result: R, value: A) => | |
| val v = getValue(value) | |
| result + v | |
| } | |
| } | |
| case class Min | |
| [A : TypeTag, R : TypeTag : Generic] | |
| (getValue: Reified[A => R]) | |
| extends Reduction[A, R] | |
| { | |
| override val initialResult = reified { () => maxValue[R] } | |
| override val combine = reified { | |
| (result: R, value: A) => | |
| val v = getValue(value) | |
| if (result < v) result else v | |
| } | |
| } | |
| case class Max | |
| [A : TypeTag, R : TypeTag : Generic] | |
| (getValue: Reified[A => R]) | |
| extends Reduction[A, R] | |
| { | |
| override val initialResult = reified { () => minValue[R] } | |
| override val combine = reified { | |
| (result: R, value: A) => | |
| val v = getValue(value) | |
| if (result > v) result else v | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package example | |
| package test | |
| import java.util.Date | |
| import org.scalatest.{ FlatSpecLike, Matchers } | |
| import rx.lang.scala.Observable | |
| import scala.reflect.runtime.universe._ | |
| import scala.tools.reflect.ToolBoxError | |
| import scalaxy.reified._ | |
| import scalaxy.generic._ | |
| import scala.concurrent.duration._ | |
| class Test extends FlatSpecLike with Matchers { | |
| behavior of "rxtest" | |
| class Row(values: Any*) { | |
| def getDate(index: Int): Date = values(index).asInstanceOf[Date] | |
| def getDouble(index: Int): Double = values(index).asInstanceOf[Double] | |
| def getInt(index: Int): Int = values(index).asInstanceOf[Int] | |
| def getString(index: Int): String = values(index).asInstanceOf[String] | |
| } | |
| //abstract class CanSerialize[T](implicit val typeTag: TypeTag[T]) { | |
| case class CanSerialize | |
| [T : TypeTag] | |
| (fromRow: Reified[(Row, Int) => T]) | |
| { | |
| def atIndex(index: Int): Reified[Row => T] = | |
| (row: Row) => this.fromRow(row, index) | |
| } | |
| val intDeserializer = new CanSerialize[Int]( | |
| fromRow = (row: Row, index: Int) => row.getString(index).toInt | |
| ) | |
| val doubleDeserializer = new CanSerialize[Double]( | |
| fromRow = (row: Row, index: Int) => row.getDouble(index) | |
| ) | |
| val dateDeserializer = new CanSerialize[Date]( | |
| fromRow = (row: Row, index: Int) => row.getDate(index) | |
| ) | |
| val directDeserializer = new CanSerialize[String]( | |
| fromRow = (row: Row, index: Int) => row.getString(index) | |
| ) | |
| it should "work" in { | |
| // def reduceKeyValue | |
| // [K : TypeTag, G : TypeTag, V : Generic : TypeTag] | |
| // (keyGetter: CanSerialize[K], | |
| // grouper: Reified[K => G], | |
| // valueSerializer: CanSerialize[V]) | |
| // : Reified[Observable[Row] => Observable[V]] = { | |
| // Reduction.byGroup[Row, G, V]( | |
| // getGroup = reified { (row: Row) => grouper(keySerializer.fromRow(row, 0)) }, | |
| // op = new Sum[Row, V]((row: Row) => valueSerializer.fromRow(row, 1))) | |
| // } | |
| def reduceKeyValue | |
| [K : TypeTag, G : TypeTag, V : Generic : TypeTag] | |
| (keyGetter: Reified[Row => K], | |
| valueGetter: Reified[Row => V], | |
| groupGetter: Reified[K => G]) | |
| : Reified[Observable[Row] => Observable[V]] = { | |
| Reduction.byGroup[Row, G, V]( | |
| getGroup = (row: Row) => groupGetter(keyGetter(row)), | |
| op = new Sum[Row, V](valueGetter) | |
| ) | |
| } | |
| val reducer = reduceKeyValue( | |
| keyGetter = dateDeserializer.atIndex(0), | |
| valueGetter = doubleDeserializer.atIndex(1), | |
| groupGetter = (_: Date).getMonth | |
| ) | |
| val compiledReducer = reducer.compile()() | |
| def input: Observable[Row] = Observable.from(List( | |
| new Row(new Date(1), 1.1), | |
| new Row(new Date(2), 2.2), | |
| new Row(new Date(3), 3.3) | |
| )) | |
| val a = reducer(input).toBlocking.toList | |
| val b = compiledReducer(input).toBlocking.toList | |
| println(s""" | |
| compiled: $a | |
| uncompiled: $b | |
| """) | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment