Last active
July 21, 2017 10:50
-
-
Save ktoso/46d17f31a0c5a134903a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.stream | |
import java.io.{BufferedWriter, File, FileInputStream, FileWriter} | |
import java.nio.ByteBuffer | |
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} | |
import akka.actor.ActorSystem | |
import akka.stream.scaladsl._ | |
import akka.util.ByteString | |
import com.typesafe.config.ConfigFactory | |
import org.openjdk.jmh.annotations._ | |
/**
 * JMH benchmark comparing several ways of reading the same temporary file:
 * `scala.io.Source.getLines`, a hand-rolled `FileChannel` loop, `getLines`
 * wrapped in an akka-streams `Source`, and the file-backed `Source(f, ...)`
 * with different `readAhead` settings.
 *
 * The file is created once in [[setup]] (about `fileSize` bytes) and removed
 * again in [[shutdown]].
 */
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.AverageTime))
class fiStreamFileBenchmark {

  val config = ConfigFactory.parseString(
    """
    akka {
      log-config-on-start = off
      log-dead-letters-during-shutdown = off
      loglevel = "WARNING"
      stream {
        file-io {
          executor {
            type = "thread-pool-executor"
          }
        }
      }
    }""".stripMargin).withFallback(ConfigFactory.load())

  implicit val system: ActorSystem = ActorSystem("file-stream-bench", config)
  // Execution context for the Future callbacks registered in `awaitSum`.
  implicit val dispatcher = system.dispatcher

  var mat: FlowMaterializer = _

  // Single-slot queue used to hand the stream's fold result back to the benchmark thread.
  final val abq = new ArrayBlockingQueue[Int](1)

  // Sinks summing the length of every element (ByteString chunks / text lines).
  final val sumFoldSink: FoldSink[Int, ByteString] = Sink.fold(0){ (acc, l) ⇒ acc + l.length }
  final val sumFoldStringSink: FoldSink[Int, String] = Sink.fold(0){ (acc, l) ⇒ acc + l.length }

  var runToFold: RunnableFlow = _

  @Param(Array("1048576", "10485760"))
  val fileSize = 0 // bytes

  var f: File = _

  /**
   * Creates the temporary input file and the materializer used by the stream benchmarks.
   *
   * The file is filled with lines containing `fileSize` as text until at least
   * `fileSize` bytes have been written.
   */
  @Setup
  def setup(): Unit = {
    val settings = MaterializerSettings(system)

    f = File.createTempFile("file-benchmark", ".tmp")

    val line = fileSize.toString + "\n"
    val w = new BufferedWriter(new FileWriter(f))
    try {
      // Count what we hand to the writer instead of polling f.length():
      // the writer buffers, so the on-disk length lags behind and the file
      // would overshoot the requested size (and cost a stat() per line).
      // `line` is pure ASCII, so chars written == bytes written.
      var written = 0L
      while (written < fileSize) {
        w.write(line)
        written += line.length
      }
    } finally w.close()

    runToFold = Source(f).to(sumFoldSink)
    mat = FlowMaterializer(settings)
  }

  /** Stops the actor system, waits for its termination and removes the temporary file. */
  @TearDown
  def shutdown(): Unit = {
    system.shutdown()
    system.awaitTermination()
    // `f` is still null if setup() failed before creating the file.
    Option(f).foreach(_.delete())
  }

  // @Benchmark
  // def bufferedReader_linesStream_jdk8: Long = {
  //   val reader = new BufferedReader(new FileReader(f))
  //   val lines = reader.lines()
  //   val i = lines.count()
  //   reader.close()
  //   i
  // }

  /** Baseline "Scala style": sums the length of every line returned by `scala.io.Source.getLines`. */
  @Benchmark
  def getLines: Int = {
    val source = scala.io.Source.fromFile(f)
    // defaultCharBufferSize is 8192
    try source.getLines().map(_.length).sum
    finally source.close()
  }

  /**
   * Reads the file through a `FileChannel` into a heap `ByteBuffer` and sums
   * every byte, copying the buffer out in `Size`-byte slices.
   *
   * @return the sum of all bytes read
   */
  @Benchmark
  def fileChannel: Long = {
    val Size = 512
    val fs = new FileInputStream(f)
    try {
      val ch = fs.getChannel
      val bb = ByteBuffer.allocate(Size * 8)
      val barray = Array.ofDim[Byte](Size)
      var checkSum = 0L
      var nRead = 0

      while ({ nRead = ch.read(bb); nRead } != -1) {
        if (nRead != 0) {
          // The buffer is cleared before every read, so position == nRead here
          // and flip() yields position 0 / limit nRead.
          bb.flip()
          while (bb.hasRemaining) {
            val nGet = Math.min(bb.remaining(), Size)
            bb.get(barray, 0, nGet)
            var i = 0
            while (i < nGet) {
              checkSum += barray(i)
              i += 1
            }
          }
          bb.clear()
        }
      }
      checkSum
    } finally fs.close()
  }

