Skip to content

Instantly share code, notes, and snippets.

@ktoso
Last active July 21, 2017 10:50
Show Gist options
  • Save ktoso/46d17f31a0c5a134903a to your computer and use it in GitHub Desktop.
Save ktoso/46d17f31a0c5a134903a to your computer and use it in GitHub Desktop.
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import java.io.{BufferedWriter, File, FileInputStream, FileWriter}
import java.nio.ByteBuffer
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
/**
 * JMH benchmark comparing several ways of reading a temporary file of
 * `fileSize` bytes:
 *
 *  - `fileChannel`: a hand-written `FileChannel` / `ByteBuffer` loop (the baseline),
 *  - `getLines`: `scala.io.Source.getLines()`,
 *  - `getLines_asSource`: the `getLines()` iterator wrapped in an akka-streams `Source`,
 *  - `source_blockingIo_chunk_512_ahead_*`: the file-backed akka-streams `Source`
 *    with a 512 byte chunk size and different read-ahead settings.
 *
 * Every benchmark returns a value computed from the data it read, so the result
 * is handed back to JMH instead of being discarded.
 */
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.AverageTime))
class fiStreamFileBenchmark {

  // The HOCON text below is kept flush-left so the literal stays unchanged.
  val config = ConfigFactory.parseString(
    """
akka {
log-config-on-start = off
log-dead-letters-during-shutdown = off
loglevel = "WARNING"
stream {
file-io {
executor {
type = "thread-pool-executor"
}
}
}
}""".stripMargin).withFallback(ConfigFactory.load())

  implicit val system: ActorSystem = ActorSystem("file-stream-bench", config)
  implicit val dispatcher = system.dispatcher

  var mat: FlowMaterializer = _

  // No longer used by the benchmarks (they block via `awaitSum`); kept so the
  // set of public members of this class is unchanged.
  final val abq = new ArrayBlockingQueue[Int](1)

  /** Sums the lengths of all `ByteString` chunks that flow into it. */
  final val sumFoldSink: FoldSink[Int, ByteString] = Sink.fold(0) { (acc, l) ⇒ acc + l.length }

  /** Sums the lengths of all `String` lines that flow into it. */
  final val sumFoldStringSink: FoldSink[Int, String] = Sink.fold(0) { (acc, l) ⇒ acc + l.length }

  var runToFold: RunnableFlow = _

  @Param(Array("1048576", "10485760"))
  val fileSize = 0 // bytes

  var f: File = _

  /**
   * Creates the temporary input file and the materializer.
   *
   * The file is filled with lines containing the decimal `fileSize` until at
   * least `fileSize` characters have been written. The written amount is
   * counted locally: polling `f.length()` while writing through a
   * `BufferedWriter` lags behind the buffered data and costs a file-system
   * call per line.
   */
  @Setup
  def setup(): Unit = {
    val settings = MaterializerSettings(system)
    f = File.createTempFile("file-benchmark", ".tmp")

    // Digits plus '\n' only, i.e. one byte per character in ASCII-compatible charsets.
    val line = fileSize.toString + "\n"
    val w = new BufferedWriter(new FileWriter(f))
    try {
      var written = 0L
      while (written < fileSize) {
        w.write(line)
        written += line.length
      }
    } finally w.close()

    runToFold = Source(f).to(sumFoldSink)
    mat = FlowMaterializer(settings)
  }

  /** Stops the actor system and removes the temporary input file. */
  @TearDown
  def shutdown(): Unit = {
    try {
      system.shutdown()
      system.awaitTermination()
    } finally {
      // `f` is still null if setup() failed before creating the file.
      Option(f).foreach(_.delete())
    }
  }

  // @Benchmark
  // def bufferedReader_linesStream_jdk8: Long = {
  // val reader = new BufferedReader(new FileReader(f))
  // val lines = reader.lines()
  // val i = lines.count()
  // reader.close()
  // i
  // }

  /** Reads the file with `scala.io.Source` and returns the summed line lengths. */
  @Benchmark
  def getLines: Int = {
    val source = scala.io.Source.fromFile(f)
    try source.getLines().map(_.length).sum // defaultCharBufferSize is 8192
    finally source.close()
  }

  /**
   * Reads the file through a `FileChannel` into a reused heap `ByteBuffer`
   * and returns the sum of all bytes read.
   */
  @Benchmark
  def fileChannel: Long = {
    val Size = 512
    val fs = new FileInputStream(f)
    try {
      val ch = fs.getChannel
      val bb = ByteBuffer.allocate(Size * 8)
      val barray = Array.ofDim[Byte](Size)
      var checkSum = 0L
      var nRead = 0
      var nGet = 0
      while ({ nRead = ch.read(bb); nRead } != -1) {
        if (nRead != 0) {
          // The buffer was empty before the read, so flip() exposes exactly
          // the `nRead` bytes that were just written into it.
          bb.flip()
          while (bb.hasRemaining) {
            nGet = Math.min(bb.remaining(), Size)
            bb.get(barray, 0, nGet)
            var i = 0
            while (i < nGet) {
              checkSum += barray(i)
              i += 1
            }
          }
          bb.clear()
        }
      }
      checkSum
    } finally fs.close()
  }

