package com.emasphere.poc.hbase;

import com.emasphere.data.executor.common.DataFormatUtils;
import com.emasphere.data.executor.common.utils.FlowExecutorUtils;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.postgresql.Driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.net.URL;
import java.sql.Connection;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Properties;

import static org.apache.hadoop.hbase.HConstants.*;
import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.*;
import static org.apache.hadoop.hbase.mapreduce.TableOutputFormat.*;
import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.*;
/**
 * Imports the rows of a PostgreSQL table into an HBase table with Spark,
 * reaching the database through an SSH tunnel.
 *
 * @author Sebastien Gerard
 */
public class HBaseSqlImporter {

    public static HBaseSqlImporter sqlImporter(String tableName,
                                               String query,
                                               JavaSparkContext context) {
        try {
            return new HBaseSqlImporter(tableName, query, context);
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot initialize the importer.", e);
        }
    }
    private static final Logger logger = LoggerFactory.getLogger(HBaseSqlImporter.class);

    private final String tableName;
    private final String query;
    private final JavaSparkContext context;
    private final Properties properties;

    public HBaseSqlImporter(String tableName,
                            String query,
                            JavaSparkContext context) throws IOException {
        this.tableName = tableName;
        this.query = query;
        this.context = context;
        this.properties = new Properties();
        this.properties.load(getClass().getResourceAsStream("/application.properties"));
    }
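
    /**
     * Reads the SQL table through JDBC, optionally filtered by the configured
     * query, and writes every row to the HBase table as a {@link Put} mutation.
     */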
    public void load() throws MalformedURLException {
        final String url = "jdbc:postgresql://localhost:" + properties.getProperty("importer.bastion.local-port")
                + "/" + properties.getProperty("importer.db.name")
                + "?user=" + properties.getProperty("importer.db.user")
                + "&password=" + properties.getProperty("importer.db.password");

        final Session session = doSshTunnel(properties);
        try {
            logDbDetails(url);

            final Configuration configuration = initializeConfiguration();

            Dataset<Row> sqlDataSet = SQLContext
                    .getOrCreate(context.sc())
                    .read()
                    .option("driver", Driver.class.getName())
                    .jdbc(url, tableName, new Properties());

            if (query != null) {
                sqlDataSet = sqlDataSet.filter(query);
            }

            final Mapper func = new Mapper();
            sqlDataSet
                    .toJavaRDD()
                    .mapToPair(func)
                    .saveAsNewAPIHadoopFile(
                            FlowExecutorUtils.TABLE,
                            ImmutableBytesWritable.class,
                            Mutation.class, // the RDD values are Put mutations, not Result
                            TableOutputFormat.class,
                            new JobConf(configuration)
                    );
        } finally {
            if (session != null) {
                session.disconnect();
            }
        }
    }
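
    /**
     * Builds the HBase client configuration, preferring the cluster's
     * hbase-site.xml when it is present on the local file system.
     */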
    private Configuration initializeConfiguration() throws MalformedURLException {
        final Configuration configuration = HBaseConfiguration.create();

        final URL url = new URL("file:///etc/hbase/conf/hbase-site.xml");
        if (new File(url.getFile()).exists()) {
            logger.info("Loading HBase configuration from [{}].", url);
            configuration.addResource(url);
        } else {
            logger.info("HBase configuration file [{}] does not exist.", url);
        }

        configuration.set(INPUT_TABLE, "flow");
        configuration.set(OUTDIR, "flow");
        configuration.set(OUTPUT_TABLE, "flow");
        configuration.set(ZOOKEEPER_QUORUM, "localhost");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");

        return configuration;
    }
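
    /**
     * Logs the configured JDBC URL and checks that a connection can actually be opened.
     */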
    private void logDbDetails(String url) {
        logger.info("Configured DB URL [{}].", url);

        try (Connection connect = new Driver().connect(url, new Properties())) {
            logger.info("Connection to PostgreSQL established, schema [{}].", connect.getSchema());
        } catch (SQLException e) {
            logger.error("Cannot connect to PostgreSQL [" + e.getMessage() + "].", e);
        }
    }
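
    /**
     * Opens an SSH tunnel to the bastion host and forwards a local port to the
     * remote PostgreSQL server, so that the JDBC URL can point to localhost.
     */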
    private Session doSshTunnel(Properties properties) {
        try {
            final JSch jsch = new JSch();
            final Session session = jsch.getSession(
                    properties.getProperty("importer.bastion.user"),
                    properties.getProperty("importer.bastion.url"),
                    Integer.valueOf(properties.getProperty("importer.bastion.port"))
            );

            jsch.addIdentity(
                    "eb-preprod",
                    IOUtils.toByteArray(
                            HBaseSqlImporter.class.getResource(properties.getProperty("importer.bastion.key-file"))
                    ),
                    null,
                    null
            );

            // Host key checking is disabled, which is acceptable for a PoC but
            // should not be done against a production bastion.
            final Properties config = new Properties();
            config.put("StrictHostKeyChecking", "no");
            session.setConfig(config);

            session.connect();
            session.setPortForwardingL(
                    Integer.valueOf(properties.getProperty("importer.bastion.local-port")),
                    properties.getProperty("importer.db.url"),
                    Integer.valueOf(properties.getProperty("importer.db.port"))
            );

            return session;
        } catch (Exception e) {
            logger.error("Cannot open SSH tunnel.", e);
            return null;
        }
    }
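
    /**
     * Maps a SQL {@link Row} to an HBase {@link Put}: the row key is the
     * concatenation of the "id" and "i" columns, and every non-null column is
     * stored in the "d" column family.
     */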
    private static class Mapper implements PairFunction<Row, ImmutableBytesWritable, Mutation> {

        @Override
        public Tuple2<ImmutableBytesWritable, Mutation> call(Row row) {
            // The row key is built from the "id" (decimal) and "i" (string) columns.
            final byte[] rowKey = concat(
                    DataFormatUtils.toBytes(row.getDecimal(row.fieldIndex("id"))),
                    DataFormatUtils.toBytes(row.getString(row.fieldIndex("i")))
            );

            final Put put = new Put(rowKey);
            for (int i = 0; i < row.size(); i++) {
                final Object value = row.get(i);
                if (value == null) {
                    continue;
                }

                final String name = row.schema().fieldNames()[i];

                final byte[] bytesValue;
                if (value instanceof BigDecimal) {
                    bytesValue = DataFormatUtils.toBytes((BigDecimal) value);
                } else if (value instanceof Date) {
                    bytesValue = DataFormatUtils.toBytes(((Date) value).toLocalDate());
                } else if (value instanceof Timestamp) {
                    bytesValue = DataFormatUtils.toBytes(((Timestamp) value).toLocalDateTime());
                } else if (value instanceof Boolean) {
                    bytesValue = DataFormatUtils.toBytes((Boolean) value);
                } else if (value instanceof String) {
                    bytesValue = DataFormatUtils.toBytes((String) value);
                } else {
                    throw new UnsupportedOperationException("Unsupported type [" + value.getClass() + "].");
                }

                // Every cell goes into the "d" family, with an explicit timestamp of 0.
                put.addColumn(
                        "d".getBytes(),
                        name.getBytes(),
                        0,
                        bytesValue
                );
            }

            return Tuple2.apply(new ImmutableBytesWritable(rowKey), put);
        }
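
        /** Concatenates two byte arrays: {@code first} followed by {@code second}. */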
        public byte[] concat(byte[] first, byte[] second) {
            byte[] both = Arrays.copyOf(first, first.length + second.length);
            System.arraycopy(second, 0, both, first.length, second.length);
            return both;
        }
    }
}
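
Usage (a minimal sketch): assuming a local Spark context and an application.properties on the classpath providing the importer.* keys read above, the importer could be driven as follows. The table name "customer" and the filter expression are illustrative only, not part of the gist.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class HBaseSqlImporterDemo {

        public static void main(String[] args) {
            // Local Spark context; a real deployment would configure its own master.
            final JavaSparkContext context = new JavaSparkContext(
                    new SparkConf().setAppName("hbase-sql-importer").setMaster("local[*]"));
            try {
                HBaseSqlImporter
                        .sqlImporter("customer", "amount > 0", context) // hypothetical table and filter
                        .load();
            } catch (Exception e) {
                throw new IllegalStateException("Import failed.", e);
            } finally {
                context.stop();
            }
        }
    }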