  /** Wraps the `getLines` iterator in an akka-streams `Source` and folds the line lengths. */
  @Benchmark
  def getLines_asSource: Int = {
    val source = scala.io.Source.fromFile(f)
    try awaitSum(Source(() ⇒ source.getLines()).runWith(sumFoldStringSink)(mat))
    finally source.close()
  }

  /** File-backed `Source`, 512-byte chunks, `readAhead = 4`. */
  @Benchmark
  def source_blockingIo_chunk_512_ahead_4: Int =
    awaitSum(Source(f, chunkSize = 512, readAhead = 4).runWith(sumFoldSink)(mat))

  /** File-backed `Source`, 512-byte chunks, `readAhead = 8`. */
  @Benchmark
  def source_blockingIo_chunk_512_ahead_8: Int =
    awaitSum(Source(f, chunkSize = 512, readAhead = 8).runWith(sumFoldSink)(mat))

  /** File-backed `Source`, 512-byte chunks, `readAhead = 16`. */
  @Benchmark
  def source_blockingIo_chunk_512_ahead_16: Int =
    awaitSum(Source(f, chunkSize = 512, readAhead = 16).runWith(sumFoldSink)(mat))

  /**
   * Blocks the calling thread until `sum` has completed and returns its value,
   * using `abq` as the hand-off between the callback and the caller.
   *
   * NOTE(review): a failed future never offers a value, so `take()` would block
   * indefinitely in that case. Kept as in the original hand-off; confirm whether
   * a timeout or failure propagation is wanted.
   */
  private def awaitSum(sum: scala.concurrent.Future[Int]): Int = {
    sum.map { i ⇒ abq.offer(i) }
    abq.take()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[info] Benchmark (fileSize) Mode Cnt Score Error Units | |
// GOAL, fastest file reading: | |
[info] fiStreamFileBenchmark.fileChannel 1048576 avgt 5 1.208 ± 0.055 ms/op | |
[info] fiStreamFileBenchmark.fileChannel 10485760 avgt 5 11.661 ± 0.422 ms/op | |
// "scala style", io.Source.getLines() | |
[info] fiStreamFileBenchmark.getLines 1048576 avgt 5 6.763 ± 0.309 ms/op | |
[info] fiStreamFileBenchmark.getLines 10485760 avgt 5 64.057 ± 0.957 ms/op | |
// "people do this": passing the getLines() Iterator to akka-streams via Source(() => it) -- bad!
[info] fiStreamFileBenchmark.getLines_asSource 1048576 avgt 5 222.514 ± 78.932 ms/op | |
[info] fiStreamFileBenchmark.getLines_asSource 10485760 avgt 5 1913.027 ± 1346.009 ms/op | |
// FileSource - blocking io, FileChannel, recycling ByteBuffers | |
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_16 1048576 avgt 5 10.111 ± 3.856 ms/op | |
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_16 10485760 avgt 5 142.122 ± 99.064 ms/op | |
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_4 1048576 avgt 5 14.851 ± 14.938 ms/op | |
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_4 10485760 avgt 5 99.120 ± 67.705 ms/op | |
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_8 1048576 avgt 5 11.262 ± 5.170 ms/op | |
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_8 10485760 avgt 5 108.066 ± 29.671 ms/op | |
// FileSource - blocking io, FileChannel, recycling DirectByteBuffers: | |
[info] Benchmark (fileSize) Mode Cnt Score Error Units | |
[info] fiStreamFileBenchmark.fileChannel 1048576 avgt 5 1.201 ± 0.024 ms/op | |
[info] fiStreamFileBenchmark.fileChannel 10485760 avgt 5 12.345 ± 1.483 ms/op | |
[info] fiStreamFileBenchmark.getLines 1048576 avgt 5 7.429 ± 5.344 ms/op | |
[info] fiStreamFileBenchmark.getLines 10485760 avgt 5 64.447 ± 2.960 ms/op | |
[info] fiStreamFileBenchmark.getLines_asSource 1048576 avgt 5 207.415 ± 188.209 ms/op | |
[info] fiStreamFileBenchmark.getLines_asSource 10485760 avgt 5 2094.457 ± 1139.516 ms/op | |
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_16 1048576 avgt 5 9.201 ± 0.588 ms/op | |
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_16 10485760 avgt 5 94.242 ± 34.930 ms/op | |
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_4 1048576 avgt 5 9.862 ± 5.036 ms/op | |
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_4 10485760 avgt 5 90.810 ± 30.827 ms/op | |
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_8 1048576 avgt 5 9.460 ± 1.203 ms/op | |
[info] fiStreamFileBenchmark.source_blockingIo_chunk_512_ahead_8 10485760 avgt 5 91.837 ± 44.826 ms/op |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment