Skip to content

Instantly share code, notes, and snippets.

@joyoyoyoyoyo
Forked from ambud/KVMetricsSource.java
Created September 17, 2019 11:32
Show Gist options
  • Select an option

  • Save joyoyoyoyoyo/1adb370f56ba62c167c4f1de3f7d6291 to your computer and use it in GitHub Desktop.

Select an option

Save joyoyoyoyoyo/1adb370f56ba62c167c4f1de3f7d6291 to your computer and use it in GitHub Desktop.
Spark Custom Metrics Source
/**
* Copyright 2017 Ambud Sharma
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ambud.sparkmetrics;
import java.io.Serializable;
import org.apache.spark.metrics.source.Source;
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
/**
 * Spark metrics {@link Source} exposing counters and latency timers for key-value store
 * operations (puts, gets and deletes) under the source name {@link #KV_METRICS_SOURCE}.
 *
 * <p>The registry and all metrics are {@code transient}, so none of them are part of the
 * serialized form. A deserialized instance gets a brand-new registry with fresh, zeroed metrics
 * (see {@code readObject}); it is not the same registry as the instance that was serialized.
 */
public class KVMetricsSource implements Source, Serializable {

    /** Name returned by {@link #sourceName()}. */
    public static final String KV_METRICS_SOURCE = "hbasesource";

    private static final long serialVersionUID = 1L;

    // Not final: these are transient and must be reassigned after deserialization.
    private transient MetricRegistry registry;
    private transient Counter putCounter;
    private transient Counter getCounter;
    private transient Counter deleteCounter;
    private transient Timer putLatency;
    private transient Timer getLatency;
    private transient Timer deleteLatency;

    /** Creates a source with a new registry holding zeroed counters and timers. */
    public KVMetricsSource() {
        registry = new MetricRegistry();
        initialize();
    }

    /** Creates the counters and timers and registers each of them in {@link #registry}. */
    private void initialize() {
        putCounter = new Counter();
        getCounter = new Counter();
        deleteCounter = new Counter();
        putLatency = new Timer();
        getLatency = new Timer();
        deleteLatency = new Timer();
        registry.register("puts", putCounter);
        registry.register("gets", getCounter);
        registry.register("deletes", deleteCounter);
        registry.register("putLatency", putLatency);
        registry.register("getLatency", getLatency);
        registry.register("deleteLatency", deleteLatency);
    }

    /**
     * Rebuilds the transient registry and metrics after deserialization. Without this hook every
     * field would be {@code null} on a deserialized instance and all getters would return
     * {@code null}.
     *
     * @param in stream the object is being read from
     * @throws java.io.IOException if reading the default serialized form fails
     * @throws ClassNotFoundException if a class in the serialized form cannot be resolved
     */
    private void readObject(java.io.ObjectInputStream in)
            throws java.io.IOException, ClassNotFoundException {
        in.defaultReadObject();
        registry = new MetricRegistry();
        initialize();
    }

    /** @return the registry holding this source's metrics */
    @Override
    public MetricRegistry metricRegistry() {
        return registry;
    }

    /** @return {@link #KV_METRICS_SOURCE} */
    @Override
    public String sourceName() {
        return KV_METRICS_SOURCE;
    }

    /** @return the same registry as {@link #metricRegistry()} */
    public MetricRegistry getRegistry() {
        return registry;
    }

    /** @return the counter registered as {@code "puts"} */
    public Counter getPutCounter() {
        return putCounter;
    }

    /** @return the counter registered as {@code "gets"} */
    public Counter getGetCounter() {
        return getCounter;
    }

    /** @return the counter registered as {@code "deletes"} */
    public Counter getDeleteCounter() {
        return deleteCounter;
    }

    /** @return the timer registered as {@code "putLatency"} */
    public Timer getPutLatency() {
        return putLatency;
    }

    /** @return the timer registered as {@code "getLatency"} */
    public Timer getGetLatency() {
        return getLatency;
    }

    /** @return the timer registered as {@code "deleteLatency"} */
    public Timer getDeleteLatency() {
        return deleteLatency;
    }
}
/**
* Copyright 2017 Ambud Sharma
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ambud.sparkmetrics;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkEnv;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
import org.apache.spark.metrics.source.Source;
import scala.collection.JavaConversions;
import scala.collection.Seq;
/**
 * Demo driver that registers a {@link KVMetricsSource} with Spark's metrics system and updates
 * it from inside a job.
 */
public class SparkMetrics {

    /**
     * Starts a local Spark context, registers a {@link KVMetricsSource}, then runs a small job
     * that increments the source's put counter once per record and sleeps 100 ms after each
     * increment.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // try-with-resources closes the context even when the job throws.
        try (JavaSparkContext sc = new JavaSparkContext("local[*]", "hbase")) {
            MetricsSystem system = SparkEnv.get().metricsSystem();
            KVMetricsSource source = new KVMetricsSource();
            system.registerSource(source);
            JavaRDD<String> rdd =
                    sc.parallelize(Arrays.asList("sdasd", "asdasd", "asdsad", "asdsadasdasd"));
            rdd.foreachPartition(p -> {
                // An empty partition has nothing to count; skipping it also keeps the
                // sources.get(0) below from running when no record needs it.
                if (!p.hasNext()) {
                    return;
                }
                // Look the source up through the metrics system rather than capturing the
                // driver-side instance in the closure.
                Seq<Source> sourcesByName =
                        SparkEnv.get().metricsSystem().getSourcesByName(KVMetricsSource.KV_METRICS_SOURCE);
                List<Source> sources = JavaConversions.seqAsJavaList(sourcesByName);
                // Resolved once per partition instead of once per record.
                KVMetricsSource src = (KVMetricsSource) sources.get(0);
                p.forEachRemaining(v -> {
                    src.getPutCounter().inc();
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so callers up the stack can observe it.
                        Thread.currentThread().interrupt();
                    }
                });
            });
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment