Skip to content

Instantly share code, notes, and snippets.

View RussellSpitzer's full-sized avatar
🤷‍♀️
...

Russell Spitzer RussellSpitzer

🤷‍♀️
...
View GitHub Profile
package com.datastax.spark.connector.streaming
import akka.actor._
import akka.testkit.{ImplicitSender, TestKit}
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.embedded._
import com.datastax.spark.connector.streaming.StreamingEvent.ReceiverStarted
import com.datastax.spark.connector.testkit._
import org.apache.spark.SparkEnv
trait StreamingSpec extends AbstractSpec with SharedEmbeddedCassandra with SparkTemplate with BeforeAndAfterAll {
import org.apache.spark.streaming.StreamingContext
import scala.concurrent.duration._
val duration = 10.seconds
useCassandraConfig(Seq("cassandra-default.yaml.template"))
def withStreamingContext(testCode: (StreamingContext) => Any, checkpointed: Option[(()=>StreamingContext,String)] = None): Unit = {
val ssc = checkpointed match {
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
def twoClusterExample ( sc: SparkContext) = {
val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))
import com.datastax.bdp.spark.DseSparkConfHelper;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
@RussellSpitzer
RussellSpitzer / SparkCassandra.R
Created September 10, 2015 23:01
SparkR and Cassandra
# Launch SparkR with the Cassandra connector on the classpath.
# --conf takes a single PROP=VALUE argument; a space-separated value is not accepted.
./bin/sparkR --packages com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M3 --conf spark.cassandra.connection.host=127.0.0.1
# Read the Cassandra table important.letters through the connector's data source,
# then print its first rows.
myframe <- read.df(
  sqlContext,
  source = "org.apache.spark.sql.cassandra",
  keyspace = "important",
  table = "letters"
)
head(myframe)
mailbox touser fromuser
1 1 doc marty
2 1 lorraine marty
3 2 marty doc
body
package com.datastax.spark.connector.rdd
import com.datastax.driver.core.Session
import com.datastax.spark.connector.{PartitionKeyColumns, AllColumns, ColumnSelector}
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.writer.{BoundStatementBuilder, RowWriter}
import com.datastax.spark.connector.rdd.reader.{PrefetchingResultSetIterator, RowReader}
/**

Configuration Reference

Cassandra Authentication Parameters

Property Name Default Description
spark.cassandra.auth.conf.factory com.datastax.spark.connector.cql.DefaultAuthConfFactory$ name of a Scala module or class implementing AuthConfFactory providing custom authentication configuration
spark.cassandra.auth.username None Login name for password authentication
spark.cassandra.auth.password None Password for password authentication

Configuration Reference

Cassandra Authentication Parameters

Property Name Default Description
spark.cassandra.auth.conf.factory com.datastax.spark.connector.cql.DefaultAuthConfFactory$ name of a Scala module or class implementing AuthConfFactory providing custom authentication configuration
spark.cassandra.auth.username None Login name for password authentication
spark.cassandra.auth.password None Password for password authentication
@RussellSpitzer
RussellSpitzer / SparkGraphComputer.java
Created October 5, 2015 18:09
Thoughts on passing in persistContext
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0