  /**
   * Blocks until the materialized fold result is available and returns it.
   *
   * A failed stream rethrows its error here and a stalled one times out,
   * instead of leaving the benchmark thread blocked forever waiting for a
   * value that is never delivered.
   */
  private def awaitSum(result: scala.concurrent.Future[Int]): Int =
    scala.concurrent.Await.result(result, scala.concurrent.duration.Duration(1, TimeUnit.MINUTES))

  /** Streams the `getLines()` iterator through akka-streams and sums the line lengths. */
  @Benchmark
  def getLines_asSource: Int = {
    val source = scala.io.Source.fromFile(f)
    try {
      val m = Source(() ⇒ source.getLines()).runWith(sumFoldStringSink)(mat)
      awaitSum(m)
    } finally source.close()
  }

  /** File-backed source, 512 byte chunks, read-ahead of 4; returns the summed chunk lengths. */
  @Benchmark
  def source_blockingIo_chunk_512_ahead_4: Int = {
    val m = Source(f, chunkSize = 512, readAhead = 4).runWith(sumFoldSink)(mat)
    awaitSum(m)
  }

  /** File-backed source, 512 byte chunks, read-ahead of 8; returns the summed chunk lengths. */
  @Benchmark
  def source_blockingIo_chunk_512_ahead_8: Int = {
    val m = Source(f, chunkSize = 512, readAhead = 8).runWith(sumFoldSink)(mat)
    awaitSum(m)
  }

  /** File-backed source, 512 byte chunks, read-ahead of 16; returns the summed chunk lengths. */
  @Benchmark
  def source_blockingIo_chunk_512_ahead_16: Int = {
    val m = Source(f, chunkSize = 512, readAhead = 16).runWith(sumFoldSink)(mat)
    awaitSum(m)
  }
}
[info] Benchmark (fileSize) Mode Cnt Score Error Units
// GOAL, fastest file reading:
[info] fiStreamFileBenchmark.fileChannel 1048576 avgt 5 1.208 ± 0.055 ms/op
[info] fiStreamFileBenchmark.fileChannel 10485760 avgt 5 11.661 ± 0.422 ms/op
// "scala style", io.Source.getLines()
[info] fiStreamFileBenchmark.getLines 1048576 avgt 5 6.763 ± 0.309 ms/op
[info] fiStreamFileBenchmark.getLines 10485760 avgt 5 64.057 ± 0.957 ms/op
// "people do this": passing the getLines() iterator to akka-streams via Source(() => it) - bad!
[info] fiStreamFileBenchmark.getLines_asSource 1048576 avgt 5 222.514 ± 78.932 ms/op
[info] fiStreamFileBenchmark.getLines_asSource 10485760 avgt 5 1913.027 ± 1346.009 ms/op
// FileSource - blocking io, FileChannel, recycling ByteBuffers
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_16 1048576 avgt 5 10.111 ± 3.856 ms/op
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_16 10485760 avgt 5 142.122 ± 99.064 ms/op
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_4 1048576 avgt 5 14.851 ± 14.938 ms/op
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_4 10485760 avgt 5 99.120 ± 67.705 ms/op
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_8 1048576 avgt 5 11.262 ± 5.170 ms/op
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_8 10485760 avgt 5 108.066 ± 29.671 ms/op
// FileSource - blocking io, FileChannel, recycling DirectByteBuffers:
[info] Benchmark (fileSize) Mode Cnt Score Error Units
[info] fiStreamFileBenchmark.fileChannel 1048576 avgt 5 1.201 ± 0.024 ms/op
[info] fiStreamFileBenchmark.fileChannel 10485760 avgt 5 12.345 ± 1.483 ms/op
[info] fiStreamFileBenchmark.getLines 1048576 avgt 5 7.429 ± 5.344 ms/op
[info] fiStreamFileBenchmark.getLines 10485760 avgt 5 64.447 ± 2.960 ms/op
[info] fiStreamFileBenchmark.getLines_asSource 1048576 avgt 5 207.415 ± 188.209 ms/op
[info] fiStreamFileBenchmark.getLines_asSource 10485760 avgt 5 2094.457 ± 1139.516 ms/op
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_16 1048576 avgt 5 9.201 ± 0.588 ms/op
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_16 10485760 avgt 5 94.242 ± 34.930 ms/op
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_4 1048576 avgt 5 9.862 ± 5.036 ms/op
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_4 10485760 avgt 5 90.810 ± 30.827 ms/op
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_8 1048576 avgt 5 9.460 ± 1.203 ms/op
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_8 10485760 avgt 5 91.837 ± 44.826 ms/op
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment