Skip to content

Instantly share code, notes, and snippets.

@PierreZ
Created December 13, 2016 09:10
Show Gist options
  • Save PierreZ/3388aa757fbdffcfa1335768daa686f6 to your computer and use it in GitHub Desktop.
Save PierreZ/3388aa757fbdffcfa1335768daa686f6 to your computer and use it in GitHub Desktop.
prometheus Psuhgateway
package io.warp10.plugins.prometheus;
import io.warp10.continuum.Configuration;
import io.warp10.continuum.TimeSource;
import io.warp10.continuum.store.Constants;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.codec.binary.Base64;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
public class PrometheusPushGatewayHandler extends AbstractHandler {
private final URL url;
private final String token;
public PrometheusPushGatewayHandler(URL url, String token) {
this.url = url;
this.token = token;
}
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
baseRequest.setHandled(true);
if (!request.getMethod().equals("POST")){
throw new IOException("Wrong HTTP method used for pushGateway");
}
String path = request.getRequestURI();
if (!path.contains("/metrics/job/")){
throw new IOException("Wrong path used for pushGateway");
}
// Path for PushGateway is /metrics/job/<JOBNAME>{/<LABEL_NAME>/<LABEL_VALUE>}
path = path.replace("/metrics/", "");
String[] queryLabels = path.split("/");
if (queryLabels.length %2 != 0){
throw new IOException("odd number of parameter used for PushGateway");
}
String labels = null;
for (int i = 0; i < queryLabels.length; i = i + 2) {
labels = labels + queryLabels[i] + "=" + queryLabels[i+1] + ",";
}
labels = labels.substring(0,labels.length()-1);
labels = labels.replaceAll("\"","");
String token = this.token;
String basicAuth = request.getHeader("Authorization");
if (null != basicAuth) {
String[] userpass = new String(Base64.decodeBase64(basicAuth.substring(6))).split(":");
token = userpass[1];
}
if (null == token) {
throw new IOException("Missing token, basicAuth and no default token.");
}
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection) url.openConnection();
conn.setDoOutput(true);
conn.setDoInput(true);
conn.setRequestMethod("POST");
conn.setRequestProperty(Constants.getHeader(Configuration.HTTP_HEADER_UPDATE_TOKENX), token);
//conn.setRequestProperty("Content-Type", "application/gzip");
conn.setChunkedStreamingMode(16384);
conn.connect();
OutputStream os = conn.getOutputStream();
//GZIPOutputStream out = new GZIPOutputStream(os);
PrintWriter pw = new PrintWriter(os);
BufferedReader br = request.getReader();
while(true) {
String line = br.readLine();
if (null == line) {
break;
}
parse(pw,line,labels);
}
br.close();
pw.flush();
if (HttpServletResponse.SC_OK != conn.getResponseCode()) {
throw new IOException(conn.getResponseMessage());
}
} finally {
if (null != conn) {
conn.disconnect();
}
}
}
private static void parse(PrintWriter pw, String line,String globalLabels) throws IOException {
//
// Replace escape sequences
//
//if (line.indexOf('\\') >= 0) {
// line = line.replaceAll("%", "%25").replaceAll("\\=", "%3D").replaceAll("\\,", "%2C").replaceAll("\\ ", "%20").replaceAll("\\\"", "%22").replaceAll("\\{", "%7B").replaceAll("\\}", "%7D");
//}
if (line.startsWith("#")){
return;
}
// Example of Input format:
// some_metric{label="val2"} 34 1398355504000
// or another_metric 2398.283
String[] tokens = line.split(" ");
String classname = tokens[0];
StringBuilder labels = new StringBuilder();
labels.append(globalLabels);
if (classname.contains("{")) {
int bracket = classname.indexOf('{');
classname = classname.substring(0, bracket);
labels.append(",");
labels.append(classname.substring(bracket + 1, classname.length()));
}
String val = tokens[1];
// Prometheus is using a millisecond-precision timestamp
long timestamp = tokens.length > 2 ? Long.parseLong(tokens[2]) * 1000 : TimeSource.getNanoTime();
StringBuilder sb = new StringBuilder();
sb.setLength(0);
sb.append(timestamp);
sb.append("// ");
sb.append(classname);
sb.append("{");
sb.append(labels.toString());
sb.append("}");
sb.append(" ");
sb.append(val);
pw.println(sb.toString());
System.out.println("YOLO: " + sb.toString());
}
}
package io.warp10.plugins.prometheus;
import io.warp10.continuum.JettyUtil;
import io.warp10.continuum.egress.CORSHandler;
import io.warp10.warp.sdk.AbstractWarp10Plugin;
import java.net.URL;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
public class PrometheusPushGatewayWarp10Plugin extends AbstractWarp10Plugin implements Runnable {
private static final String CONF_PROMETHEUS_PORT = "prometheus.port";
private static final String CONF_PROMETHEUS_HOST = "prometheus.host";
private static final String CONF_PROMETHEUS_IDLE_TIMEOUT = "prometheus.idle.timeout";
private static final String CONF_PROMETHEUS_JETTY_THREADPOOL = "prometheus.jetty.threadpool";
private static final String CONF_PROMETHEUS_JETTY_MAXQUEUESIZE = "prometheus.jetty.maxqueuesize";
private static final String CONF_PROMETHEUS_ACCEPTORS = "prometheus.acceptors";
private static final String CONF_PROMETHEUS_SELECTORS = "prometheus.selectors";
private static final String CONF_PROMETHEUS_WARP10_ENDPOINT = "prometheus.warp10.endpoint";
private static final String CONF_PROMETHEUS_DEFAULT_TOKEN = "prometheus.default.token";
private int port;
private String host;
private int idleTimeout;
private int maxThreads;
private int acceptors;
private int selectors;
private URL url;
private String token;
private BlockingQueue<Runnable> queue;
@Override
public void run() {
Server server = new Server(new QueuedThreadPool(maxThreads, 8, (int) idleTimeout, queue));
ServerConnector connector = new ServerConnector(server, acceptors, selectors);
connector.setIdleTimeout(idleTimeout);
connector.setPort(port);
connector.setHost(host);
connector.setName("Continuum Ingress");
server.setConnectors(new Connector[] { connector });
HandlerList handlers = new HandlerList();
Handler cors = new CORSHandler();
handlers.addHandler(cors);
handlers.addHandler(new PrometheusPushGatewayHandler(url, token));
server.setHandler(handlers);
JettyUtil.setSendServerVersion(server, false);
try {
server.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void init(Properties properties) {
this.acceptors = Integer.parseInt(properties.getProperty(CONF_PROMETHEUS_ACCEPTORS, "4"));
this.selectors = Integer.parseInt(properties.getProperty(CONF_PROMETHEUS_SELECTORS, "2"));
this.maxThreads = Integer.parseInt(properties.getProperty(CONF_PROMETHEUS_JETTY_THREADPOOL, "8"));
this.idleTimeout = Integer.parseInt(properties.getProperty(CONF_PROMETHEUS_IDLE_TIMEOUT, "30000"));
this.port = Integer.parseInt(properties.getProperty(CONF_PROMETHEUS_PORT, "9091"));
this.host = properties.getProperty(CONF_PROMETHEUS_HOST, "127.0.0.1");
this.token = properties.getProperty(CONF_PROMETHEUS_DEFAULT_TOKEN);
try {
this.url = new URL(properties.getProperty(CONF_PROMETHEUS_WARP10_ENDPOINT));
} catch (Exception e) {
throw new RuntimeException(e);
}
if (properties.containsKey(CONF_PROMETHEUS_JETTY_MAXQUEUESIZE)) {
int queuesize = Integer.parseInt(properties.getProperty(CONF_PROMETHEUS_JETTY_MAXQUEUESIZE));
queue = new BlockingArrayQueue<Runnable>(queuesize);
}
Thread t = new Thread(this);
t.setDaemon(true);
t.setName("[PrometheusPushGatewayWarp10Plugin " + host + ":" + port + "]");
t.start();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment