Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save rustyrazorblade/d6796570fe81f481332b5fae1e2bb492 to your computer and use it in GitHub Desktop.

Select an option

Save rustyrazorblade/d6796570fe81f481332b5fae1e2bb492 to your computer and use it in GitHub Desktop.
spark with creds
S3TransportExtension.java
import org.apache.cassandra.spark.transports.storage.StorageCredentialPair;
import org.apache.cassandra.spark.transports.storage.StorageCredentials;
import org.apache.cassandra.spark.transports.storage.extensions.*;
import org.apache.spark.SparkConf;
/**
 * Minimal {@link StorageTransportExtension} that always returns one fixed storage
 * configuration and ignores every lifecycle callback.
 *
 * <p>The same bucket, region and credentials are supplied for both bucket/region
 * argument pairs of {@link StorageTransportConfiguration} and for both halves of the
 * {@link StorageCredentialPair}.
 *
 * <p>NOTE(review): the access key id and secret key below are hard-coded placeholder
 * strings. Replace them with values obtained from a secure source before running
 * against a real bucket, and never commit real credentials.
 */
public class S3TransportExtension implements StorageTransportExtension
{
    /** Region used for every region argument of the configuration. */
    private static final String REGION = "us-east-1";

    /** Bucket used for every bucket argument of the configuration. */
    private static final String BUCKET = "my-bucket";

    /** Key prefix passed to the configuration. */
    private static final String KEY_PREFIX = "job-prefix";

    /** No initialization is performed; all arguments are ignored. */
    @Override
    public void initialize(String jobId, SparkConf conf, boolean isOnDriver)
    {
    }

    /**
     * Builds the storage configuration from the constants of this class.
     *
     * @return a new configuration using {@link #BUCKET}, {@link #REGION},
     *         {@link #KEY_PREFIX}, the placeholder credentials and an empty map as
     *         the last constructor argument
     */
    @Override
    public StorageTransportConfiguration getStorageConfiguration()
    {
        StorageCredentials creds = new StorageCredentials("MY_ACCESS_KEY_ID", "MY_SECRET_KEY");
        StorageCredentialPair pair = new StorageCredentialPair(REGION, creds, REGION, creds);
        // The original passed ImmutableMap.of() without importing ImmutableMap, which does
        // not compile. Collections.emptyMap() is likewise an immutable, serializable empty
        // map, and the fully-qualified name needs no additional import.
        return new StorageTransportConfiguration(BUCKET, REGION,
                                                 BUCKET, REGION,
                                                 KEY_PREFIX, pair, java.util.Collections.emptyMap());
    }

    // Every remaining callback and listener setter is intentionally a no-op:
    // this extension only supplies configuration and does not react to events.

    @Override
    public void onTransportStart(long elapsedMillis)
    {
    }

    @Override
    public void setCredentialChangeListener(CredentialChangeListener l)
    {
    }

    @Override
    public void setObjectFailureListener(ObjectFailureListener l)
    {
    }

    @Override
    public void onObjectPersisted(String bucket, String key, long sizeInBytes)
    {
    }

    @Override
    public void onAllObjectsPersisted(long objects, long rows, long elapsedMillis)
    {
    }

    @Override
    public void onObjectApplied(String bucket, String key, long sizeInBytes, long elapsedMillis)
    {
    }

    @Override
    public void onJobSucceeded(long elapsedMillis)
    {
    }

    @Override
    public void onJobFailed(long elapsedMillis, Throwable t)
    {
    }

    @Override
    public void onStageSucceeded(String clusterId, long elapsedMillis)
    {
    }

    @Override
    public void onStageFailed(String clusterId, Throwable cause)
    {
    }

    @Override
    public void onImportSucceeded(String clusterId, long elapsedMillis)
    {
    }

    @Override
    public void onImportFailed(String clusterId, Throwable cause)
    {
    }

    @Override
    public void setCoordinationSignalListener(CoordinationSignalListener l)
    {
    }
}
S3WriteJob.java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.spark.KryoRegister;
import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
/**
 * Example Spark job that builds a small synthetic data set and writes it through
 * the {@code CassandraDataSink} data source using the {@code S3_COMPAT} data transport
 * and {@link S3TransportExtension} as the transport extension class.
 */
public class S3WriteJob
{
    /** Number of synthetic rows produced by {@link #generateRows(SparkContext)}. */
    private static final int ROW_COUNT = 1000;

    /**
     * Entry point: configures a local Spark session, generates rows and writes them
     * to table {@code spark_test.test}.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args)
    {
        SparkConf sparkConf = new SparkConf().setAppName("S3WriteJob").set("spark.master", "local[8]");
        BulkSparkConf.setupSparkConf(sparkConf, true);
        KryoRegister.setup(sparkConf);
        SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
        try
        {
            StructType schema = new StructType()
                                .add("id", LongType, false)
                                .add("course", BinaryType, false)
                                .add("marks", LongType, false);
            Dataset<Row> df = spark.createDataFrame(generateRows(spark.sparkContext()), schema);
            df.write()
              .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
              .option("sidecar_contact_points", "localhost")
              .option("keyspace", "spark_test")
              .option("table", "test")
              .option("local_dc", "datacenter1")
              .option("bulk_writer_cl", "LOCAL_QUORUM")
              .option("data_transport", "S3_COMPAT")
              .option("data_transport_extension_class", S3TransportExtension.class.getCanonicalName())
              .mode("append")
              .save();
        }
        finally
        {
            // Stop the session even when the write throws, so the local Spark
            // context is not left running after a failed job.
            spark.stop();
        }
    }

    /**
     * Produces {@link #ROW_COUNT} rows shaped like the schema declared in
     * {@link #main(String[])}: a {@code long} id, a {@code byte[]} course value and a
     * {@code long} marks value.
     *
     * <p>The original job called this method without defining it; the row contents
     * here are synthetic placeholders and can be replaced with real data.
     *
     * @param sparkContext context used to distribute the generated rows
     * @return an RDD holding the generated rows
     */
    private static JavaRDD<Row> generateRows(SparkContext sparkContext)
    {
        List<Row> rows = new ArrayList<>(ROW_COUNT);
        for (long id = 0; id < ROW_COUNT; id++)
        {
            byte[] course = ("course-" + id).getBytes(StandardCharsets.UTF_8);
            rows.add(RowFactory.create(id, course, id));
        }
        return JavaSparkContext.fromSparkContext(sparkContext).parallelize(rows);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment