Skip to content

Instantly share code, notes, and snippets.

@nicmarti
Last active December 22, 2015 17:49
Show Gist options
  • Save nicmarti/6509140 to your computer and use it in GitHub Desktop.
Save nicmarti/6509140 to your computer and use it in GitHub Desktop.
A Java FluentLogbackAppender that can track the request URI if you call MDC.put("URL","/page/toto") in your code. Used for zaptravel. Adapted from one of the existing Fluent Logback appenders.
package org.zaptravel.logback.fluentd;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for appenders that hand events over to a background worker through a bounded queue.
 *
 * <p>Callers enqueue events with {@link #log(Object)}; the first successfully enqueued event
 * submits this instance to a shared thread pool, whose worker then drains the queue and passes
 * each event to {@link #append(Object)}.
 *
 * <p>Note that the thread pool is static and therefore shared by every instance:
 * {@link #close()} shuts the pool down for all of them.
 *
 * @param <E> type of the queued event
 */
public abstract class DaemonAppender<E> implements Runnable {

    /** Worker pool shared by all instances of this class. */
    private static final ExecutorService THREAD_POOL = Executors.newCachedThreadPool();

    private static final Logger LOG = LoggerFactory.getLogger(DaemonAppender.class);

    /** Flipped to {@code true} by the first successful {@link #log(Object)} call. */
    private final AtomicBoolean start = new AtomicBoolean(false);

    /** Bounded hand-off queue between the logging threads and the worker. */
    private final BlockingQueue<E> queue;

    /**
     * @param maxQueueSize capacity of the event queue; events offered while it is full are dropped
     */
    DaemonAppender(int maxQueueSize) {
        this.queue = new LinkedBlockingQueue<E>(maxQueueSize);
    }

    /** Submits this appender to the shared pool so that {@link #run()} starts draining the queue. */
    protected void execute() {
        THREAD_POOL.execute(this);
    }

    /**
     * Enqueues an event without blocking. If the queue is full the event is dropped and a warning
     * is logged. The worker is started lazily on the first event that is accepted.
     *
     * @param eventObject the event to enqueue
     */
    void log(E eventObject) {
        if (!queue.offer(eventObject)) {
            LOG.warn("DaemonAppender : Message queue is full. Ignore the message.");
        } else if (start.compareAndSet(false, true)) {
            execute();
        }
    }

    /**
     * Worker loop: takes events from the queue and appends them until the thread is interrupted
     * or {@link #append(Object)} throws.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                append(queue.take());
            }
        } catch (InterruptedException e) {
            // Interruption is how ExecutorService.shutdownNow() asks the worker to stop.
            // Re-entering the loop here would keep the thread parked on queue.take() forever,
            // so the pool could never terminate; restore the flag and leave instead.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // Report the failure instead of dropping it silently, then release resources.
            LOG.error("DaemonAppender : Unexpected error while appending. Closing the appender.", e);
            close();
        }
    }

    /**
     * Delivers a single event to its destination. Invoked on the worker thread.
     *
     * @param rawData the event taken from the queue
     */
    protected abstract void append(E rawData);

    /** Shuts the shared thread pool down unless that has already been done. */
    protected void close() {
        synchronized (THREAD_POOL) {
            if (!THREAD_POOL.isShutdown()) {
                shutdownAndAwaitTermination(THREAD_POOL);
            }
        }
    }

    /**
     * Two-phase shutdown: first refuse new tasks and wait for running ones, then cancel whatever
     * is still running and wait once more.
     *
     * @param pool the executor to shut down
     */
    private static void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                    System.err.println("DaemonAppender: Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
}
package org.zaptravel.logback.fluentd;
import java.util.HashMap;
import java.util.Map;
import ch.qos.logback.classic.spi.LoggingEvent;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.fluentd.logger.FluentLogger;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import org.slf4j.MDC;
/**
 * Logback appender that forwards events to Fluentd through a {@link FluentLogger}.
 *
 * <p>Events are handed to a {@link DaemonAppender}, which queues them and sends them from a
 * background thread. For {@link LoggingEvent}s the record contains the level, logger name,
 * thread name, message and the value stored in the MDC under the key {@code URL}
 * ({@code "unknown"} when absent).
 *
 * <p>Configurable properties: {@code tagprefix}, {@code clientid}, {@code fluentdHost},
 * {@code port} and {@code maxQueueSize}.
 *
 * @param <E> type of the appended event
 */
public class FluentLogbackAppender<E> extends UnsynchronizedAppenderBase<E> {

    /** Maximum length of the escaped message; longer messages are abbreviated. */
    private static final int MSG_SIZE_LIMIT = 65535;

    /** MDC key under which the application stores the current request URL. */
    private static final String URL_MDC_KEY = "URL";

    private DaemonAppender<E> appender;
    private int maxQueueSize;
    private String tagprefix;
    private String clientid;
    private String fluentdHost;
    private int port;

    /** Background worker that converts queued events into Fluentd records. */
    private static final class FluentDaemonAppender<E> extends DaemonAppender<E> {

        private FluentLogger fluentLogger;
        private final String tag;
        private final String label;
        private final String remoteHost;
        private final int port;

        /**
         * @param tag          tag prefix handed to {@code FluentLogger.getLogger}
         * @param label        label passed with every record to {@code FluentLogger.log}
         * @param remoteHost   host of the Fluentd agent
         * @param port         port of the Fluentd agent
         * @param maxQueueSize capacity of the event queue
         */
        FluentDaemonAppender(String tag, String label, String remoteHost, int port, int maxQueueSize) {
            super(maxQueueSize);
            this.tag = tag;
            this.label = label;
            this.remoteHost = remoteHost;
            this.port = port;
        }

        /** Obtains the Fluentd logger, then starts the worker. */
        @Override
        public void execute() {
            this.fluentLogger = FluentLogger.getLogger(tag, remoteHost, port);
            super.execute();
        }

        /** Stops the worker pool and then closes the Fluentd logger, if one was obtained. */
        @Override
        protected void close() {
            try {
                super.close();
            } finally {
                if (fluentLogger != null) {
                    fluentLogger.close();
                }
            }
        }

        /**
         * Builds the record for one event and sends it to Fluentd. Runs on the worker thread.
         *
         * @param rawData the queued event
         */
        @Override
        protected void append(E rawData) {
            String cleanMsg = StringEscapeUtils.escapeEcmaScript(rawData.toString());
            if (cleanMsg != null && cleanMsg.length() > MSG_SIZE_LIMIT) {
                cleanMsg = StringUtils.abbreviate(cleanMsg, MSG_SIZE_LIMIT);
            }

            Map<String, Object> data = new HashMap<String, Object>();
            if (rawData instanceof LoggingEvent) {
                LoggingEvent ev = (LoggingEvent) rawData;
                // Store the level as text so the record only holds plain values.
                data.put("level", String.valueOf(ev.getLevel()));
                data.put("logger", ev.getLoggerName());
                data.put("thread", ev.getThreadName());
                data.put("msg", cleanMsg);
                data.put("uri", resolveUri(ev));
            } else {
                data.put("rawmsg", cleanMsg);
            }
            fluentLogger.log(label, data);
        }

        /**
         * Reads the request URL from the MDC snapshot carried by the event.
         *
         * <p>The static {@code MDC} is deliberately not consulted: this code runs on the worker
         * thread, whose MDC is not the one of the thread that logged the event.
         *
         * @param ev the logging event
         * @return the URL stored in the event's MDC, or {@code "unknown"} when there is none
         */
        private static String resolveUri(LoggingEvent ev) {
            Map<String, String> mdcMap = ev.getMDCPropertyMap();
            String uri = (mdcMap == null) ? null : mdcMap.get(URL_MDC_KEY);
            return (uri != null) ? uri : "unknown";
        }
    }

    /** Creates the background delegate from the configured properties and starts the appender. */
    @Override
    public void start() {
        // Build the delegate first so that append() never sees a started appender without one.
        appender = new FluentDaemonAppender<E>(tagprefix, clientid, fluentdHost, port, maxQueueSize);
        super.start();
    }

    /**
     * Queues the event for asynchronous delivery.
     *
     * @param eventObject the event to send
     */
    @Override
    protected void append(E eventObject) {
        if (eventObject instanceof LoggingEvent) {
            // Capture the caller's thread name, MDC and formatted message now, on the logging
            // thread, because the event is consumed later on another thread.
            ((LoggingEvent) eventObject).prepareForDeferredProcessing();
        }
        appender.log(eventObject);
    }

    /** Stops the appender and closes the background delegate if it was created. */
    @Override
    public void stop() {
        try {
            super.stop();
        } finally {
            // stop() may be called on an appender that was never started.
            if (appender != null) {
                appender.close();
            }
        }
    }

    /** @return the tag prefix used for the Fluentd logger */
    public String getTagprefix() {
        return tagprefix;
    }

    /** @param tagprefix the tag prefix used for the Fluentd logger */
    public void setTagprefix(String tagprefix) {
        this.tagprefix = tagprefix;
    }

    /** @return the label sent with every record */
    public String getClientid() {
        return clientid;
    }

    /** @param clientid the label sent with every record */
    public void setClientid(String clientid) {
        this.clientid = clientid;
    }

    /** @return the capacity of the event queue */
    public int getMaxQueueSize() {
        return maxQueueSize;
    }

    /** @param maxQueueSize the capacity of the event queue */
    public void setMaxQueueSize(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
    }

    /** @return the host of the Fluentd agent */
    public String getFluentdHost() {
        return fluentdHost;
    }

    /** @param fluentdHost the host of the Fluentd agent */
    public void setFluentdHost(String fluentdHost) {
        this.fluentdHost = fluentdHost;
    }

    /** @return the port of the Fluentd agent */
    public int getPort() {
        return port;
    }

    /** @param port the port of the Fluentd agent */
    public void setPort(int port) {
        this.port = port;
    }
}
<!-- Sample Play2 logger configuration file; put this file in /conf -->
<configuration info="true" scan="false" scanPeriod="30 seconds">
<!-- Registers the %coloredLevel conversion word, backed by Play's ColoredLevel converter. -->
<conversionRule conversionWord="coloredLevel" converterClass="play.api.Logger$ColoredLevel"/>
<!-- Console output; %X{URL} prints the MDC entry stored under the key "URL". -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%coloredLevel %d{HH:mm:ss,SSS} %X{URL} %logger: %message%n%ex{10}</pattern>
</encoder>
</appender>
<!-- Fluentd appender; each child element sets the FluentLogbackAppender property of the same name. -->
<appender name="FLUENTD" class="org.zaptravel.logback.fluentd.FluentLogbackAppender">
<tagprefix>amazon.rds</tagprefix>
<clientid>Prod</clientid>
<port>24224</port>
<fluentdHost>127.0.0.1</fluentdHost>
<maxQueueSize>1024</maxQueueSize>
</appender>
<!-- Framework and library loggers are limited to WARN. -->
<logger name="play" level="WARN"/>
<logger name="akka" level="WARN"/>
<logger name="akka.event.slf4j.Slf4jEventHandler" level="WARN"/>
<logger name="com.typesafe.config" level="WARN"/>
<logger name="application-akka" level="WARN"/>
<!-- Root logger: INFO level, attached to both the console and the Fluentd appender. -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FLUENTD"/>
</root>
</configuration>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment