package com.datastax.spark.demo;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;

import static com.datastax.spark.connector.CassandraJavaUtil.*;

public class JavaDemo implements Serializable {
    private transient SparkConf conf;

    private JavaDemo(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        generateData(sc);
        compute(sc);
        showResults(sc);
        sc.stop();
    }

    private void generateData(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        // Prepare the schema
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS java_api");
            session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
            session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
            session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
        }

        // Prepare the products hierarchy
        List<Product> products = Arrays.asList(
                new Product(0, "All products", Collections.<Integer>emptyList()),
                new Product(1, "Product A", Arrays.asList(0)),
                new Product(4, "Product A1", Arrays.asList(0, 1)),
                new Product(5, "Product A2", Arrays.asList(0, 1)),
                new Product(2, "Product B", Arrays.asList(0)),
                new Product(6, "Product B1", Arrays.asList(0, 2)),
                new Product(7, "Product B2", Arrays.asList(0, 2)),
                new Product(3, "Product C", Arrays.asList(0)),
                new Product(8, "Product C1", Arrays.asList(0, 3)),
                new Product(9, "Product C2", Arrays.asList(0, 3))
        );
        JavaRDD<Product> productsRDD = sc.parallelize(products);
        javaFunctions(productsRDD, Product.class).saveToCassandra("java_api", "products");

        // Generate 1000 random sales for each leaf product (the ones with two parents)
        JavaRDD<Sale> salesRDD = productsRDD.filter(new Function<Product, Boolean>() {
            @Override
            public Boolean call(Product product) throws Exception {
                return product.getParents().size() == 2;
            }
        }).flatMap(new FlatMapFunction<Product, Sale>() {
            @Override
            public Iterable<Sale> call(Product product) throws Exception {
                Random random = new Random();
                List<Sale> sales = new ArrayList<>(1000);
                for (int i = 0; i < 1000; i++) {
                    sales.add(new Sale(UUID.randomUUID(), product.getId(), BigDecimal.valueOf(random.nextDouble())));
                }
                return sales;
            }
        });
        javaFunctions(salesRDD, Sale.class).saveToCassandra("java_api", "sales");
    }

    private void compute(JavaSparkContext sc) {
        JavaPairRDD<Integer, Product> productsRDD = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class)
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });
        JavaPairRDD<Integer, Sale> salesRDD = javaFunctions(sc)
                .cassandraTable("java_api", "sales", Sale.class)
                .keyBy(new Function<Sale, Integer>() {
                    @Override
                    public Integer call(Sale sale) throws Exception {
                        return sale.getProduct();
                    }
                });
        JavaPairRDD<Integer, Tuple2<Sale, Product>> joinedRDD = salesRDD.join(productsRDD);

        // Attribute each sale's price to the product itself and to each of its parents
        JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMap(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() {
            @Override
            public Iterable<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception {
                Tuple2<Sale, Product> saleWithProduct = input._2();
                List<Tuple2<Integer, BigDecimal>> allSales = new ArrayList<>(saleWithProduct._2().getParents().size() + 1);
                allSales.add(new Tuple2<>(saleWithProduct._1().getProduct(), saleWithProduct._1().getPrice()));
                for (Integer parentProduct : saleWithProduct._2().getParents()) {
                    allSales.add(new Tuple2<>(parentProduct, saleWithProduct._1().getPrice()));
                }
                return allSales;
            }
        });

        // Sum the prices per product and store the totals in the summaries table
        JavaRDD<Summary> summariesRDD = allSalesRDD.reduceByKey(new Function2<BigDecimal, BigDecimal, BigDecimal>() {
            @Override
            public BigDecimal call(BigDecimal v1, BigDecimal v2) throws Exception {
                return v1.add(v2);
            }
        }).map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
            @Override
            public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception {
                return new Summary(input._1(), input._2());
            }
        });
        javaFunctions(summariesRDD, Summary.class).saveToCassandra("java_api", "summaries");
    }

    private void showResults(JavaSparkContext sc) {
        JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
                .cassandraTable("java_api", "summaries", Summary.class)
                .keyBy(new Function<Summary, Integer>() {
                    @Override
                    public Integer call(Summary summary) throws Exception {
                        return summary.getProduct();
                    }
                });
        JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class)
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });

        // Left outer join, so products without sales still appear (with an absent Optional)
        List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().toArray();
        for (Tuple2<Product, Optional<Summary>> result : results) {
            System.out.println(result);
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }
        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);
        JavaDemo app = new JavaDemo(conf);
        app.run();
    }

    public static class Product implements Serializable {
        private Integer id;
        private String name;
        private List<Integer> parents;

        public Product() { }

        public Product(Integer id, String name, List<Integer> parents) {
            this.id = id;
            this.name = name;
            this.parents = parents;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public List<Integer> getParents() { return parents; }
        public void setParents(List<Integer> parents) { this.parents = parents; }

        @Override
        public String toString() {
            return MessageFormat.format("Product'{'id={0}, name=''{1}'', parents={2}'}'", id, name, parents);
        }
    }

    public static class Sale implements Serializable {
        private UUID id;
        private Integer product;
        private BigDecimal price;

        public Sale() { }

        public Sale(UUID id, Integer product, BigDecimal price) {
            this.id = id;
            this.product = product;
            this.price = price;
        }

        public UUID getId() { return id; }
        public void setId(UUID id) { this.id = id; }
        public Integer getProduct() { return product; }
        public void setProduct(Integer product) { this.product = product; }
        public BigDecimal getPrice() { return price; }
        public void setPrice(BigDecimal price) { this.price = price; }

        @Override
        public String toString() {
            return MessageFormat.format("Sale'{'id={0}, product={1}, price={2}'}'", id, product, price);
        }
    }

    public static class Summary implements Serializable {
        private Integer product;
        private BigDecimal summary;

        public Summary() { }

        public Summary(Integer product, BigDecimal summary) {
            this.product = product;
            this.summary = summary;
        }

        public Integer getProduct() { return product; }
        public void setProduct(Integer product) { this.product = product; }
        public BigDecimal getSummary() { return summary; }
        public void setSummary(BigDecimal summary) { this.summary = summary; }

        @Override
        public String toString() {
            return MessageFormat.format("Summary'{'product={0}, summary={1}'}'", product, summary);
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.datastax.spark.demo</groupId>
    <artifactId>java-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--Spark Cassandra Connector-->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.10</artifactId>
            <version>1.0.0-rc4</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_2.10</artifactId>
            <version>1.0.0-rc4</version>
        </dependency>
        <!--Spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>0.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>0.9.2</version>
        </dependency>
    </dependencies>
</project>
I have added an updated pom.xml and an updated class at https://gist.github.com/baghelamit/f2963d9e37acc55474559104f5f16cf1. This works with Spark 2.0.2, spark-cassandra-connector 2.0.0-M3, and cassandra-driver-core 3.1.2.
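For anyone who doesn't want to follow the link, a minimal sketch of dependency coordinates matching those versions (my assumption: the Scala 2.11 builds, which is what Spark 2.0.x uses by default):

<!-- Sketch only; versions taken from the comment above -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.0.0-M3</version>
</dependency>
<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.1.2</version>
</dependency>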
I have this problem using the CassandraConnectorConf class. Any ideas?
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.commons.codec.binary.Base64.encodeBase64String([B)Ljava/lang/String;
at com.datastax.spark.connector.cql.CassandraConnectorConf.serializedConfString$lzycompute(CassandraConnectorConf.scala:40)
at com.datastax.spark.connector.cql.CassandraConnectorConf.serializedConfString(CassandraConnectorConf.scala:34)
at com.datastax.spark.connector.cql.CassandraConnectorConf.hashCode(CassandraConnectorConf.scala:43)
at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:206)
at scala.collection.concurrent.TrieMap$MangledHashing.hash(TrieMap.scala:955)
at scala.collection.concurrent.TrieMap.computeHash(TrieMap.scala:827)
at scala.collection.concurrent.TrieMap.get(TrieMap.scala:842)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:50)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:79)
at customCassandraSpark.JavaDemo.generateData(JavaDemo.java:53)
at customCassandraSpark.JavaDemo.run(JavaDemo.java:43)
at customCassandraSpark.JavaDemo.main(JavaDemo.java:188)
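One likely cause: Base64.encodeBase64String(byte[]) first appeared in commons-codec 1.4, so this error usually means an older commons-codec (for example one pulled in transitively by Hadoop) is winning on the classpath. A minimal sketch of a Maven fix, assuming pinning a newer version is compatible with the rest of your build:

<!-- Force a commons-codec that has Base64.encodeBase64String(byte[]), added in 1.4 -->
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.10</version>
</dependency>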
Here is a working sample based on the original.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.datastax.spark.demo</groupId>
    <artifactId>java-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!--Spark Cassandra Connector -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.0.5</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_2.11</artifactId>
            <version>1.5.2</version>
        </dependency>
        <!--Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>
</project>
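A caveat on the dependencies above (my understanding of the connector's history, worth verifying): starting with spark-cassandra-connector 1.6.0 the Java API was merged into the main artifact under the japi package, so the separate spark-cassandra-connector-java_2.11 1.5.2 entry is probably redundant next to connector 2.0.5 and could drag stale classes onto the classpath. If the japi imports resolve without it, drop it.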
package com.datastax.spark.demo;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.UUID;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

import scala.Tuple2;

public class Test {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster("local[*]");
        conf.set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);
        generateData(sc);
        compute(sc);
        showResults(sc);
        sc.stop();
    }

    private static void showResults(JavaSparkContext sc) {
        JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
                .cassandraTable("java_api", "summaries", mapRowTo(Summary.class))
                .keyBy(new Function<Summary, Integer>() {
                    @Override
                    public Integer call(Summary summary) throws Exception {
                        return summary.getProduct();
                    }
                });
        JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
                .cassandraTable("java_api", "products", mapRowTo(Product.class))
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });
        List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().collect();
        for (Tuple2<Product, Optional<Summary>> result : results) {
            System.out.println(result);
        }
    }

    private static void compute(JavaSparkContext sc) {
        JavaPairRDD<Integer, Product> productsRDD = javaFunctions(sc)
                .cassandraTable("java_api", "products", mapRowTo(Product.class))
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });
        JavaPairRDD<Integer, Sale> salesRDD = javaFunctions(sc)
                .cassandraTable("java_api", "sales", mapRowTo(Sale.class))
                .keyBy(new Function<Sale, Integer>() {
                    @Override
                    public Integer call(Sale sale) throws Exception {
                        return sale.getProduct();
                    }
                });
        JavaPairRDD<Integer, Tuple2<Sale, Product>> joinedRDD = salesRDD.join(productsRDD);

        // Attribute each sale's price to the product itself and to each of its parents.
        // Note: flatMapToPair replaces the old flatMap(PairFlatMapFunction) overload,
        // and the function now returns an Iterator instead of an Iterable.
        JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() {
            @Override
            public Iterator<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception {
                Tuple2<Sale, Product> saleWithProduct = input._2();
                List<Tuple2<Integer, BigDecimal>> allSales = new ArrayList<>(saleWithProduct._2().getParents().size() + 1);
                allSales.add(new Tuple2<>(saleWithProduct._1().getProduct(), saleWithProduct._1().getPrice()));
                for (Integer parentProduct : saleWithProduct._2().getParents()) {
                    allSales.add(new Tuple2<>(parentProduct, saleWithProduct._1().getPrice()));
                }
                return allSales.iterator();
            }
        });
        JavaRDD<Summary> summariesRDD = allSalesRDD.reduceByKey(new Function2<BigDecimal, BigDecimal, BigDecimal>() {
            @Override
            public BigDecimal call(BigDecimal v1, BigDecimal v2) throws Exception {
                return v1.add(v2);
            }
        }).map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
            @Override
            public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception {
                return new Summary(input._1(), input._2());
            }
        });
        javaFunctions(summariesRDD).writerBuilder("java_api", "summaries", mapToRow(Summary.class)).saveToCassandra();
    }

    private static void generateData(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        // Prepare the schema
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS java_api");
            session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
            session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
            session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
        }

        // Prepare the products hierarchy
        List<Product> products = Arrays.asList(
                new Product(0, "All products", Collections.<Integer>emptyList()),
                new Product(1, "Product A", Arrays.asList(0)),
                new Product(4, "Product A1", Arrays.asList(0, 1)),
                new Product(5, "Product A2", Arrays.asList(0, 1)),
                new Product(2, "Product B", Arrays.asList(0)),
                new Product(6, "Product B1", Arrays.asList(0, 2)),
                new Product(7, "Product B2", Arrays.asList(0, 2)),
                new Product(3, "Product C", Arrays.asList(0)),
                new Product(8, "Product C1", Arrays.asList(0, 3)),
                new Product(9, "Product C2", Arrays.asList(0, 3))
        );
        JavaRDD<Product> productsRDD = sc.parallelize(products);
        javaFunctions(productsRDD).writerBuilder("java_api", "products", mapToRow(Product.class)).saveToCassandra();

        // Generate 1000 random sales for each leaf product (the ones with two parents)
        JavaRDD<Sale> salesRDD = productsRDD.filter(new Function<Product, Boolean>() {
            @Override
            public Boolean call(Product product) throws Exception {
                return product.getParents().size() == 2;
            }
        }).flatMap(new FlatMapFunction<Product, Sale>() {
            @Override
            public Iterator<Sale> call(Product product) throws Exception {
                Random random = new Random();
                List<Sale> sales = new ArrayList<Sale>(1000);
                for (int i = 0; i < 1000; i++) {
                    sales.add(new Sale(UUID.randomUUID(), product.getId(), BigDecimal.valueOf(random.nextDouble())));
                }
                return sales.iterator();
            }
        });
        javaFunctions(salesRDD).writerBuilder("java_api", "sales", mapToRow(Sale.class)).saveToCassandra();
    }

    public static class Product implements Serializable {
        private Integer id;
        private String name;
        private List<Integer> parents;

        public Product() { }

        public Product(Integer id, String name, List<Integer> parents) {
            this.id = id;
            this.name = name;
            this.parents = parents;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public List<Integer> getParents() { return parents; }
        public void setParents(List<Integer> parents) { this.parents = parents; }

        @Override
        public String toString() {
            return MessageFormat.format("Product'{'id={0}, name=''{1}'', parents={2}'}'", id, name, parents);
        }
    }

    public static class Sale implements Serializable {
        private UUID id;
        private Integer product;
        private BigDecimal price;

        public Sale() { }

        public Sale(UUID id, Integer product, BigDecimal price) {
            this.id = id;
            this.product = product;
            this.price = price;
        }

        public UUID getId() { return id; }
        public void setId(UUID id) { this.id = id; }
        public Integer getProduct() { return product; }
        public void setProduct(Integer product) { this.product = product; }
        public BigDecimal getPrice() { return price; }
        public void setPrice(BigDecimal price) { this.price = price; }

        @Override
        public String toString() {
            return MessageFormat.format("Sale'{'id={0}, product={1}, price={2}'}'", id, product, price);
        }
    }

    public static class Summary implements Serializable {
        private Integer product;
        private BigDecimal summary;

        public Summary() { }

        public Summary(Integer product, BigDecimal summary) {
            this.product = product;
            this.summary = summary;
        }

        public Integer getProduct() { return product; }
        public void setProduct(Integer product) { this.product = product; }
        public BigDecimal getSummary() { return summary; }
        public void setSummary(BigDecimal summary) { this.summary = summary; }

        @Override
        public String toString() {
            return MessageFormat.format("Summary'{'product={0}, summary={1}'}'", product, summary);
        }
    }
}
Completely rewritten with Java 1.8.0_131 (javac 1.8.0_131). For newbies, the complete application can be found here:
https://github.com/sunone5/BigData/tree/master/spark-cassandra-streaming
Thanks for your tutorials. There are lots of people who wonder how to make a very good, functional blog, which is why this tutorial may come in handy, especially for Java programmers who are just beginning to code. It is no surprise that Java is one of the oldest, yet still most popular, programming languages, and that it has inspired many other languages. For instance, 3 billion mobile units and 125 million television units run Java. In my opinion, starting to code with Java is very beneficial, as Java has apparently become a great foundation for creating other languages.
This example no longer works, as com.datastax.spark.connector.CassandraJavaUtil no longer exists in its old form. The new one lives in a separate package (com.datastax.spark.connector.japi.CassandraJavaUtil) but does not have the overloaded javaFunctions(). Some functions exist in other forms, but there is no 1:1 match; I couldn't find any equivalent to the one that takes a .class as input (at least I have not found one yet).
So this needs to be totally refactored. Has anyone done that?
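For what it's worth, the Test class posted above already shows the refactoring: in the japi package the old class-based overloads are replaced by RowReaderFactory/RowWriterFactory arguments built with mapRowTo(...) and mapToRow(...). A minimal sketch of the old-to-new mapping (MigrationSketch is a hypothetical name, Product is the bean defined earlier in this gist, and this assumes connector 1.6+):

package com.datastax.spark.demo;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical holder class; Product refers to the bean from this gist
public class MigrationSketch {
    // Old API (connector 1.0.x, com.datastax.spark.connector.CassandraJavaUtil):
    //   javaFunctions(sc).cassandraTable("java_api", "products", Product.class)
    //   javaFunctions(productsRDD, Product.class).saveToCassandra("java_api", "products")
    static void migrated(JavaSparkContext sc, JavaRDD<Test.Product> productsRDD) {
        // Read: mapRowTo(...) wraps the bean class in a RowReaderFactory,
        // replacing the overload that took Product.class directly
        JavaRDD<Test.Product> fromCassandra =
                javaFunctions(sc).cassandraTable("java_api", "products", mapRowTo(Test.Product.class));

        // Write: writerBuilder(...) plus a RowWriterFactory from mapToRow(...) replaces
        // the old two-argument javaFunctions(rdd, Product.class).saveToCassandra(...)
        javaFunctions(productsRDD)
                .writerBuilder("java_api", "products", mapToRow(Test.Product.class))
                .saveToCassandra();
    }
}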