Akka user list examples
#!/usr/bin/env scalas
// NOTE: This is a self-contained Scala script meant to be run with scalas
// See http://www.scala-sbt.org/0.13/docs/Scripts.html
/***
scalaVersion := "2.11.6"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://typesafe.artifactoryonline.com/typesafe/releases"))(Resolver.ivyStylePatterns)
libraryDependencies ++= Seq(
  "com.typesafe.scala-logging" %% "scala-logging" % "3.+",
  "com.typesafe.akka" %% "akka-agent" % "2.3.12",
  "com.typesafe.akka" %% "akka-stream-experimental" % "1.0",
  "com.typesafe.akka" %% "akka-http-experimental" % "1.0",
  "org.scalatest" %% "scalatest" % "2.2.4"
)
*/

import com.typesafe.scalalogging.StrictLogging

import akka.actor.{ActorSystem, Cancellable}
import akka.agent.Agent
import akka.pattern.after
import akka.stream._
import akka.stream.io._
import akka.stream.stage._
import akka.stream.scaladsl.FlowGraph.Implicits._
import akka.stream.scaladsl._
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import akka.util.ByteString

import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalactic.Tolerance._
import org.scalactic.TripleEqualsSupport.Spread

import scala.collection.immutable._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Random, Try, Success, Failure}

import java.util.concurrent.atomic.AtomicBoolean
import java.io.ByteArrayOutputStream
import java.net.URL

case class Message[T](id: Long, body: T)

trait Queue {
  def acknowledge(id: Long): Unit
}
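
// A handler turns queue messages into success/failure results; an ack sink consumes
// (message, result) pairs and acknowledges the successfully handled messages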
type Handler[T] = Flow[Message[T], Try[_], _]
type AckSink = Sink[(Message[_], Try[_]), Future[_]]

def ackSink(queue: Queue) =
  Sink.foreach[(Message[_], Try[_])] {
    case (msg, result) => result match {
      case Success(_) => queue.acknowledge(msg.id)
      case Failure(t) => {
        // Do something on failure
        println(t)
      }
    }
  }
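
// Sink that broadcasts each message to both the handler and a zip stage, then pairs the
// original message with its handler result and feeds the pair to the ack sink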
def handleAndAckSink[T](handler: Handler[T], ackSink: AckSink) =
  Sink(handler, ackSink, Broadcast[Message[T]](2), Zip[Message[T], Try[_]])((_, mat, _, _) => mat) {
    implicit b => (handler, ackSink, bcast, zip) =>
      bcast ~> zip.in0
      bcast ~> handler ~> zip.in1
      zip.out ~> ackSink
      (bcast.in)
  }

// Create/destroy an actor system for testing
trait AkkaStreamsImplicits extends BeforeAndAfterAll { this: Suite =>
  implicit var system: ActorSystem = _
  implicit var materializer: Materializer = _

  override def beforeAll = {
    super.beforeAll
    system = ActorSystem(this.getClass.getSimpleName.replace("$", "_"))
    materializer = ActorMaterializer()
  }

  override def afterAll = {
    system.shutdown
    super.afterAll
  }
}

class AckSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {

  def testSource(n: Int) = Source((0 to n)).map(n => Message(n, s"message $n"))

  val testQueue = new Queue {
    def acknowledge(id: Long) = println(s"acknowledging message $id")
  }

  val testHandler = Flow[Message[String]].map { msg =>
    // Randomly fail
    if (Random.nextBoolean) Failure(new Exception(s"failure processing message $msg"))
    else Success(s"success processing message $msg")
  }

  "Acknowledge" should "ack messages" in {
    val future = testSource(10).runWith(handleAndAckSink(testHandler, ackSink(testQueue)))
    whenReady(future) { result =>
    }
  }
}

// Run the test case
run(new AckSpec)
#!/usr/bin/env scalas
// NOTE: This is a self-contained Scala script meant to be run with scalas
// See http://www.scala-sbt.org/0.13/docs/Scripts.html
/***
scalaVersion := "2.11.6"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://typesafe.artifactoryonline.com/typesafe/releases"))(Resolver.ivyStylePatterns)
libraryDependencies ++= Seq(
  "com.typesafe.scala-logging" %% "scala-logging" % "3.+",
  "com.typesafe.akka" % "akka-stream-experimental_2.11" % "1.0",
  "org.scalatest" %% "scalatest" % "2.2.4"
)
*/

import com.typesafe.scalalogging.StrictLogging

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.io._
import akka.stream.stage._
import akka.stream.scaladsl.FlowGraph.Implicits._
import akka.stream.scaladsl._
import akka.util.ByteString

import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalactic.Tolerance._
import org.scalactic.TripleEqualsSupport.Spread

import scala.collection.immutable._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

import java.io.ByteArrayOutputStream

// Creates an unbounded random alphanumeric source with a known seed (for repeatability)
def randomAlphaSource(seed: Int) = Source(() => new Random(seed).alphanumeric.iterator)

// Split the input stream and write to a set of input-specific output streams
def splitWrite(in: Source[Char, Unit])(implicit m: Materializer): RunnableGraph[Future[Map[Char, ByteArrayOutputStream]]] = {
  implicit val ec: ExecutionContext = m.executionContext

  val outputMap = mutable.Map[Char, ByteArrayOutputStream]()

  // Creates a new output stream sink for the given character
  // Each stream is backed by a byte array output stream, but these could just as easily
  // be file output streams to stream data to disk instead (see the sketch below)
  def charSink(c: Char) = OutputStreamSink(() => outputMap.getOrElseUpdate(c, new ByteArrayOutputStream()))
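
  // A minimal sketch (not part of the original example) of the file-backed alternative mentioned
  // above: write each character's substream to its own file using the same OutputStreamSink API.
  // The /tmp path is purely hypothetical, for illustration.
  def fileCharSink(c: Char) = OutputStreamSink(() => new java.io.FileOutputStream(s"/tmp/split-$c.bytes"))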
  // Split the input stream into a set of character-specific streams
  // Note that the resulting flows must be run to ensure consumption (see groupBy docs)
  val groupBy = in.groupBy(c => c).map {
    case (c, source) => source.map(c => ByteString(c.toByte)).toMat(charSink(c))(Keep.right).run()
  }

  // Return a runnable graph that materializes into a future containing the output map
  groupBy.toMat(Sink.ignore.mapMaterializedValue(_.map(_ => outputMap.toMap)))(Keep.right)
}

// Create/destroy an actor system for testing
trait AkkaStreamsImplicits extends BeforeAndAfterAll { this: Suite =>
  implicit var system: ActorSystem = _
  implicit var materializer: Materializer = _

  override def beforeAll = {
    super.beforeAll
    system = ActorSystem(this.getClass.getSimpleName.replace("$", "_"))
    materializer = ActorMaterializer()
  }

  override def afterAll = {
    system.shutdown
    super.afterAll
  }
}

class SplitWriteSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {

  val seed = 42

  "Split" should "handle random alphanumeric stream" in {
    val size = 1000
    val randoms = randomAlphaSource(seed).take(size)
    val future = splitWrite(randoms).run

    // Split takes more than 150 ms to complete
    import org.scalatest.time.{Millis, Seconds, Span}
    implicit def patienceConfig = PatienceConfig(timeout = Span(1, Seconds), interval = Span(50, Millis))

    whenReady(future) { map =>
      //println(s"map: $map")
      var length = 0
      map.foreach {
        case (k, v) => {
          val s = v.toString
          // Each string should only contain the same character
          s.filter(_ != k) should have length 0
          length += s.length
        }
      }
      // All characters should be accounted for
      length shouldBe size
    }
  }
}

// Run the test case
run(new SplitWriteSpec)
#!/usr/bin/env scalas
// NOTE: This is a self-contained Scala script meant to be run with scalas
// See http://www.scala-sbt.org/0.13/docs/Scripts.html
/***
scalaVersion := "2.11.6"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://typesafe.artifactoryonline.com/typesafe/releases"))(Resolver.ivyStylePatterns)
libraryDependencies ++= Seq(
  "com.typesafe.scala-logging" %% "scala-logging" % "3.+",
  "com.typesafe.akka" %% "akka-agent" % "2.3.12",
  "com.typesafe.akka" %% "akka-stream-experimental" % "1.0",
  "com.typesafe.akka" %% "akka-http-experimental" % "1.0",
  "org.scalatest" %% "scalatest" % "2.2.4"
)
*/

import com.typesafe.scalalogging.StrictLogging

import akka.actor.{ActorSystem, Cancellable}
import akka.agent.Agent
import akka.pattern.after
import akka.stream._
import akka.stream.io._
import akka.stream.stage._
import akka.stream.scaladsl.FlowGraph.Implicits._
import akka.stream.scaladsl._
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import akka.util.ByteString

import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalactic.Tolerance._
import org.scalactic.TripleEqualsSupport.Spread

import scala.collection.immutable._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Random
import scala.concurrent.duration._

import java.util.concurrent.atomic.AtomicBoolean
import java.io.ByteArrayOutputStream
import java.net.URL

// A simple implementation of the Akka Cancellable trait
class AtomicCancellable extends Cancellable {
  private val cancelled = new AtomicBoolean()
  override def cancel() = cancelled.compareAndSet(false, true)
  override def isCancelled = cancelled.get
}

object AtomicCancellable {
  def apply() = new AtomicCancellable
}

abstract class Token(prefix: String, sequence: Long) {
  val code: String = prefix + sequence
}

case class RefreshToken(sequence: Long) extends Token("R", sequence) {
  def this() = this(0)
  def next = this.copy(sequence = sequence + 1)
}

case class AccessToken private (sequence: Long, expiresIn: FiniteDuration) extends Token("A", sequence) {
  def this(token: RefreshToken, expiresIn: FiniteDuration) = this(token.sequence, expiresIn)
}

case class RefreshResponse(access: AccessToken, refresh: Option[RefreshToken])

// The basic OAuth refresh token request/response flow (section 6 of the OAuth 2.0 spec)
// This flow is the fundamental source of access and refresh tokens
// and would be a real HTTP flow in a working example
def refreshRequest(endpoint: URL, clientId: String, clientSecret: String): Flow[RefreshToken, Future[RefreshResponse], Unit] = {
  // This would be the real flow that makes the request to the OAuth refresh endpoint
  // using Http().singleRequest(...), for example (see the sketch below)
  Flow[RefreshToken].map(refresh => Future.successful(RefreshResponse(new AccessToken(refresh, 10.minutes), Some(refresh.next))))
}
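
// A minimal sketch (not part of the original example) of what an HTTP-backed refresh flow might
// look like with akka-http 1.0. Request construction and response parsing are left as hypothetical
// stubs (???) since they depend on the endpoint's form parameters and JSON token format.
def httpRefreshRequest(endpoint: URL, clientId: String, clientSecret: String)(
    implicit system: ActorSystem, materializer: Materializer): Flow[RefreshToken, Future[RefreshResponse], Unit] = {
  implicit val ec = materializer.executionContext
  // Hypothetical helpers: build the form-encoded refresh_token grant request and parse the token response
  def buildRequest(token: RefreshToken): HttpRequest = ???
  def parseResponse(response: akka.http.scaladsl.model.HttpResponse): Future[RefreshResponse] = ???
  Flow[RefreshToken].map { token =>
    akka.http.scaladsl.Http().singleRequest(buildRequest(token)).flatMap(parseResponse)
  }
}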
// Creates a sink that materializes an Agent for obtaining the current Access Token
// The sink accepts a single element, the initial Refresh Token, and
// initiates an internal flow that automatically requests new Access Tokens
// and updates the materialized Agent accordingly
// The paired Cancellable can be used to stop the automatic update process
// Callers should provide a valid persist sink, typically backed by a database
// or other persistent store, to save the latest refresh token value
// The lead argument is the lead time to request the next Access Token prior to
// the current token's expiration
def autoRefresh(
  request: Flow[RefreshToken, Future[RefreshResponse], _],
  persist: Sink[RefreshToken, _],
  lead: FiniteDuration = 30.seconds
)(implicit materializer: Materializer, system: ActorSystem): Sink[RefreshToken, (Agent[Future[AccessToken]], Cancellable)] = {

  Sink.head[RefreshToken].mapMaterializedValue { futureInitial =>
    implicit val executionContext = materializer.executionContext

    val first = Promise[AccessToken]
    val agent = Agent(first.future)
    val cancellable = AtomicCancellable()

    // Create the auto refresh flow that will run independently
    // to periodically update the agent with the current Access Token
    val auto = Sink(
      Flow[(RefreshToken, Promise[AccessToken])],
      Merge[(RefreshToken, Promise[AccessToken])](2),
      request,
      Unzip[RefreshToken, Promise[AccessToken]],
      Zip[Future[RefreshResponse], Promise[AccessToken]],
      Broadcast[(RefreshResponse, Promise[AccessToken])](3)
    )((mat, _, _, _, _, _) => mat) {
      implicit b => (initial, merge, request, unzip, zip, bcast) =>

        // Detect and handle cancellation
        // The splitWhen diverts the flow of elements upon cancellation,
        // ending the refresh process and allowing us to handle cleanup
        // so we don't leave a dangling, uncompleted promise
        val cancel = b.add(Flow[(RefreshToken, Promise[AccessToken])]
          .splitWhen(_ => cancellable.isCancelled)
          .prefixAndTail(1).map {
            case (prefix, tail) => {
              tail.map(_.map {
                case (_, promise) => promise.failure(new Exception("auto refresh cancelled"))
              })
              prefix.head
            }
          }.flatten(FlattenStrategy.concat))

        // Complete the current promise and create the next promise
        val promise = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])]
          .map {
            case (fresponse, cur) => {
              cur.completeWith(fresponse.map(_.access))
              val next = Promise[AccessToken]
              // Update the agent upon promise completion
              // Note that this is a side effect, hence the andThen
              next.future.andThen { case _ => agent.send(next.future) }
              (fresponse, next)
            }
          })

        // Unwrap the completed response future
        val response = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])]
          .map { case (fr, p) => Source(fr.map(r => (r, p))) }
          .flatten(FlattenStrategy.concat))

        // Save the updated refresh token, if supplied
        val save = b.add(Flow[(RefreshResponse, Promise[AccessToken])]
          .collect { case (RefreshResponse(_, Some(refresh)), _) => refresh }
          .to(persist))

        // Send the next future to the agent upon expiration of the current one to
        // prevent users of the agent from using expired tokens
        // Uses the Akka after pattern to schedule the send
        val expiration = b.add(Sink.foreach[(RefreshResponse, Promise[AccessToken])] {
          case (RefreshResponse(access, _), promise) => {
            after(access.expiresIn, system.scheduler)(Future {
              if (!promise.isCompleted) {
                agent.send(promise.future)
              }
            })
          }
        })

        // Feed back the refresh token after a delay to initiate the next access token refresh
        // Uses the Akka after pattern to schedule the Future with the configured lead time
        // prior to expiration
        val refresh = b.add(Flow[(RefreshResponse, Promise[AccessToken])]
          .collect {
            case (RefreshResponse(access, Some(refresh)), promise) => {
              val delay = access.expiresIn.minus(lead)
              val future = after(delay, system.scheduler)(Future.successful((refresh, promise)))
              Source(future)
            }
          }.flatten(FlattenStrategy.concat))

        // The rolling request flow, initiated with the initial refresh token
        initial ~> merge ~> cancel ~> unzip.in
                            unzip.out0 ~> request ~> zip.in0
                            unzip.out1 ~>            zip.in1
                                                     zip.out ~> promise ~> response ~> bcast ~> save
                                                                                       bcast ~> expiration
                                                                                       bcast ~> refresh
                   merge <~ refresh

        initial.inlet
    }

    // Run the auto refresh flow
    val initial = futureInitial.map(refresh => (refresh, first))
    auto.runWith(Source(initial))

    // Return the (Agent, Cancellable) pair
    (agent, cancellable)
  }
}

// Create an Access Token source that always retrieves the latest value from the agent
def accessTokenSource(agent: Agent[Future[AccessToken]]): Source[AccessToken, Unit] =
  Source.repeat(()).map(_ => Source(agent())).flatten(FlattenStrategy.concat)

// Flow that decorates requests with the current access token
def addAccessToken(tokens: Source[AccessToken, Unit]): Flow[HttpRequest, HttpRequest, Unit] =
  Flow(Flow[HttpRequest], tokens, Zip[HttpRequest, AccessToken])((mat, _, _) => mat) {
    implicit b => (requests, tokens, zip) =>
      val addHeader = b.add(Flow[(HttpRequest, AccessToken)].map {
        case (request, access) => request.withHeaders(new Authorization(new OAuth2BearerToken(access.code)))
      })

      requests ~> zip.in0
      tokens   ~> zip.in1
                  zip.out ~> addHeader

      (requests.inlet, addHeader.outlet)
  }
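
// Usage sketch (assumption, not part of the original example): decorate outgoing requests with
// the freshest token before handing them to an HTTP client flow. The names refreshFlow,
// persistSink and initialRefresh below are hypothetical placeholders.
//   val (agent, cancellable) = Source.single(initialRefresh).runWith(autoRefresh(refreshFlow, persistSink))
//   val authorized = addAccessToken(accessTokenSource(agent))
//   Source.single(HttpRequest(uri = "/resource")).via(authorized) // ~> connection pool flow, etc.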
// Create/destroy an actor system for testing
trait AkkaStreamsImplicits extends BeforeAndAfterAll { this: Suite =>
  implicit var system: ActorSystem = _
  implicit var materializer: Materializer = _

  override def beforeAll = {
    super.beforeAll
    system = ActorSystem(this.getClass.getSimpleName.replace("$", "_"))
    materializer = ActorMaterializer()
  }

  override def afterAll = {
    system.shutdown
    super.afterAll
  }
}

class OAuthRefreshSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {

  def createMockResponse(refresh: RefreshToken, expiresIn: FiniteDuration) =
    RefreshResponse(new AccessToken(refresh, expiresIn), Some(refresh.next))

  "Auto refresh" should "generate access token" in {
    val initialRefresh = new RefreshToken()
    val expiresIn = 3.seconds
    val mockRequestFlow = Flow[RefreshToken].map(r => Future.successful(createMockResponse(r, expiresIn)))
    val persist = Sink.foreach[RefreshToken](r => println(s"Saving refresh token: $r"))

    val autoRefreshSink = autoRefresh(mockRequestFlow, persist, 1.second)
    val (agent, cancellable) = Source.single(initialRefresh).runWith(autoRefreshSink)

    whenReady(agent()) { token =>
      val expected = new AccessToken(initialRefresh, expiresIn)
      token shouldBe expected
      cancellable.cancel()
    }
  }
}

// Run the test case
run(new OAuthRefreshSpec)
#!/usr/bin/env scalas
// NOTE: This is a self-contained Scala script meant to be run with scalas
// See http://www.scala-sbt.org/0.13/docs/Scripts.html
/***
scalaVersion := "2.11.6"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://typesafe.artifactoryonline.com/typesafe/releases"))(Resolver.ivyStylePatterns)
libraryDependencies ++= Seq(
  "com.typesafe.scala-logging" %% "scala-logging" % "3.+",
  "com.typesafe.akka" % "akka-stream-experimental_2.11" % "1.0",
  "org.scalatest" %% "scalatest" % "2.2.4"
)
*/

import com.typesafe.scalalogging.StrictLogging

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.stage._
import akka.stream.scaladsl.FlowGraph.Implicits._
import akka.stream.scaladsl._

import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalactic.Tolerance._
import org.scalactic.TripleEqualsSupport.Spread

import scala.collection.immutable._
import scala.util.Random

// Creates an unbounded source of random ints with a known seed (for repeatability)
def randomSource(seed: Int) = Source(() => {
  val random = new Random(seed)
  Iterator.continually(random.nextInt)
})

// Transform a source of integers into a normalized source of doubles where
// each element emitted is in the range of 0 to 1
// Note that the incoming source must be both finite and support multiple subscribers
def normalize(in: Source[Int, Unit]): Source[Double, Unit] = {

  // Fold over the input source to create a new source that emits a single element,
  // which is the range of integers over the entire stream
  val fold = in.fold((Int.MaxValue, Int.MinValue)) {
    (range, n) => range match {
      case (l, u) => (l.min(n), u.max(n))
    }
  }

  // Transform the single element range source into an unbounded source
  // that continually emits the same element
  val range = fold.map(r => Source.repeat(r)).flatten(FlattenStrategy.concat)

  // Create a stage that normalizes each value
  val normalize = Flow[(Int, (Int, Int))].map {
    case (n, (min, max)) if (min == max) => 1.0
    case (n, (min, max)) => (n.toDouble - min.toDouble) / (max.toDouble - min.toDouble)
  }
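
  // For example, with a computed range of (1, 7), the value 4 normalizes to
  // (4 - 1) / (7 - 1) = 0.5; the minimum maps to 0.0 and the maximum maps to 1.0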
  // Create the final source using a flow that combines the prior constructs
  Source(in, range, Zip[Int, (Int, Int)], normalize)((mat, _, _, _) => mat) {
    implicit b => (in, range, zip, normalize) =>
      in    ~> zip.in0
      range ~> zip.in1
               zip.out ~> normalize

      normalize.outlet
  }
}

// Create/destroy an actor system for testing
trait AkkaStreamsImplicits extends BeforeAndAfterAll { this: Suite =>
  implicit var system: ActorSystem = _
  implicit var materializer: Materializer = _

  override def beforeAll = {
    super.beforeAll
    system = ActorSystem(this.getClass.getSimpleName.replace("$", "_"))
    materializer = ActorMaterializer()
  }

  override def afterAll = {
    system.shutdown
    super.afterAll
  }
}

class NormalizeSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {

  val seed = 42

  "Normalize" should "properly calculate for constant stream" in {
    val value = 5
    val size = 100
    val expected = Seq.fill(size)(1.0)

    val constants = Source.repeat(value).take(size)
    val normalized = normalize(constants)
    val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _))

    whenReady(future) { result =>
      //println(s"result: $result")
      result should have size expected.size
      result.zip(expected).foreach { case (actual, expected) =>
        actual shouldBe expected
      }
    }
  }

  it should "properly calculate for random stream" in {
    val size = 100
    val randoms = randomSource(seed).take(size)
    val normalized = normalize(randoms)
    val future = normalized.runWith(Sink.fold(List[Double]())(_ :+ _))

    whenReady(future) { result =>
      //println(s"result: $result")
      result should have size size
      result should contain (0.0)
      result should contain (1.0)
      result.exists(_ < 0.0) shouldBe false
      result.exists(_ > 1.0) shouldBe false
    }
  }
}

// Run the test case
run(new NormalizeSpec)