Created July 30, 2018 13:26
-
-
Save sebge2emasphere/5b4cbeb089b845ea5f348abba2bd6e30 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.emasphere.poc.hbase.sample;

import com.emasphere.data.executor.common.DataFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.io.IOException;
import java.math.BigDecimal;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;

import static org.apache.hadoop.hbase.HConstants.*;
import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.*;
import static org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.*;
import static org.apache.hadoop.hbase.mapreduce.TableOutputFormat.*;
import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.*;
/** | |
* @author Sebastien Gerard | |
*/ | |
public class Sample01 { | |
public static BigDecimal compute(JavaSparkContext context) throws Exception { | |
// "code_vendeur = 'JVA' and datefact >= date '2018-01-01' and datefact <= date '2018-06-30'" | |
final Mapper mapper = new Mapper(); | |
final Additionner additionner = new Additionner(); | |
return context | |
.newAPIHadoopRDD( | |
initializeConfiguration(), | |
TableInputFormat.class, | |
ImmutableBytesWritable.class, | |
Result.class | |
) | |
.map(mapper) | |
.reduce(additionner); | |
} | |
private static Configuration initializeConfiguration() throws Exception { | |
final Configuration configuration = HBaseConfiguration.create(); | |
configuration.addResource(new URL("file:///etc/hbase/conf/hbase-site.xml")); | |
configuration.set(INPUT_TABLE, "flow"); | |
configuration.set(OUTDIR, "flow"); | |
configuration.set(OUTPUT_TABLE, "flow"); | |
configuration.set(ZOOKEEPER_QUORUM, "localhost"); | |
configuration.set("hbase.zookeeper.property.clientPort", "2181"); | |
final Scan scan = new Scan(); | |
scan.setFilter( | |
new FilterList( | |
new SingleColumnValueFilter( | |
"d".getBytes(), | |
"code_vendeur".getBytes(), | |
CompareFilter.CompareOp.EQUAL, | |
new BinaryComparator(DataFormatUtils.toBytes("JVA")) | |
), | |
new SingleColumnValueFilter( | |
"d".getBytes(), | |
"datefact".getBytes(), | |
CompareFilter.CompareOp.GREATER_OR_EQUAL, | |
new BinaryComparator( | |
DataFormatUtils.toBytes( | |
LocalDate.of(2018, 1, 1).atTime(0, 0, 0) | |
) | |
) | |
), | |
new SingleColumnValueFilter( | |
"d".getBytes(), | |
"datefact".getBytes(), | |
CompareFilter.CompareOp.LESS_OR_EQUAL, | |
new BinaryComparator( | |
DataFormatUtils.toBytes( | |
LocalDate.of(2018, 6, 30).atTime(23, 59, 59) | |
) | |
) | |
) | |
) | |
); | |
scan.addColumn("d".getBytes(), "montant".getBytes()); | |
scan.addColumn("d".getBytes(), "code_vendeur".getBytes()); | |
scan.addColumn("d".getBytes(), "datefact".getBytes()); | |
configuration.set(TableInputFormat.SCAN, convertScanToString(scan)); | |
return configuration; | |
} | |
private static class Mapper implements Function<Tuple2<ImmutableBytesWritable, Result>, BigDecimal> { | |
@Override | |
public BigDecimal call(Tuple2<ImmutableBytesWritable, Result> row) { | |
return DataFormatUtils.toBigDecimal(row._2().getValue("d".getBytes(), "montant".getBytes())); | |
} | |
} | |
private static class Additionner implements Function2<BigDecimal, BigDecimal, BigDecimal> { | |
@Override | |
public BigDecimal call(BigDecimal first, BigDecimal second) { | |
if (first == null) { | |
return null; | |
} else if (second == null) { | |
return null; | |
} else { | |
return first.add(second); | |
} | |
} | |
} | |
private static class Filter implements Function<Tuple2<ImmutableBytesWritable, Result>, Boolean> { | |
@Override | |
public Boolean call(Tuple2<ImmutableBytesWritable, Result> result) { | |
final String vendeur = DataFormatUtils.toString(result._2().getValue("d".getBytes(), "code_vendeur".getBytes())); | |
final LocalDateTime dateFact = DataFormatUtils.toLocalDateTime(result._2().getValue("d".getBytes(), "datefact".getBytes())); | |
return (vendeur != null) | |
&& vendeur.contains("JVA") | |
&& (dateFact != null) | |
&& dateFact.isAfter(LocalDate.of(2018, 1, 1).atStartOfDay(ZoneId.systemDefault()).toLocalDateTime()) | |
&& dateFact.isBefore(LocalDate.of(2018, 6, 30).atStartOfDay(ZoneId.systemDefault()).toLocalDateTime()); | |
} | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.