Skip to content

Instantly share code, notes, and snippets.

@JoshRosen
Last active August 29, 2015 14:20
Show Gist options
  • Save JoshRosen/286b26494ab27e657051 to your computer and use it in GitHub Desktop.
Save JoshRosen/286b26494ab27e657051 to your computer and use it in GitHub Desktop.
package org.apache.spark.sql
import org.apache.spark.unsafe.PlatformDependent
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.unsafe.memory.{MemoryAllocator, TaskMemoryManager, ExecutorMemoryManager}
import scala.util.Random
/**
 * Benchmark that measures how quickly the entries of a [[BytesToBytesMap]] can be iterated over.
 *
 * For every record count in 1M, 2M, ..., 10M and for both `MemoryAllocator.HEAP` and
 * `MemoryAllocator.UNSAFE`, a map of random fixed-size records is built. It is iterated a few
 * times as warm-up and then iterated `NUM_ITERATIONS` more times while being timed. One CSV line
 * (`mode,numRecords,operationsPerSecond`) is printed to stdout per timed iteration.
 */
object BytesToBytesMapIterationBenchmark {

  /** Size in bytes of every randomly generated key inserted by [[createMap]]. */
  private val KEY_SIZE_BYTES = 64

  /** Size in bytes of every randomly generated value inserted by [[createMap]]. */
  private val VALUE_SIZE_BYTES = 64

  /**
   * Builds a map containing `numRecords` entries with random 64-byte keys and random 64-byte values.
   *
   * The random generator uses a fixed seed, so every call inserts the same data. If an insertion
   * fails part way through, the partially built map is freed before the exception propagates, so
   * its memory is not leaked.
   *
   * @param numRecords number of entries to insert; must be non-negative
   * @param memoryManager memory manager passed to the map's constructor
   * @return the populated map; the caller owns it and is responsible for calling `free()` on it
   * @throws IllegalArgumentException if `numRecords` is negative
   */
  def createMap(numRecords: Int, memoryManager: TaskMemoryManager): BytesToBytesMap = {
    require(numRecords >= 0, s"numRecords must be non-negative but was $numRecords")
    val rand = new Random(42)
    // Initial capacity is 1.5x the number of records to be inserted.
    // NOTE(review): the trailing `false` is presumably BytesToBytesMap's perf-metrics flag — confirm.
    val map = new BytesToBytesMap(memoryManager, (numRecords * 1.5).toInt, false)
    var succeeded = false
    try {
      for (_ <- 1 to numRecords) {
        val key = new Array[Byte](KEY_SIZE_BYTES)
        val value = new Array[Byte](VALUE_SIZE_BYTES)
        rand.nextBytes(key)
        rand.nextBytes(value)
        // putNewKey is called unconditionally. This assumes every random 64-byte key is new;
        // a collision is astronomically unlikely.
        val loc = map.lookup(key, PlatformDependent.BYTE_ARRAY_OFFSET, key.length)
        loc.putNewKey(
          key, PlatformDependent.BYTE_ARRAY_OFFSET, key.length,
          value, PlatformDependent.BYTE_ARRAY_OFFSET, value.length)
      }
      succeeded = true
      map
    } finally {
      // Release the map's memory only if we are not handing the map back to the caller.
      if (!succeeded) {
        map.free()
      }
    }
  }

  /**
   * Iterates over every entry in `map`, copying each key and value out of the map's storage into
   * on-heap scratch byte arrays. The copies are the work being measured; their contents are
   * discarded.
   *
   * The scratch buffers start at the record sizes used by [[createMap]]. They are enlarged whenever
   * an entry is longer, so maps with larger records cannot overrun them.
   *
   * @param map the map whose entries are scanned
   */
  def runBenchmark(map: BytesToBytesMap): Unit = {
    var keyBuffer = new Array[Byte](KEY_SIZE_BYTES)
    var valueBuffer = new Array[Byte](VALUE_SIZE_BYTES)
    val iter = map.iterator()
    while (iter.hasNext) {
      val loc = iter.next()
      val keyLength = loc.getKeyLength
      val valueLength = loc.getValueLength
      // PlatformDependent.copyMemory presumably does a raw (Unsafe-style) copy with no bounds
      // checking, so the destination arrays must be at least as long as the data being copied.
      if (keyLength > keyBuffer.length) {
        keyBuffer = new Array[Byte](keyLength)
      }
      if (valueLength > valueBuffer.length) {
        valueBuffer = new Array[Byte](valueLength)
      }
      val keyAddress = loc.getKeyAddress
      PlatformDependent.copyMemory(
        keyAddress.getBaseObject,
        keyAddress.getBaseOffset,
        keyBuffer,
        PlatformDependent.BYTE_ARRAY_OFFSET,
        keyLength)
      val valueAddress = loc.getValueAddress
      PlatformDependent.copyMemory(
        valueAddress.getBaseObject,
        valueAddress.getBaseOffset,
        valueBuffer,
        PlatformDependent.BYTE_ARRAY_OFFSET,
        valueLength)
    }
  }

  /**
   * Runs the benchmark for 1M to 10M records (in steps of 1M) with both the heap and the unsafe
   * allocator. For each timed iteration it prints one CSV line: the allocator's class name, the
   * record count, and the measured records per second. Command-line arguments are ignored.
   */
  def main(args: Array[String]): Unit = {
    val NUM_WARMUPS = 2
    val NUM_ITERATIONS = 10
    println("mode,numRecords,operationsPerSecond")
    for (
      numRecords <- (1 to 10).map(_ * 1000000);
      allocator <- Seq(MemoryAllocator.HEAP, MemoryAllocator.UNSAFE)
    ) {
      val memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(allocator))
      val map = createMap(numRecords, memoryManager)
      try {
        for (_ <- 1 to NUM_WARMUPS) {
          runBenchmark(map)
        }
        for (_ <- 1 to NUM_ITERATIONS) {
          // Request a GC up front so garbage from earlier runs is less likely to be collected
          // during the measured run.
          System.gc()
          // nanoTime is monotonic and high-resolution. currentTimeMillis can jump with wall-clock
          // adjustments, and it reports 0 ms for sub-millisecond runs, which made the rate
          // Infinity. The clamp keeps the division finite.
          val startNanos = System.nanoTime()
          runBenchmark(map)
          val elapsedNanos = math.max(System.nanoTime() - startNanos, 1L)
          val recordsPerSecond = numRecords * 1e9 / elapsedNanos
          println(s"${allocator.getClass.getName},$numRecords,$recordsPerSecond")
        }
      } finally {
        map.free()
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment