Created
July 30, 2018 11:43
-
-
Save sebge2emasphere/6c4df97ae86e7cdcad064137b3b26dcb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.emasphere.poc.parquetspark;

import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.postgresql.Driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
/** | |
* @author Sebastien Gerard | |
*/ | |
public class ParquetSqlImporter { | |
public static ParquetSqlImporter sqlImporter(String tableName, | |
String query, | |
JavaSparkContext context) { | |
try { | |
return new ParquetSqlImporter(tableName, query, context); | |
} catch (Exception e) { | |
throw new IllegalArgumentException("Cannot initialize the importer.", e); | |
} | |
} | |
public static final String LOCATION = "s3a://ema-data-lake/parquet-test/test.parquet"; | |
private static final Logger logger = LoggerFactory.getLogger(ParquetSqlImporter.class); | |
private final String tableName; | |
private final String query; | |
private final JavaSparkContext context; | |
private final Properties properties; | |
public ParquetSqlImporter(String tableName, | |
String query, | |
JavaSparkContext context) throws IOException { | |
this.tableName = tableName; | |
this.query = query; | |
this.context = context; | |
this.properties = new Properties(); | |
this.properties.load(getClass().getResourceAsStream("/application.properties")); | |
} | |
public void load() { | |
final String url = "jdbc:postgresql://localhost:" + properties.getProperty("importer.bastion.local-port") | |
+ "/" + properties.getProperty("importer.db.name") | |
+ "?user=" + properties.getProperty("importer.db.user") | |
+ "&password=" + properties.getProperty("importer.db.password"); | |
final Session session = doSshTunnel(properties); | |
try { | |
logDbDetails(url); | |
SQLContext | |
.getOrCreate(context.sc()) | |
.read() | |
.option("driver", Driver.class.getName()) | |
.jdbc(url, tableName, new Properties()) | |
.filter(query) | |
.write() | |
.mode(SaveMode.Append) | |
.parquet(LOCATION); | |
} finally { | |
if (session != null) { | |
session.disconnect(); | |
} | |
} | |
} | |
private void logDbDetails(String url) { | |
logger.info("Configured DB URL [{}].", url); | |
try { | |
final Connection connect = new Driver().connect(url, new Properties()); | |
logger.info("Connection to Posgresql established, schema [{}].", connect.getSchema()); | |
} catch (SQLException e) { | |
logger.error("Cannot connect to postgresql [" + e.getMessage() + "].", e); | |
} | |
} | |
private Session doSshTunnel(Properties properties) { | |
try { | |
final JSch jsch = new JSch(); | |
final Session session = jsch.getSession( | |
properties.getProperty("importer.bastion.user"), | |
properties.getProperty("importer.bastion.url"), | |
Integer.valueOf(properties.getProperty("importer.bastion.port")) | |
); | |
jsch.addIdentity( | |
"eb-preprod", | |
IOUtils.toByteArray( | |
ParquetSqlImporter.class.getResource(properties.getProperty("importer.bastion.key-file")) | |
), | |
null, | |
null | |
); | |
final Properties config = new Properties(); | |
config.put("StrictHostKeyChecking", "no"); | |
session.setConfig(config); | |
session.connect(); | |
session.setPortForwardingL( | |
Integer.valueOf(properties.getProperty("importer.bastion.local-port")), | |
properties.getProperty("importer.db.url"), | |
Integer.valueOf(properties.getProperty("importer.db.port")) | |
); | |
return session; | |
} catch (Exception e) { | |
logger.error("Cannot open SSH tunnel.", e); | |
return null; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment