Created
December 6, 2010 22:35
-
-
Save pierre/731112 to your computer and use it in GitHub Desktop.
Gracefully handle idle connections to Scribe in the eventtracker library
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/main/java/com/ning/metrics/eventtracker/ScribeSender.java b/src/main/java/com/ning/metrics/eventtracker/ScribeSender.java | |
index e8ffdda..f9b707b 100644 | |
--- a/src/main/java/com/ning/metrics/eventtracker/ScribeSender.java | |
+++ b/src/main/java/com/ning/metrics/eventtracker/ScribeSender.java | |
@@ -24,42 +24,65 @@ import org.apache.commons.logging.LogFactory; | |
import org.apache.thrift.transport.TTransportException; | |
import scribe.thrift.LogEntry; | |
import scribe.thrift.ResultCode; | |
import java.io.ByteArrayOutputStream; | |
import java.io.IOException; | |
import java.io.ObjectOutputStream; | |
import java.nio.charset.Charset; | |
import java.util.ArrayList; | |
import java.util.List; | |
+import java.util.concurrent.Executors; | |
+import java.util.concurrent.ScheduledExecutorService; | |
+import java.util.concurrent.ScheduledThreadPoolExecutor; | |
+import java.util.concurrent.TimeUnit; | |
+import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* ScribeSender | |
* <p/> | |
* The class needs to be public for JMX. | |
*/ | |
public class ScribeSender implements EventSender | |
{ | |
private static final Log log = LogFactory.getLog(ScribeSender.class); | |
private final AtomicInteger connectionRetries = new AtomicInteger(0); | |
private ScribeClient scribeClient; | |
private final AtomicInteger messagesSuccessfullySent = new AtomicInteger(0); | |
private final AtomicInteger messagesSuccessfullySentSinceLastReconnection = new AtomicInteger(0); | |
private int messagesToSendBeforeReconnecting = 0; | |
+ private final AtomicBoolean sleeping = new AtomicBoolean(false); | |
+ | |
public ScribeSender(ScribeClient scribeClient, int messagesToSendBeforeReconnecting) | |
{ | |
this.scribeClient = scribeClient; | |
this.messagesToSendBeforeReconnecting = messagesToSendBeforeReconnecting; | |
+ | |
+ // Set up a watchdog for the Scribe connection. We don't want to keep it open forever. For instance, an SLB VIP | |
+ // may trigger a RST if the connection is idle for more than a few minutes. | |
+ final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory()); | |
+ executor.scheduleAtFixedRate(new Runnable() | |
+ { | |
+ @Override | |
+ public void run() | |
+ { | |
+ if (sleeping.get()) { | |
+ log.info("Idle connection to Scribe, re-opening it"); | |
+ createConnection(); | |
+ } | |
+ sleeping.set(true); | |
+ } | |
+ }, 0, 3, TimeUnit.MINUTES); | |
} | |
/** | |
* Re-initialize the connection with the Scribe endpoint. | |
*/ | |
public synchronized void createConnection() | |
{ | |
if (scribeClient != null) { | |
try { | |
connectionRetries.incrementAndGet(); | |
@@ -89,20 +112,23 @@ public class ScribeSender implements EventSender | |
} | |
@Override | |
public boolean send(Event event) throws IOException | |
{ | |
if (scribeClient == null) { | |
log.warn("Scribe client has not been set up correctly."); | |
return false; | |
} | |
+ // Tell the watchdog that we are doing something | |
+ sleeping.set(false); | |
+ | |
ResultCode res; | |
// TODO: offer batch API?, see drainEvents | |
List<LogEntry> list = new ArrayList<LogEntry>(1); | |
// TODO: update Scribe to pass a Thrift directly instead of serializing it | |
// Has the sender specified how to send the data? | |
byte[] payload = event.getSerializedEvent(); | |
// Nope, default to ObjectOutputStream | |
if (payload == null) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment