Created
April 21, 2026 18:59
-
-
Save rustyrazorblade/d6796570fe81f481332b5fae1e2bb492 to your computer and use it in GitHub Desktop.
Cassandra Spark bulk writer using the S3-compatible transport with static credentials
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| S3TransportExtension.java | |
| import org.apache.cassandra.spark.transports.storage.StorageCredentialPair; | |
| import org.apache.cassandra.spark.transports.storage.StorageCredentials; | |
| import org.apache.cassandra.spark.transports.storage.extensions.*; | |
| import org.apache.spark.SparkConf; | |
| public class S3TransportExtension implements StorageTransportExtension | |
| { | |
| @Override | |
| public void initialize(String jobId, SparkConf conf, boolean isOnDriver) {} | |
| @Override | |
| public StorageTransportConfiguration getStorageConfiguration() | |
| { | |
| StorageCredentials creds = new StorageCredentials("MY_ACCESS_KEY_ID", "MY_SECRET_KEY"); | |
| StorageCredentialPair pair = new StorageCredentialPair("us-east-1", creds, "us-east-1", creds); | |
| return new StorageTransportConfiguration("my-bucket", "us-east-1", | |
| "my-bucket", "us-east-1", | |
| "job-prefix", pair, ImmutableMap.of()); | |
| } | |
| @Override public void onTransportStart(long elapsedMillis) {} | |
| @Override public void setCredentialChangeListener(CredentialChangeListener l) {} | |
| @Override public void setObjectFailureListener(ObjectFailureListener l) {} | |
| @Override public void onObjectPersisted(String bucket, String key, long sizeInBytes) {} | |
| @Override public void onAllObjectsPersisted(long objects, long rows, long elapsedMillis) {} | |
| @Override public void onObjectApplied(String bucket, String key, long sizeInBytes, long elapsedMillis) {} | |
| @Override public void onJobSucceeded(long elapsedMillis) {} | |
| @Override public void onJobFailed(long elapsedMillis, Throwable t) {} | |
| @Override public void onStageSucceeded(String clusterId, long elapsedMillis) {} | |
| @Override public void onStageFailed(String clusterId, Throwable cause) {} | |
| @Override public void onImportSucceeded(String clusterId, long elapsedMillis) {} | |
| @Override public void onImportFailed(String clusterId, Throwable cause) {} | |
| @Override public void setCoordinationSignalListener(CoordinationSignalListener l) {} | |
| } | |
| S3WriteJob.java | |
| import org.apache.cassandra.spark.KryoRegister; | |
| import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; | |
| import org.apache.spark.SparkConf; | |
| import org.apache.spark.api.java.JavaSparkContext; | |
| import org.apache.spark.sql.*; | |
| import org.apache.spark.sql.types.StructType; | |
| import static org.apache.spark.sql.types.DataTypes.*; | |
| public class S3WriteJob | |
| { | |
| public static void main(String[] args) | |
| { | |
| SparkConf sparkConf = new SparkConf().setAppName("S3WriteJob").set("spark.master", "local[8]"); | |
| BulkSparkConf.setupSparkConf(sparkConf, true); | |
| KryoRegister.setup(sparkConf); | |
| SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate(); | |
| StructType schema = new StructType() | |
| .add("id", LongType, false) | |
| .add("course", BinaryType, false) | |
| .add("marks", LongType, false); | |
| Dataset<Row> df = spark.createDataFrame(generateRows(spark.sparkContext()), schema); | |
| df.write() | |
| .format("org.apache.cassandra.spark.sparksql.CassandraDataSink") | |
| .option("sidecar_contact_points", "localhost") | |
| .option("keyspace", "spark_test") | |
| .option("table", "test") | |
| .option("local_dc", "datacenter1") | |
| .option("bulk_writer_cl", "LOCAL_QUORUM") | |
| .option("data_transport", "S3_COMPAT") | |
| .option("data_transport_extension_class", S3TransportExtension.class.getCanonicalName()) | |
| .mode("append") | |
| .save(); | |
| spark.stop(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment