Skip to content

Instantly share code, notes, and snippets.

@facboy
Created November 19, 2014 12:12
Show Gist options
  • Save facboy/8387e950ffb0746a8272 to your computer and use it in GitHub Desktop.
ClassCastException from JavaPairRDD.collectAsMap()
package org.facboy.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import com.google.common.collect.ImmutableList;
/**
 * Minimal reproduction for a {@link ClassCastException} raised by
 * {@code JavaPairRDD.collectAsMap()} on an RDD that was reloaded from a checkpoint file.
 *
 * <p>The program runs three steps against a local Spark context:
 * <ol>
 *   <li>builds a small pair RDD, reduces it by key, checkpoints it and prints it;</li>
 *   <li>reloads the checkpoint through the Scala {@code SparkContext} with an explicit
 *       {@code ClassTag} for {@code Tuple2} and prints it;</li>
 *   <li>reloads the same checkpoint through {@code JavaSparkContext.checkpointFile(String)},
 *       which takes no {@code ClassTag}, and prints it.</li>
 * </ol>
 *
 * @author Christopher Ng
 */
public class CheckpointBug {
    /**
     * Runs the reproduction.
     *
     * <p>The Spark context is always stopped before this method returns or throws, so a failure
     * in any step does not leave the context running.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Checkpoint Application").setMaster("local[8]");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        try {
            sparkContext.setCheckpointDir("checkpoints");

            // Step 1: sum the values per key (a=4, b=8, c=7) and mark the result for checkpointing.
            JavaPairRDD<String, Integer> rdd = JavaPairRDD.fromJavaRDD(sparkContext.parallelize(ImmutableList.of(
                    new Tuple2<>("a", 1),
                    new Tuple2<>("a", 3),
                    new Tuple2<>("b", 2),
                    new Tuple2<>("b", 6),
                    new Tuple2<>("c", 3),
                    new Tuple2<>("c", 4)
            )))
                    .reduceByKey((v1, v2) -> v1 + v2);
            rdd.checkpoint();
            System.out.println("rdd: " + rdd.collectAsMap());

            // Read the checkpoint location only after the collectAsMap() action above has run;
            // get() assumes the checkpoint file is present at this point.
            String checkpointFile = rdd.getCheckpointFile().get();

            // Step 2: reload via the underlying Scala SparkContext, passing a ClassTag for Tuple2.
            ClassTag<Tuple2<String, Integer>> classTag = ClassTag$.MODULE$.apply(Tuple2.class);
            JavaPairRDD<String, Integer> checkpointedClassTag = JavaPairRDD.fromJavaRDD(
                    new JavaRDD<>(
                            JavaSparkContext.toSparkContext(sparkContext).checkpointFile(checkpointFile, classTag),
                            classTag));
            System.out.println("class tag checkpoint rdd: " + checkpointedClassTag.collectAsMap());

            // Step 3: reload via the Java API, which supplies no explicit ClassTag.
            // NOTE(review): presumably this is the collectAsMap() call that throws the
            // ClassCastException this reproduction is about - confirm by running.
            JavaPairRDD<String, Integer> checkpointed = JavaPairRDD.fromJavaRDD(
                    sparkContext.<Tuple2<String, Integer>>checkpointFile(checkpointFile));
            System.out.println("java checkpoint rdd: " + checkpointed.collectAsMap());
        } finally {
            // Stop the context on both the success and the failure path; any exception thrown
            // above still propagates to the caller after the context has been stopped.
            sparkContext.stop();
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Maven build descriptor for org.facboy.spark:spark-bugs (1.0-SNAPSHOT).
It declares the libraries on the compile classpath and configures the Java compiler level.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates; the groupId matches the Java package of CheckpointBug. -->
<groupId>org.facboy.spark</groupId>
<artifactId>spark-bugs</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Single Spark version shared by both Spark artifacts below, so they cannot drift apart. -->
<spark.version>1.1.0</spark.version>
</properties>
<dependencies>
<!-- Guava; presumably the source of the ImmutableList import in CheckpointBug. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<!-- NOTE(review): Guice is not imported by CheckpointBug; presumably used elsewhere in the project. Confirm before removing. -->
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>3.0</version>
</dependency>
<!-- NOTE(review): commons-lang3 is not imported by CheckpointBug; presumably used elsewhere in the project. Confirm before removing. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency> <!-- Spark core; version comes from the spark.version property. -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency> <!-- Spark streaming, same version as core. NOTE(review): not imported by CheckpointBug; confirm it is needed. -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile with Java 8 source and target levels; CheckpointBug uses a lambda expression, which requires Java 8. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment