package com.cloudflare;

import com.google.common.collect.Iterators;

import net.opentsdb.core.IllegalDataException;
import net.opentsdb.core.Internal;
import net.opentsdb.core.TSDB;
import net.opentsdb.utils.Config;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.hbase.async.KeyValue;

import scala.Tuple2;

import java.io.IOException;
import java.io.Serializable;
import java.util.*;
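/**
 * Reads an OpenTSDB data table directly out of HBase into a Spark
 * JavaRDD of DataPoint, using OpenTSDB's Internal helpers to decode row keys
 * and column values, so the export scans HBase itself rather than going
 * through a TSD's query API.
 */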
public class OpenTSDBInput {
    private OpenTSDBInput() {
    }

    public static JavaRDD<DataPoint> rdd(JavaSparkContext ctx, Config tsdbConfig, Configuration hbaseConfigOverrides) {
        Configuration srcConf = HBaseConfiguration.create();

        for (Map.Entry<String, String> entry : hbaseConfigOverrides) {
            srcConf.set(entry.getKey(), entry.getValue());
        }

        // Point the HBase scan at the cluster and data table the TSD is configured to use.
        srcConf.set("hbase.zookeeper.quorum", tsdbConfig.getString("tsd.storage.hbase.zk_quorum"));
        srcConf.set("zookeeper.znode.parent", tsdbConfig.getString("tsd.storage.hbase.zk_basedir"));
        srcConf.set(TableInputFormat.INPUT_TABLE, tsdbConfig.getString("tsd.storage.hbase.data_table"));

        JavaPairRDD<ImmutableBytesWritable, Result> rdd = ctx.newAPIHadoopRDD(srcConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        // Decode each row into its data points, then flatten the per-row iterables.
        Map<String, String> config = new ExportableConfig(tsdbConfig).export();
        return rdd.mapPartitions(new DataPointPartitionMapper(config))
                .flatMap(dps -> dps);
    }
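    /**
     * OpenTSDB's Config keeps its backing property map protected; this thin
     * subclass copies it into a plain HashMap so the settings can be shipped
     * to executors, where DataPointPartitionMapper rebuilds a Config from them.
     */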
    public static class ExportableConfig extends Config {
        public ExportableConfig(Config parent) {
            super(parent);
        }

        public Map<String, String> export() {
            return new HashMap<>(properties);
        }
    }
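    /**
     * A flat, serializable data point: metric name, tags, absolute timestamp
     * and value. toString() renders it as "metric timestamp value tag=value ...",
     * one point per line.
     */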
    public static class DataPoint implements Serializable {
        private String metric;
        private Map<String, String> tags;
        private Long timestamp;
        private Number value;

        public DataPoint(String metric, Map<String, String> tags, Long timestamp, Number value) {
            this.metric = metric;
            this.tags = tags;
            this.timestamp = timestamp;
            this.value = value;
        }

        public String getMetric() {
            return metric;
        }

        public Map<String, String> getTags() {
            return tags;
        }

        public Long getTimestamp() {
            return timestamp;
        }

        public Number getValue() {
            return value;
        }

        @Override
        public String toString() {
            String line = metric + " " + timestamp + " " + value;

            if (tags.size() > 0) {
                List<String> kvs = new LinkedList<>();
                for (Map.Entry<String, String> tag : tags.entrySet()) {
                    kvs.add(tag.getKey() + "=" + tag.getValue());
                }

                return line + " " + StringUtils.join(kvs, " ");
            }

            return line;
        }
    }
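    /**
     * Turns a partition of raw HBase rows into per-row iterables of data
     * points. A TSDB instance is created lazily on first use inside the
     * partition and shut down once the row iterator is exhausted.
     */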
    public static class DataPointPartitionMapper implements FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, Iterable<DataPoint>> {
        private Map<String, String> conf;
        private TSDB tsdb;

        public DataPointPartitionMapper(Map<String, String> conf) {
            this.conf = conf;
        }

        protected TSDB getTSDB() throws IOException {
            if (this.tsdb == null) {
                Config config = new Config(false);
                for (Map.Entry<String, String> entry : this.conf.entrySet()) {
                    config.overrideConfig(entry.getKey(), entry.getValue());
                }

                this.tsdb = new TSDB(config);
            }

            return this.tsdb;
        }

        protected void shutdown() {
            if (this.tsdb != null) {
                try {
                    this.tsdb.shutdown().joinUninterruptibly();
                } catch (Exception e) {
                    throw new RuntimeException("Cannot shutdown OpenTSDB instance", e);
                }

                this.tsdb = null;
            }
        }
        @Override
        public Iterable<Iterable<DataPoint>> call(Iterator<Tuple2<ImmutableBytesWritable, Result>> rows) throws Exception {
            return new Iterable<Iterable<DataPoint>>() {
                @Override
                public Iterator<Iterable<DataPoint>> iterator() {
                    Iterator<Iterable<DataPoint>> i = Iterators.transform(rows, (Tuple2<ImmutableBytesWritable, Result> row) -> process(row));

                    return new Iterator<Iterable<DataPoint>>() {
                        @Override
                        public boolean hasNext() {
                            if (!i.hasNext()) {
                                shutdown();
                                return false;
                            }

                            return i.hasNext();
                        }

                        @Override
                        public Iterable<DataPoint> next() {
                            return i.next();
                        }
                    };
                }
            };
        }
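        /**
         * Decodes one HBase row. The row key identifies the metric, base time
         * and tags; a 2-byte qualifier (or a 4-byte one carrying the
         * millisecond flag) holds a single value, while longer qualifiers are
         * compacted columns containing several data points.
         */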
        public Iterable<DataPoint> process(Tuple2<ImmutableBytesWritable, Result> row) {
            TSDB tsdb;

            try {
                tsdb = getTSDB();
            } catch (IOException e) {
                throw new RuntimeException("Cannot get OpenTSDB instance", e);
            }

            byte[] key = row._1().get();

            String metric = Internal.metricName(tsdb, key);
            long baseTime = Internal.baseTime(tsdb, key);
            Map<String, String> tags = Internal.getTags(tsdb, key);

            List<DataPoint> dps = new LinkedList<>();

            for (Cell cell : row._2().rawCells()) {
                // Copy the cell's backing-array slices so they can be handed to asynchbase's KeyValue.
                byte[] family = Arrays.copyOfRange(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyOffset() + cell.getFamilyLength());
                byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength());
                byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());

                KeyValue kv = new KeyValue(key, family, qualifier, cell.getTimestamp(), value);

                if (qualifier.length == 2 || (qualifier.length == 4 && Internal.inMilliseconds(qualifier))) {
                    // single value column
                    Internal.Cell c = Internal.parseSingleValue(kv);
                    if (c == null) {
                        throw new IllegalDataException("Unable to parse row: " + kv);
                    }

                    dps.add(parseCell(metric, tags, c, baseTime));
                } else {
                    // compacted column
                    ArrayList<Internal.Cell> cells;
                    try {
                        cells = Internal.extractDataPoints(kv);
                    } catch (IllegalDataException e) {
                        throw new IllegalDataException(Bytes.toStringBinary(key), e);
                    }

                    for (Internal.Cell c : cells) {
                        dps.add(parseCell(metric, tags, c, baseTime));
                    }
                }
            }

            return dps;
        }
        protected DataPoint parseCell(String metric, Map<String, String> tags, Internal.Cell cell, long baseTime) {
            return new DataPoint(metric, tags, cell.absoluteTimestamp(baseTime), cell.parseValue());
        }
    }
}
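A minimal driver sketch (in its own file) showing one way the reader above might be invoked from a Spark job. The class name, config path, scanner-caching override and output path are illustrative placeholders, not part of the gist:

package com.cloudflare;

import net.opentsdb.utils.Config;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class OpenTSDBExportExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical driver: dump every data point as text, one line per point.
        JavaSparkContext ctx = new JavaSparkContext(new SparkConf().setAppName("opentsdb-export"));

        // Load the same settings a TSD would use (ZK quorum, table names, ...).
        Config tsdbConfig = new Config("/etc/opentsdb/opentsdb.conf");

        // Extra HBase client settings, e.g. scanner caching for large scans.
        Configuration overrides = new Configuration(false);
        overrides.set("hbase.client.scanner.caching", "1000");

        OpenTSDBInput.rdd(ctx, tsdbConfig, overrides)
                .map(Object::toString)     // DataPoint.toString() emits "metric ts value k=v ..."
                .saveAsTextFile(args[0]);  // output path supplied on the command line

        ctx.stop();
    }
}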
Thanks, this was super helpful for getting started with processing OpenTSDB data via Spark. I did a lot of research before finding the right approach; you should post it as an article online ;-)