Skip to content

Instantly share code, notes, and snippets.

@umbertogriffo
Created February 23, 2018 11:50
Show Gist options
  • Save umbertogriffo/cfd9ec9f2a3c3b4b50fa8ef7e8e84db2 to your computer and use it in GitHub Desktop.
Save umbertogriffo/cfd9ec9f2a3c3b4b50fa8ef7e8e84db2 to your computer and use it in GitHub Desktop.
This is a collection of examples of Apache Spark's JavaRDD API. These examples aim to help me test the JavaRDD functionality.
package test.idlike.spark.datastructure;
import org.apache.commons.lang3.SystemUtils;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
public class JavaRddAPI {
private static JavaSparkContext sc;
public static void main(String[] args) throws Exception {
if (SystemUtils.IS_OS_WINDOWS) {
System.setProperty("hadoop.home.dir", "c:/winutil/");
}
// Create Context
sc = new JavaSparkContext("local",
"KeyValueMapFilter", System.getenv("SPARK_HOME"),
System.getenv("JARS"));
class Print1 implements VoidFunction<Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
public void call(Tuple2<Integer, Integer> arg0) throws Exception {
System.out.println(arg0._1() + ":" + arg0._2());
}
}
class PrintStr implements VoidFunction<Tuple2<String, String>> {
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, String> arg0) throws Exception {
System.out.println(arg0._1() + ":" + arg0._2());
}
}
class PrintIntStr implements VoidFunction<Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;
public void call(Tuple2<Integer, String> arg0) throws Exception {
System.out.println(arg0._1() + ":" + arg0._2());
}
}
class PrintStrInt implements VoidFunction<Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Integer> arg0) throws Exception {
System.out.println(arg0._1() + ":" + arg0._2());
}
}
/******************************************************
* MAP
******************************************************/
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
// Map a List of Integer into a JavaDoubleRDD
JavaDoubleRDD doubles = rdd.mapToDouble(x -> x.doubleValue());
System.out.println("Map a List of Integer into a JavaDoubleRDD");
System.out.println(doubles.collect().toString());
// Map a List of Integer into a JavaPairRDD
JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x));
System.out.println("Map a List of Integer into a JavaPairRDD");
System.out.println(pairs.collect().toString());
System.out.println(pairs.take(8).size());
// Map a List of Integer into a JavaRDD<String>
JavaRDD<String> strings = rdd.map((Function<Integer, String>) x -> x.toString());
System.out.println("Map a List of Integer into a JavaRDD<String>");
System.out.println(strings.collect().toString());
/*******************************************************
* FLAT MAP
*******************************************************/
JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("Hello World!",
"The quick brown fox jumps over the lazy dog."));
// Tokenization of String
JavaRDD<String> words = rdd2
.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
System.out.println("Tokenization of String");
System.out.println(words.collect().toString());
JavaPairRDD<String, String> pairsRDD = rdd2
.flatMapToPair(s -> {
List<Tuple2<String, String>> pairs1 = new LinkedList<Tuple2<String, String>>();
for (String word : s.split(" ")) {
pairs1.add(new Tuple2<String, String>(word, word));
}
return pairs1.iterator();
});
System.out.println("Tokenization of String2");
pairsRDD.foreach(new PrintStr());
JavaDoubleRDD doubles2 = rdd2
.flatMapToDouble(s -> {
List<Double> lengths = new LinkedList<Double>();
for (String word : s.split(" ")) {
lengths.add((double) word.length());
}
return lengths.iterator();
});
System.out.println("Tokenization of String3");
System.out.println(doubles2.collect());
/*
* MAP FROM PAIRS TO PAIRS
*/
List<Tuple2<Integer, String>> pairs2 = Arrays.asList(
new Tuple2<Integer, String>(1, "a"),
new Tuple2<Integer, String>(2, "aa"),
new Tuple2<Integer, String>(3, "aaa"));
JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs2);
JavaPairRDD<String, Integer> swapped = pairRDD
.flatMapToPair(item -> Collections.singletonList(item.swap()).iterator());
System.out.println("MAP FROM PAIRS TO PAIRS - Swap value to key");
swapped.foreach(new PrintStrInt());
/*
* MAP PARTITIONS
*/
JavaRDD<Integer> rdd3 = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
JavaRDD<Integer> partitionSums = rdd3.mapPartitions(
iter -> {
int sum = 0;
while (iter.hasNext()) {
sum += iter.next();
}
return Collections.singletonList(sum).iterator();
});
// return [3,7]
System.out.println("Partition");
System.out.println(partitionSums.collect());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment