Created
December 13, 2016 09:10
-
-
Save PierreZ/3388aa757fbdffcfa1335768daa686f6 to your computer and use it in GitHub Desktop.
prometheus Psuhgateway
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.warp10.plugins.prometheus; | |
import io.warp10.continuum.Configuration; | |
import io.warp10.continuum.TimeSource; | |
import io.warp10.continuum.store.Constants; | |
import java.io.BufferedReader; | |
import java.io.IOException; | |
import java.io.OutputStream; | |
import java.io.PrintWriter; | |
import java.net.HttpURLConnection; | |
import java.net.URL; | |
import javax.servlet.ServletException; | |
import javax.servlet.http.HttpServletRequest; | |
import javax.servlet.http.HttpServletResponse; | |
import org.apache.commons.codec.binary.Base64; | |
import org.eclipse.jetty.server.Request; | |
import org.eclipse.jetty.server.handler.AbstractHandler; | |
public class PrometheusPushGatewayHandler extends AbstractHandler { | |
private final URL url; | |
private final String token; | |
public PrometheusPushGatewayHandler(URL url, String token) { | |
this.url = url; | |
this.token = token; | |
} | |
@Override | |
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { | |
baseRequest.setHandled(true); | |
if (!request.getMethod().equals("POST")){ | |
throw new IOException("Wrong HTTP method used for pushGateway"); | |
} | |
String path = request.getRequestURI(); | |
if (!path.contains("/metrics/job/")){ | |
throw new IOException("Wrong path used for pushGateway"); | |
} | |
// Path for PushGateway is /metrics/job/<JOBNAME>{/<LABEL_NAME>/<LABEL_VALUE>} | |
path = path.replace("/metrics/", ""); | |
String[] queryLabels = path.split("/"); | |
if (queryLabels.length %2 != 0){ | |
throw new IOException("odd number of parameter used for PushGateway"); | |
} | |
String labels = null; | |
for (int i = 0; i < queryLabels.length; i = i + 2) { | |
labels = labels + queryLabels[i] + "=" + queryLabels[i+1] + ","; | |
} | |
labels = labels.substring(0,labels.length()-1); | |
labels = labels.replaceAll("\"",""); | |
String token = this.token; | |
String basicAuth = request.getHeader("Authorization"); | |
if (null != basicAuth) { | |
String[] userpass = new String(Base64.decodeBase64(basicAuth.substring(6))).split(":"); | |
token = userpass[1]; | |
} | |
if (null == token) { | |
throw new IOException("Missing token, basicAuth and no default token."); | |
} | |
HttpURLConnection conn = null; | |
try { | |
conn = (HttpURLConnection) url.openConnection(); | |
conn.setDoOutput(true); | |
conn.setDoInput(true); | |
conn.setRequestMethod("POST"); | |
conn.setRequestProperty(Constants.getHeader(Configuration.HTTP_HEADER_UPDATE_TOKENX), token); | |
//conn.setRequestProperty("Content-Type", "application/gzip"); | |
conn.setChunkedStreamingMode(16384); | |
conn.connect(); | |
OutputStream os = conn.getOutputStream(); | |
//GZIPOutputStream out = new GZIPOutputStream(os); | |
PrintWriter pw = new PrintWriter(os); | |
BufferedReader br = request.getReader(); | |
while(true) { | |
String line = br.readLine(); | |
if (null == line) { | |
break; | |
} | |
parse(pw,line,labels); | |
} | |
br.close(); | |
pw.flush(); | |
if (HttpServletResponse.SC_OK != conn.getResponseCode()) { | |
throw new IOException(conn.getResponseMessage()); | |
} | |
} finally { | |
if (null != conn) { | |
conn.disconnect(); | |
} | |
} | |
} | |
private static void parse(PrintWriter pw, String line,String globalLabels) throws IOException { | |
// | |
// Replace escape sequences | |
// | |
//if (line.indexOf('\\') >= 0) { | |
// line = line.replaceAll("%", "%25").replaceAll("\\=", "%3D").replaceAll("\\,", "%2C").replaceAll("\\ ", "%20").replaceAll("\\\"", "%22").replaceAll("\\{", "%7B").replaceAll("\\}", "%7D"); | |
//} | |
if (line.startsWith("#")){ | |
return; | |
} | |
// Example of Input format: | |
// some_metric{label="val2"} 34 1398355504000 | |
// or another_metric 2398.283 | |
String[] tokens = line.split(" "); | |
String classname = tokens[0]; | |
StringBuilder labels = new StringBuilder(); | |
labels.append(globalLabels); | |
if (classname.contains("{")) { | |
int bracket = classname.indexOf('{'); | |
classname = classname.substring(0, bracket); | |
labels.append(","); | |
labels.append(classname.substring(bracket + 1, classname.length())); | |
} | |
String val = tokens[1]; | |
// Prometheus is using a millisecond-precision timestamp | |
long timestamp = tokens.length > 2 ? Long.parseLong(tokens[2]) * 1000 : TimeSource.getNanoTime(); | |
StringBuilder sb = new StringBuilder(); | |
sb.setLength(0); | |
sb.append(timestamp); | |
sb.append("// "); | |
sb.append(classname); | |
sb.append("{"); | |
sb.append(labels.toString()); | |
sb.append("}"); | |
sb.append(" "); | |
sb.append(val); | |
pw.println(sb.toString()); | |
System.out.println("YOLO: " + sb.toString()); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.warp10.plugins.prometheus; | |
import io.warp10.continuum.JettyUtil; | |
import io.warp10.continuum.egress.CORSHandler; | |
import io.warp10.warp.sdk.AbstractWarp10Plugin; | |
import java.net.URL; | |
import java.util.Properties; | |
import java.util.concurrent.BlockingQueue; | |
import org.eclipse.jetty.server.Connector; | |
import org.eclipse.jetty.server.Handler; | |
import org.eclipse.jetty.server.Server; | |
import org.eclipse.jetty.server.ServerConnector; | |
import org.eclipse.jetty.server.handler.HandlerList; | |
import org.eclipse.jetty.util.BlockingArrayQueue; | |
import org.eclipse.jetty.util.thread.QueuedThreadPool; | |
public class PrometheusPushGatewayWarp10Plugin extends AbstractWarp10Plugin implements Runnable { | |
private static final String CONF_PROMETHEUS_PORT = "prometheus.port"; | |
private static final String CONF_PROMETHEUS_HOST = "prometheus.host"; | |
private static final String CONF_PROMETHEUS_IDLE_TIMEOUT = "prometheus.idle.timeout"; | |
private static final String CONF_PROMETHEUS_JETTY_THREADPOOL = "prometheus.jetty.threadpool"; | |
private static final String CONF_PROMETHEUS_JETTY_MAXQUEUESIZE = "prometheus.jetty.maxqueuesize"; | |
private static final String CONF_PROMETHEUS_ACCEPTORS = "prometheus.acceptors"; | |
private static final String CONF_PROMETHEUS_SELECTORS = "prometheus.selectors"; | |
private static final String CONF_PROMETHEUS_WARP10_ENDPOINT = "prometheus.warp10.endpoint"; | |
private static final String CONF_PROMETHEUS_DEFAULT_TOKEN = "prometheus.default.token"; | |
private int port; | |
private String host; | |
private int idleTimeout; | |
private int maxThreads; | |
private int acceptors; | |
private int selectors; | |
private URL url; | |
private String token; | |
private BlockingQueue<Runnable> queue; | |
@Override | |
public void run() { | |
Server server = new Server(new QueuedThreadPool(maxThreads, 8, (int) idleTimeout, queue)); | |
ServerConnector connector = new ServerConnector(server, acceptors, selectors); | |
connector.setIdleTimeout(idleTimeout); | |
connector.setPort(port); | |
connector.setHost(host); | |
connector.setName("Continuum Ingress"); | |
server.setConnectors(new Connector[] { connector }); | |
HandlerList handlers = new HandlerList(); | |
Handler cors = new CORSHandler(); | |
handlers.addHandler(cors); | |
handlers.addHandler(new PrometheusPushGatewayHandler(url, token)); | |
server.setHandler(handlers); | |
JettyUtil.setSendServerVersion(server, false); | |
try { | |
server.start(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
@Override | |
public void init(Properties properties) { | |
this.acceptors = Integer.parseInt(properties.getProperty(CONF_PROMETHEUS_ACCEPTORS, "4")); | |
this.selectors = Integer.parseInt(properties.getProperty(CONF_PROMETHEUS_SELECTORS, "2")); | |
this.maxThreads = Integer.parseInt(properties.getProperty(CONF_PROMETHEUS_JETTY_THREADPOOL, "8")); | |
this.idleTimeout = Integer.parseInt(properties.getProperty(CONF_PROMETHEUS_IDLE_TIMEOUT, "30000")); | |
this.port = Integer.parseInt(properties.getProperty(CONF_PROMETHEUS_PORT, "9091")); | |
this.host = properties.getProperty(CONF_PROMETHEUS_HOST, "127.0.0.1"); | |
this.token = properties.getProperty(CONF_PROMETHEUS_DEFAULT_TOKEN); | |
try { | |
this.url = new URL(properties.getProperty(CONF_PROMETHEUS_WARP10_ENDPOINT)); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
if (properties.containsKey(CONF_PROMETHEUS_JETTY_MAXQUEUESIZE)) { | |
int queuesize = Integer.parseInt(properties.getProperty(CONF_PROMETHEUS_JETTY_MAXQUEUESIZE)); | |
queue = new BlockingArrayQueue<Runnable>(queuesize); | |
} | |
Thread t = new Thread(this); | |
t.setDaemon(true); | |
t.setName("[PrometheusPushGatewayWarp10Plugin " + host + ":" + port + "]"); | |
t.start(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment