@hakanilter
Created August 9, 2017 23:34
Export data from a JDBC datasource to DynamoDB with Spark
import java.util.Properties;

import org.apache.hadoop.dynamodb.DynamoDBItemWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import scala.Tuple2;

SparkConf sparkConf = new SparkConf()
        .setAppName(JdbcDynamoDbExportJob.class.getSimpleName())
        .setMaster(config.getProperty("spark.master"));
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);

// read the source table from the JDBC datasource
Properties properties = new Properties();
properties.setProperty("user", config.getProperty("jdbc.user"));
properties.setProperty("password", config.getProperty("jdbc.pass"));
String driver = config.getProperty("jdbc.driver");
String url = config.getProperty("jdbc.url");
Class.forName(driver); // ensure the JDBC driver is on the classpath
DataFrame df = sqlContext.read().jdbc(url, sqlTableName, properties);

// configure the DynamoDB output (emr-dynamodb-hadoop connector)
JobConf jobConf = new JobConf(jsc.hadoopConfiguration());
jobConf.set("dynamodb.servicename", "dynamodb");
jobConf.set("dynamodb.endpoint", config.getProperty("dynamodb.endpoint"));
jobConf.set("dynamodb.regionid", config.getProperty("dynamodb.regionid"));
jobConf.set("dynamodb.output.tableName", dynamoTableName);
jobConf.set("dynamodb.throughput.write", "1");
jobConf.set("dynamodb.throughput.write.percent", "1"); // use 100% of provisioned write capacity
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");

// convert each row to a DynamoDB item and save
String[] columns = df.columns();
df.javaRDD()
        .mapToPair(row -> new Tuple2<>(new Text(""), createItem(row, columns)))
        .saveAsHadoopDataset(jobConf);
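The snippet calls a `createItem(row, columns)` helper that is not shown in the gist. A minimal sketch of the column-to-attribute mapping such a helper would perform, using only plain Java collections for illustration (`ItemBuilder` and `toStringAttributes` are hypothetical names; the real helper would wrap the resulting attributes in a `DynamoDBItemWritable` with AWS SDK `AttributeValue` objects):

```java
import java.util.HashMap;
import java.util.Map;

public class ItemBuilder {

    // Maps one row's column values to DynamoDB-style string attributes.
    // In the actual job, each entry would become an AttributeValue and the
    // whole map would be set on a DynamoDBItemWritable:
    //   DynamoDBItemWritable item = new DynamoDBItemWritable();
    //   item.setItem(attributeValueMap);
    static Map<String, String> toStringAttributes(String[] columns, Object[] values) {
        Map<String, String> attrs = new HashMap<>();
        for (int i = 0; i < columns.length; i++) {
            Object value = values[i];
            if (value != null) { // DynamoDB rejects null attribute values
                attrs.put(columns[i], value.toString());
            }
        }
        return attrs;
    }
}
```

Skipping nulls matters because DynamoDB refuses items containing null or empty attribute values, a common failure mode when exporting relational data.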
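The job pulls all of its settings from a `java.util.Properties`-style `config` object. A plausible example of the file it expects, with key names taken from the calls above and all values as placeholders (the MySQL driver and region are illustrative assumptions):

```properties
spark.master=local[*]
jdbc.driver=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://localhost:3306/mydb
jdbc.user=dbuser
jdbc.pass=secret
dynamodb.endpoint=dynamodb.eu-west-1.amazonaws.com
dynamodb.regionid=eu-west-1
```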