Skip to content

Instantly share code, notes, and snippets.

@JoshRosen
Created April 27, 2015 19:09
Show Gist options
  • Save JoshRosen/305954666a587117092c to your computer and use it in GitHub Desktop.
Save JoshRosen/305954666a587117092c to your computer and use it in GitHub Desktop.
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions.{UnsafeFixedWidthAggregationMap, SpecificMutableRow, MutableRow, GenericRow}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.memory.MemoryAllocator
import org.openjdk.jmh.annotations._
import scala.util.Random
/**
 * JMH benchmark that aggregates randomly generated rows into a hash map keyed by each row's
 * string column. Three variants are measured: a `java.util.HashMap` of row objects, and an
 * `UnsafeFixedWidthAggregationMap` constructed with either `MemoryAllocator.HEAP` or
 * `MemoryAllocator.UNSAFE`.
 *
 * The benchmark state is scoped per benchmark thread (`Scope.Thread`).
 */
@State(Scope.Thread)
class MyBenchmark {

  /**
   * Number of rows generated (and map lookups performed) per benchmark invocation.
   *
   * NOTE: the `@OperationsPerInvocation` values on the benchmark methods below are hard-coded
   * to the same number because annotation arguments must be compile-time constants. Keep them
   * in sync with this parameter, otherwise the reported per-operation scores are wrong.
   */
  @Param(Array("1000000"))
  var numKeys: Int = _

  private val rand = new Random(42)
  private val rowArr = new Array[Any](4)
  private val row: Row = new GenericRow(rowArr)

  /**
   * Generates a new random row with columns (string, int, long, double).
   *
   * For efficiency, each call overwrites and returns the same mutable row, so callers must not
   * hold on to the result across calls.
   */
  def randomRow(): Row = {
    rowArr(0) = UTF8String(rand.nextString(10))
    rowArr(1) = rand.nextInt()
    rowArr(2) = rand.nextLong()
    rowArr(3) = rand.nextDouble()
    row
  }

  /**
   * Generates the grouping projection for a given row (in this case, a new row containing only
   * the string column).
   */
  def groupProjection(row: Row): Row = {
    new GenericRow(Array[Any](row.get(0)))
  }

  /** The schema of the map values (aggregation buffers). */
  val aggregationBufferTypes = IntegerType :: LongType :: DoubleType :: Nil

  /** Generates a new empty aggregation buffer with one slot per aggregated column. */
  def emptyAggregationBuffer(): MutableRow = {
    new SpecificMutableRow(aggregationBufferTypes)
  }

  /**
   * Updates an aggregation buffer by adding the given row's values to it.
   *
   * Buffer slots 0, 1 and 2 accumulate row columns 1 (int), 2 (long) and 3 (double).
   */
  def updateAggregationBuffer(buffer: MutableRow, row: Row): Unit = {
    buffer.setInt(0, buffer.getInt(0) + row.getInt(1))
    buffer.setLong(1, buffer.getLong(1) + row.getLong(2))
    buffer.setDouble(2, buffer.getDouble(2) + row.getDouble(3))
  }

  /**
   * Run the benchmark using a hashmap implementation based on Java objects.
   */
  @Benchmark
  @OperationsPerInvocation(1000000)
  def benchmarkJavaObjects(): Unit = {
    val map = new java.util.HashMap[Row, MutableRow]()
    var i = 0
    while (i < numKeys) {
      i += 1
      val currentRow = randomRow()
      val currentGroup = groupProjection(currentRow)
      // java.util.HashMap.get returns null for a missing key; the explicit null check avoids
      // wrapping every lookup in an Option inside the measured loop.
      var currentBuffer = map.get(currentGroup)
      if (currentBuffer == null) {
        currentBuffer = emptyAggregationBuffer()
        map.put(currentGroup, currentBuffer)
      }
      updateAggregationBuffer(currentBuffer, currentRow)
    }
  }

  /** Runs the unsafe-map benchmark with the `MemoryAllocator.HEAP` allocator. */
  @Benchmark
  @OperationsPerInvocation(1000000)
  def benchmarkUnsafeOnHeap(): Unit = {
    benchmarkWithUnsafe(MemoryAllocator.HEAP)
  }

  /** Runs the unsafe-map benchmark with the `MemoryAllocator.UNSAFE` allocator. */
  @Benchmark
  @OperationsPerInvocation(1000000)
  def benchmarkUnsafeOffHeap(): Unit = {
    benchmarkWithUnsafe(MemoryAllocator.UNSAFE)
  }

  /** The map created by the most recent `benchmarkWithUnsafe` call, or null if none is live. */
  var unsafeMap: UnsafeFixedWidthAggregationMap = _

  /**
   * Run the benchmark using a hashmap implementation that uses managed memory.
   *
   * @param allocator the memory allocator handed to the `UnsafeFixedWidthAggregationMap`
   */
  def benchmarkWithUnsafe(allocator: MemoryAllocator): Unit = {
    // A bare @TearDown only runs once per trial, while this method runs once per invocation.
    // Without this call every invocation would overwrite `unsafeMap` and the previous map would
    // never have free() called on it. The cost of freeing the previous map is therefore part
    // of the measured time from the second invocation onwards.
    releaseUnsafeMap()
    unsafeMap = new UnsafeFixedWidthAggregationMap(
      emptyAggregationBuffer(),
      StructType(Seq(
        StructField("a", IntegerType),
        StructField("b", LongType),
        StructField("c", DoubleType))),
      StructType(Seq(
        StructField("d", StringType))),
      allocator,
      1024, // initial map size
      false) // disable perf. metrics
    var i = 0
    while (i < numKeys) {
      i += 1
      val currentRow = randomRow()
      val currentGroup = groupProjection(currentRow)
      val currentBuffer = unsafeMap.getAggregationBuffer(currentGroup)
      updateAggregationBuffer(currentBuffer, currentRow)
    }
  }

  /** Frees the map left by the last unsafe benchmark invocation, if any. */
  @TearDown
  def teardown(): Unit = {
    releaseUnsafeMap()
  }

  /** Calls free() on the current unsafe map, if one exists, and clears the reference. */
  private def releaseUnsafeMap(): Unit = {
    if (unsafeMap != null) {
      unsafeMap.free()
      unsafeMap = null
    }
  }
}
<!--
Copyright (c) 2014, Oracle America, Inc.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Oracle nor the names of its contributors may be used
to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
THE POSSIBILITY OF SUCH DAMAGE.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.sample</groupId>
<artifactId>test</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>JMH benchmark sample: Scala</name>
<!--
This is the demo/sample template build script for building Scala benchmarks with JMH.
Edit as needed. It needs a proper dependency on Scala library to compile.
-->
<dependencies>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
</dependencies>
<prerequisites>
<maven>3.0.4</maven>
</prerequisites>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jmh.version>1.9.1</jmh.version>
<jmh.generator>default</jmh.generator> <!-- or "asm", or "reflection" -->
<javac.target>1.6</javac.target>
<uberjar.name>benchmarks</uberjar.name>
</properties>
<build>
<plugins>
<!--
1. Add source directories for both scalac and javac.
-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.8</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.basedir}/</source>
<source>${project.basedir}/target/generated-sources/jmh</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<!--
2. Compile Scala sources
-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<recompileMode>incremental</recompileMode>
</configuration>
<executions>
<execution>
<phase>process-sources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!--
3. Invoke JMH generators to produce benchmark code
-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<phase>process-sources</phase>
<goals>
<goal>java</goal>
</goals>
<configuration>
<includePluginDependencies>true</includePluginDependencies>
<mainClass>org.openjdk.jmh.generators.bytecode.JmhBytecodeGenerator</mainClass>
<arguments>
<argument>${project.basedir}/target/classes/</argument>
<argument>${project.basedir}/target/generated-sources/jmh/</argument>
<argument>${project.basedir}/target/classes/</argument>
<argument>${jmh.generator}</argument>
</arguments>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-bytecode</artifactId>
<version>${jmh.version}</version>
</dependency>
</dependencies>
</plugin>
<!--
4. Compile JMH generated code.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<compilerVersion>${javac.target}</compilerVersion>
<source>${javac.target}</source>
<target>${javac.target}</target>
<compilerArgument>-proc:none</compilerArgument>
</configuration>
</plugin>
<!--
5. Package all the dependencies into the JAR
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${uberjar.name}</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<!--
Shading signed JARs will fail without this.
http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
-->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment