Created
November 30, 2012 08:20
-
-
Save kbkaran/4174494 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.kiru.flume.sink.splunk; | |
import java.io.IOException; | |
import org.apache.flume.Channel; | |
import org.apache.flume.Context; | |
import org.apache.flume.Event; | |
import org.apache.flume.EventDeliveryException; | |
import org.apache.flume.Transaction; | |
import org.apache.flume.conf.Configurable; | |
import org.apache.flume.sink.AbstractSink; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.apache.flume.event.EventHelper; | |
import com.splunk.Service; | |
import com.splunk.Index; | |
import java.io.OutputStreamWriter; | |
import java.net.Socket; | |
import java.util.Calendar; | |
import java.util.HashMap; | |
import java.text.SimpleDateFormat; | |
/** | |
* | |
*/ | |
public class SplunkSink extends AbstractSink implements Configurable { | |
private static final String INDEX = "flumeindex"; | |
private static final Logger logger = LoggerFactory | |
.getLogger(SplunkSink.class); | |
static SimpleDateFormat dateFormat = | |
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ"); | |
static Calendar cal = Calendar.getInstance(); | |
private Socket stream; | |
private OutputStreamWriter splunkStream; | |
private Index index; | |
public SplunkSink() { | |
} | |
@Override | |
public void start() { | |
super.start(); | |
HashMap<String, Object> arg0 = new HashMap<String, Object>(); | |
arg0.put("index", INDEX); | |
arg0.put("itype", "submit"); | |
arg0.put("host", "localhost"); | |
arg0.put("username", "admin"); | |
arg0.put("password", "changeme"); | |
Service service = Service.connect(arg0); | |
this.index = service.getIndexes().get(INDEX); | |
try { | |
this.stream = index.attach(); | |
splunkStream = new OutputStreamWriter(stream.getOutputStream(), | |
"UTF8"); | |
} catch (IOException ioe) { | |
throw new RuntimeException("Unable to open a connection to Splunk"); | |
} | |
} | |
@Override | |
public void stop() { | |
try { | |
splunkStream.flush(); | |
splunkStream.close(); | |
} catch (IOException ioe) { | |
System.out.println("Problem closing Splunk streams"); | |
} | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public void configure(Context context) { | |
} | |
@Override | |
public Status process() throws EventDeliveryException { | |
Status result = Status.READY; | |
Channel channel = getChannel(); | |
Transaction transaction = channel.getTransaction(); | |
Event event = null; | |
try { | |
transaction.begin(); | |
event = channel.take(); | |
if (event != null) { | |
String eventstr = EventHelper.dumpEvent(event); | |
System.out.println("SplunkSink: " + eventstr); | |
index.submit(String.format("%s %s", dateFormat.format(cal.getTime()), eventstr)) ; | |
} else { | |
// No event found, request back-off semantics from the sink | |
// runner | |
result = Status.BACKOFF; | |
} | |
transaction.commit(); | |
} catch (Exception ex) { | |
transaction.rollback(); | |
throw new EventDeliveryException("Failed to log event: " + event, | |
ex); | |
} finally { | |
transaction.close(); | |
} | |
return result; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment