Skip to content

Instantly share code, notes, and snippets.

@soumyasd
Last active May 28, 2016 10:36
Show Gist options
  • Save soumyasd/ac5b1d5f2ec3af21ace8 to your computer and use it in GitHub Desktop.
Save soumyasd/ac5b1d5f2ec3af21ace8 to your computer and use it in GitHub Desktop.
# Dispatcher tuning for the rediscala client's internal worker actors.
# NOTE(review): presumably merged into application.conf at the top level so the
# client finds it under the `rediscala` path — confirm how this file is loaded.
rediscala {
//loglevel = "DEBUG"
# Dedicated dispatcher used by the rediscala client worker.
rediscala-client-worker-dispatcher {
# Single-consumer mailbox: cheaper than the default when only one thread
# ever dequeues, which matches a dedicated worker actor.
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 1000
}
}
# NOTE(review): in a standard Akka configuration this section lives under a
# top-level `akka { }` block; as written, a bare top-level `actor` section
# would not be read as akka.actor.* — verify against how the config is merged.
actor {
# Settings for the dispatcher all actors run on unless configured otherwise.
default-dispatcher {
# Must be one of the following
# Dispatcher, PinnedDispatcher, or a FQCN to a class inheriting
# MessageDispatcherConfigurator with a public constructor with
# both com.typesafe.config.Config parameter and
# akka.dispatch.DispatcherPrerequisites parameters.
# PinnedDispatcher must be used together with executor=thread-pool-executor.
type = "Dispatcher"
# Which kind of ExecutorService to use for this dispatcher
# Valid options:
# - "default-executor" requires a "default-executor" section
# - "fork-join-executor" requires a "fork-join-executor" section
# - "thread-pool-executor" requires a "thread-pool-executor" section
# - A FQCN of a class extending ExecutorServiceConfigurator
executor = "fork-join-executor"
# This will be used if you have set "executor = "default-executor"".
# If an ActorSystem is created with a given ExecutionContext, this
# ExecutionContext will be used as the default executor for all
# dispatchers in the ActorSystem configured with
# executor = "default-executor". Note that "default-executor"
# is the default value for executor, and therefore used if not
# specified otherwise. If no ExecutionContext is given,
# the executor configured in "fallback" will be used.
default-executor {
fallback = "fork-join-executor"
}
# This will be used if you have set "executor = "fork-join-executor""
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 5
# The parallelism factor is used to determine thread pool size using the
# following formula: ceil(available processors * factor). Resulting size
# is then bounded by the parallelism-min and parallelism-max values.
#parallelism-factor = 3.0
# Max number of threads to cap factor-based parallelism number to
# With parallelism-min == parallelism-max == 5 the pool is pinned to
# exactly 5 threads, regardless of the (commented-out) factor above.
parallelism-max = 5
}
# 1000 messages per actor per scheduling turn: favors raw throughput for
# this benchmark over fairness between actors.
throughput = 1000
}
}
// sbt build definition for the "akkaperf" Redis/akka-streams benchmark.
import sbtassembly.Plugin.AssemblyKeys
import AssemblyKeys._ // put this at the top of the file
import sbt.Keys._

name := """akkaperf"""

version := "1.0"

scalaVersion := "2.11.4"

// NOTE(review): central repositories are HTTPS-only today; current sbt
// launchers reject plain-http resolvers. Switch these to https:// when the
// hosts support it (Bintray is also sunset — a mirror may be needed).
resolvers += "Typesafe repository releases" at "http://repo.typesafe.com/typesafe/releases/"

resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

resolvers += "rediscala" at "http://dl.bintray.com/etaty/maven"

libraryDependencies ++= Seq(
  "com.etaty.rediscala"    %% "rediscala"                % "1.4.0",
  "com.github.scala-blitz" %% "scala-blitz"              % "1.1",
  "com.storm-enroute"      %% "scalameter"               % "0.7-SNAPSHOT",
  "com.storm-enroute"      %% "scalameter-core"          % "0.7-SNAPSHOT",
  "org.latencyutils"       %  "LatencyUtils"             % "1.0.2",
  // Trailing comma after this last entry removed: .sbt files are compiled
  // with the Scala version of the sbt launcher (2.10.x for sbt 0.13), which
  // does not support trailing commas — it was a syntax error as written.
  "com.typesafe.akka"      %% "akka-stream-experimental" % "1.0-M1"
)

// Run ScalaMeter benchmarks through sbt's test task.
testFrameworks += new TestFramework("org.scalameter.ScalaMeterFramework")

// sbt-assembly 0.x fat-jar packaging settings.
assemblySettings

// ScalaMeter requires sequential, unbuffered test execution for stable timings.
parallelExecution in Test := false

logBuffered := false

// Large heap for the 5M-key benchmark runs.
javaOptions += "-Xmx8G"
package redisbenchmark
import java.util.UUID
import java.util.concurrent.ThreadLocalRandom
import akka.actor.ActorSystem
import akka.stream.{MaterializerSettings, OverflowStrategy, FlowMaterializer}
import akka.stream.scaladsl.{TickSource, IterableSource, Source}
import akka.util.ByteString
import redis.RedisClient
import akka.stream.scaladsl._
import scala.concurrent.{duration, Future}
import scala.concurrent.duration.Duration
// Benchmark entry point: streams SET commands into Redis via the experimental
// akka-streams 1.0-M1 API and the rediscala client — once issuing a command
// per element, and once batching 100 commands per MULTI/EXEC transaction.
// NOTE(review): `extends App` relies on DelayedInit and is discouraged for
// non-trivial entry points; a plain `def main` would be safer.
object RedisStreamClient extends App {
// ~2 KB text payload written as the value of every key.
val message = """How to explain ZeroMQ? Some of us start by saying all the wonderful things it does. It's sockets on steroids. It's like mailboxes with routing. It's fast! Others try to share their moment of enlightenment, that zap-pow-kaboom satori paradigm-shift moment when it all became obvious. Things just become simpler. Complexity goes away. It opens the mind. Others try to explain by comparison. It's smaller, simpler, but still looks familiar. Personally, I like to remember why we made ZeroMQ at all, because that's most likely where you, the reader, still are today.How to explain ZeroMQ? Some of us start by saying all the wonderful things it does. It's sockets on steroids. It's like mailboxes with routing. It's fast! Others try to share their moment of enlightenment, that zap-pow-kaboom satori paradigm-shift moment when it all became obvious. Things just become simpler. Complexity goes away. It opens the mind. Others try to explain by comparison. It's smaller, simpler, but still looks familiar. Personally, I like to remember why we made ZeroMQ at all, because that's most likely where"""
implicit val system = ActorSystem("Sys")
// Input buffers raised to 512 (from the defaults) to keep the Redis client
// saturated with in-flight elements.
val settings = MaterializerSettings(system)
implicit val materializer = FlowMaterializer(settings.copy(maxInputBufferSize = 512, initialInputBufferSize = 512))
// Payload size in bytes; not used below — presumably kept for reporting.
val msgSize = message.getBytes.size
// Client with default connection settings — presumably localhost:6379; confirm.
val redis = RedisClient()
// Random key suffixes so the two benchmark runs write disjoint key spaces.
val random1 = UUID.randomUUID().toString
val random2 = UUID.randomUUID().toString
import scala.concurrent.duration._
// Flow issuing one SET per element: key = element index + suffix (Int + String
// concatenation via any2stringadd). redis.set futures complete unordered, so
// downstream sees Booleans in arbitrary order.
def insertValues(rnd: String): Flow[Int, Boolean] = {
Flow[Int].mapAsyncUnordered(k => redis.set(k + rnd, message))
}
//Following will insert a lot of values
//val maxRandomNumberSize = 100000000
//val randomSource = Source(() => Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize)))
//val streamWithRandomSource = source.via(insertValues).runWith(blackhole)
// Sink that discards every element — only the act of driving the stream matters.
val blackhole = BlackholeSink
// Number of keys inserted per run.
val maxSeq = 5000000
val seqSource = Source( () => (1 to maxSeq).iterator )
// Run 1: one redis.set per element. NOTE(review): this run and the grouped run
// below are both started immediately and execute concurrently against the same
// Redis instance, so their timings interfere — confirm that is intended.
val streamWithSeqSource = seqSource.via(insertValues(random1)).runWith(blackhole)
//here is the grouped version where we are batching a bunch of Redis commands together
// Run 2: queue 100 SETs into a rediscala transaction, then exec() sends them
// as one MULTI/EXEC batch; the per-SET futures yielded by the for are discarded.
seqSource.grouped(100).mapAsyncUnordered { grp => {
val tran = redis.transaction()
for (i <- grp) yield {
tran.set(i + random2, message)
}
tran.exec()
}
}.runWith(blackhole)
//TODO figure out a way to stop the stream correctly after inserting 5 million entries.
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment