Created
April 7, 2016 07:10
-
-
Save ambud/162f8719bf77c9523abaddd3b5d6d1fc to your computer and use it in GitHub Desktop.
Apache Flume Syslog Multiline Patch (https://issues.apache.org/jira/browse/FLUME-1938)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java | |
index 5a73c88..4b870ef 100644 | |
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java | |
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogSourceConfigurationConstants.java | |
@@ -56,12 +56,14 @@ public final class SyslogSourceConfigurationConstants { | |
public static final String DEFAULT_CHARSET = "UTF-8"; | |
public static final String CONFIG_PORT_CHARSET_PREFIX = "charset.port."; | |
+ public static final String CONFIG_IS_SYSLOG_MULTILINE_BODY = "body.multiline"; | |
public static final int DEFAULT_BATCHSIZE = 100; | |
public static final String CONFIG_PORT_HEADER = "portHeader"; | |
public static final String DEFAULT_PORT_HEADER = "port"; | |
+ public static final boolean DEFAULT_IS_SYSLOG_MULTILINE_BODY = false; | |
public static final String CONFIG_READBUF_SIZE = "readBufferBytes"; | |
public static final int DEFAULT_READBUF_SIZE = 1024; | |
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java | |
index 96a9e85..29a9561 100644 | |
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java | |
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java | |
@@ -53,13 +53,14 @@ public class SyslogUDPSource extends AbstractSource | |
private String host = null; | |
private Channel nettyChannel; | |
private Map<String, String> formaterProp; | |
+ private boolean isSyslogMultiLineBody; | |
private static final Logger logger = LoggerFactory | |
.getLogger(SyslogUDPSource.class); | |
private CounterGroup counterGroup = new CounterGroup(); | |
public class syslogHandler extends SimpleChannelHandler { | |
- private SyslogUtils syslogUtils = new SyslogUtils(true); | |
+ private SyslogUtils syslogUtils = new SyslogUtils(true, isSyslogMultiLineBody); | |
public void setFormater(Map<String, String> prop) { | |
syslogUtils.addFormats(prop); | |
@@ -130,6 +131,7 @@ public class SyslogUDPSource extends AbstractSource | |
context, SyslogSourceConfigurationConstants.CONFIG_PORT); | |
port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT); | |
host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST); | |
+ isSyslogMultiLineBody = context.getBoolean(SyslogSourceConfigurationConstants.CONFIG_IS_SYSLOG_MULTILINE_BODY, SyslogSourceConfigurationConstants.DEFAULT_IS_SYSLOG_MULTILINE_BODY); | |
formaterProp = context.getSubProperties( | |
SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX); | |
} | |
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java | |
index c2a29a1..8849bd2 100644 | |
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java | |
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java | |
@@ -78,6 +78,7 @@ public class SyslogUtils { | |
private final boolean isUdp; | |
private boolean isBadEvent; | |
private boolean isIncompleteEvent; | |
+ private boolean isSyslogMultiLineBody; | |
private Integer maxSize; | |
private class SyslogFormatter { | |
@@ -98,13 +99,18 @@ public class SyslogUtils { | |
} | |
public SyslogUtils(boolean isUdp) { | |
- this(DEFAULT_SIZE, isUdp); | |
+ this(DEFAULT_SIZE, isUdp, SyslogSourceConfigurationConstants.DEFAULT_IS_SYSLOG_MULTILINE_BODY); | |
} | |
- public SyslogUtils(Integer eventSize, boolean isUdp){ | |
+ public SyslogUtils(boolean isUdp, boolean isMultilineBody) { | |
+ this(DEFAULT_SIZE, isUdp, isMultilineBody); | |
+ } | |
+ | |
+ public SyslogUtils(Integer eventSize, boolean isUdp, boolean isMultilineBody){ | |
this.isUdp = isUdp; | |
isBadEvent = false; | |
isIncompleteEvent = false; | |
+ isSyslogMultiLineBody = isMultilineBody; | |
maxSize = (eventSize < MIN_SIZE) ? MIN_SIZE : eventSize; | |
baos = new ByteArrayOutputStream(eventSize); | |
initHeaderFormats(); | |
@@ -156,7 +162,13 @@ public class SyslogUtils { | |
// setup RFC3164 formater | |
SyslogFormatter fmt2 = new SyslogFormatter(); | |
- fmt2.regexPattern = Pattern.compile(SYSLOG_MSG_RFC3164_0); | |
+ // Multi-line syslog body will cause regular expression extractor below to fail if | |
+ // Pattern.MULTILINE & Pattern.DOTALL are not activated while parsing the message. | |
+ // The code below will preserve header extraction logic when multi-line syslog is | |
+ // enabled. | |
+ if(!isSyslogMultiLineBody){fmt2.regexPattern = Pattern.compile(SYSLOG_MSG_RFC3164_0); | |
+ }else{fmt2.regexPattern = Pattern.compile(SYSLOG_MSG_RFC3164_0, Pattern.MULTILINE|Pattern.DOTALL); | |
+ } | |
// the single digit date has two spaces, so trim it | |
fmt2.searchPattern.add(" "); | |
fmt2.replacePattern.add(" "); | |
@@ -340,11 +352,17 @@ public class SyslogUtils { | |
break; | |
case DATA: | |
// TCP syslog entries are separated by '\n' | |
- if (b == '\n') { | |
- e = buildEvent(); | |
- doneReading = true; | |
- } else { | |
- baos.write(b); | |
+ // UDP syslog entries with multi-line body separated by '\n' or '\r' | |
+ // multi-line event processing handled by sink | |
+ if(!isSyslogMultiLineBody){ | |
+ if (b == '\n') { | |
+ e = buildEvent(); | |
+ doneReading = true; | |
+ } else { | |
+ baos.write(b); | |
+ } | |
+ }else{ | |
+ baos.write(b); | |
} | |
if(baos.size() == this.maxSize && !doneReading){ | |
isIncompleteEvent = true; | |
@@ -357,7 +375,12 @@ public class SyslogUtils { | |
} | |
// UDP doesn't send a newline, so just use what we received | |
- if (e == null && isUdp) { | |
+ if(e == null && !isSyslogMultiLineBody){ | |
+ if (isUdp) { | |
+ doneReading = true; | |
+ e = buildEvent(); | |
+ } | |
+ }else{ | |
doneReading = true; | |
e = buildEvent(); | |
} | |
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java | |
index 7208464..bf9d870 100644 | |
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java | |
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java | |
@@ -397,7 +397,7 @@ public class TestSyslogUtils { | |
@Test | |
public void testExtractBadEventLarge() { | |
String badData1 = "<10> bad bad data bad bad\n"; | |
- SyslogUtils util = new SyslogUtils(5, false); | |
+ SyslogUtils util = new SyslogUtils(5, false, false); | |
ChannelBuffer buff = ChannelBuffers.buffer(100); | |
buff.writeBytes(badData1.getBytes()); | |
Event e = util.extractEvent(buff); | |
@@ -425,4 +425,27 @@ public class TestSyslogUtils { | |
} | |
+ @Test | |
+ public void testMultiLineEvent() { | |
+ String body = "bad bad data bad bad\n bad bad bad\r bad bad"; | |
+ String badData1 = "<10> "+body; | |
+ SyslogUtils util = new SyslogUtils(false, true); | |
+ ChannelBuffer buff = ChannelBuffers.buffer(200); | |
+ buff.writeBytes(badData1.getBytes()); | |
+ Event e = util.extractEvent(buff); | |
+ if(e == null){ | |
+ throw new NullPointerException("Event is null"); | |
+ } | |
+ Map<String, String> headers = e.getHeaders(); | |
+ //System.out.print(new String(e.getBody()).trim()); | |
+ Assert.assertEquals("1", headers.get(SyslogUtils.SYSLOG_FACILITY)); | |
+ Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY)); | |
+ //No bad event status is put due to event format variability | |
+ //Assert.assertEquals(SyslogUtils.SyslogStatus.OTHER.getSyslogStatus(), | |
+ // headers.get(SyslogUtils.EVENT_STATUS)); | |
+ Assert.assertEquals(body.trim(), new String(e.getBody()).trim()); | |
+ | |
+ //Addition of more unit tests for multi-line configuration is welcome | |
+ } | |
+ | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
*/ | |
package org.apache.flume.source; | |
public final class SyslogSourceConfigurationConstants { | |
public static final String CONFIG_PORT = "port"; | |
/** | |
* List of ports to listen to. | |
*/ | |
public static final String CONFIG_PORTS = "ports"; | |
public static final String CONFIG_HOST = "host"; | |
public static final String CONFIG_FORMAT_PREFIX = "format."; | |
public static final String CONFIG_REGEX = "regex"; | |
public static final String CONFIG_SEARCH = "search"; | |
public static final String CONFIG_REPLACE = "replace"; | |
public static final String CONFIG_DATEFORMAT = "dateFormat"; | |
/** | |
* Number of processors used to calculate number of threads to spawn. | |
*/ | |
public static final String CONFIG_NUMPROCESSORS = "numProcessors"; | |
/** | |
* Maximum allowable size of events. | |
*/ | |
public static final String CONFIG_EVENTSIZE = "eventSize"; | |
public static final String CONFIG_BATCHSIZE = "batchSize"; | |
public static final String CONFIG_CHARSET = "charset.default"; | |
public static final String DEFAULT_CHARSET = "UTF-8"; | |
public static final String CONFIG_PORT_CHARSET_PREFIX = "charset.port."; | |
public static final String CONFIG_IS_SYSLOG_MULTILINE_BODY = "body.multiline"; | |
public static final int DEFAULT_BATCHSIZE = 100; | |
public static final String CONFIG_PORT_HEADER = "portHeader"; | |
public static final String DEFAULT_PORT_HEADER = "port"; | |
public static final boolean DEFAULT_IS_SYSLOG_MULTILINE_BODY = false; | |
public static final String CONFIG_READBUF_SIZE = "readBufferBytes"; | |
public static final int DEFAULT_READBUF_SIZE = 1024; | |
private SyslogSourceConfigurationConstants() { | |
// Disable explicit creation of objects. | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
*/ | |
package org.apache.flume.source; | |
import java.net.InetSocketAddress; | |
import java.util.Map; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import org.apache.flume.ChannelException; | |
import org.apache.flume.Context; | |
import org.apache.flume.CounterGroup; | |
import org.apache.flume.Event; | |
import org.apache.flume.EventDrivenSource; | |
import org.apache.flume.conf.Configurable; | |
import org.apache.flume.conf.Configurables; | |
import org.apache.flume.source.SyslogUtils; | |
import org.jboss.netty.bootstrap.ConnectionlessBootstrap; | |
import org.jboss.netty.buffer.ChannelBuffer; | |
import org.jboss.netty.channel.Channel; | |
import org.jboss.netty.channel.ChannelHandlerContext; | |
import org.jboss.netty.channel.ChannelPipeline; | |
import org.jboss.netty.channel.ChannelPipelineFactory; | |
import org.jboss.netty.channel.Channels; | |
import org.jboss.netty.channel.MessageEvent; | |
import org.jboss.netty.channel.SimpleChannelHandler; | |
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class SyslogUDPSource extends AbstractSource | |
implements EventDrivenSource, Configurable { | |
private int port; | |
private int maxsize = 1 << 16; // 64k is max allowable in RFC 5426 | |
private String host = null; | |
private Channel nettyChannel; | |
private Map<String, String> formaterProp; | |
private boolean isSyslogMultiLineBody; | |
private static final Logger logger = LoggerFactory | |
.getLogger(SyslogUDPSource.class); | |
private CounterGroup counterGroup = new CounterGroup(); | |
public class syslogHandler extends SimpleChannelHandler { | |
private SyslogUtils syslogUtils = new SyslogUtils(true, isSyslogMultiLineBody); | |
public void setFormater(Map<String, String> prop) { | |
syslogUtils.addFormats(prop); | |
} | |
@Override | |
public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) { | |
try { | |
syslogUtils.setEventSize(maxsize); | |
Event e = syslogUtils.extractEvent((ChannelBuffer)mEvent.getMessage()); | |
if (e == null) { | |
return; | |
} | |
getChannelProcessor().processEvent(e); | |
counterGroup.incrementAndGet("events.success"); | |
} catch (ChannelException ex) { | |
counterGroup.incrementAndGet("events.dropped"); | |
logger.error("Error writting to channel", ex); | |
return; | |
} | |
} | |
} | |
@Override | |
public void start() { | |
// setup Netty server | |
ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap | |
(new OioDatagramChannelFactory(Executors.newCachedThreadPool())); | |
final syslogHandler handler = new syslogHandler(); | |
handler.setFormater(formaterProp); | |
serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { | |
@Override | |
public ChannelPipeline getPipeline() { | |
return Channels.pipeline(handler); | |
} | |
}); | |
if (host == null) { | |
nettyChannel = serverBootstrap.bind(new InetSocketAddress(port)); | |
} else { | |
nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port)); | |
} | |
super.start(); | |
} | |
@Override | |
public void stop() { | |
logger.info("Syslog UDP Source stopping..."); | |
logger.info("Metrics:{}", counterGroup); | |
if (nettyChannel != null) { | |
nettyChannel.close(); | |
try { | |
nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS); | |
} catch (InterruptedException e) { | |
logger.warn("netty server stop interrupted", e); | |
} finally { | |
nettyChannel = null; | |
} | |
} | |
super.stop(); | |
} | |
@Override | |
public void configure(Context context) { | |
Configurables.ensureRequiredNonNull( | |
context, SyslogSourceConfigurationConstants.CONFIG_PORT); | |
port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT); | |
host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST); | |
isSyslogMultiLineBody = context.getBoolean(SyslogSourceConfigurationConstants.CONFIG_IS_SYSLOG_MULTILINE_BODY,SyslogSourceConfigurationConstants.DEFAULT_IS_SYSLOG_MULTILINE_BODY); | |
formaterProp = context.getSubProperties( | |
SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
*/ | |
package org.apache.flume.source; | |
import org.apache.flume.Event; | |
import org.apache.flume.event.EventBuilder; | |
import org.jboss.netty.buffer.ChannelBuffer; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.ByteArrayOutputStream; | |
import java.io.IOException; | |
import java.text.ParseException; | |
import java.text.SimpleDateFormat; | |
import java.util.ArrayList; | |
import java.util.Calendar; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.regex.MatchResult; | |
import java.util.regex.Matcher; | |
import java.util.regex.Pattern; | |
public class SyslogUtils { | |
final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_2 = "yyyy-MM-dd'T'HH:mm:ss.SZ"; | |
final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_1 = "yyyy-MM-dd'T'HH:mm:ss.S"; | |
final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_3 = "yyyy-MM-dd'T'HH:mm:ssZ"; | |
final public static String SYSLOG_TIMESTAMP_FORMAT_RFC5424_4 = "yyyy-MM-dd'T'HH:mm:ss"; | |
final public static String SYSLOG_TIMESTAMP_FORMAT_RFC3164_1 = "yyyyMMM d HH:mm:ss"; | |
final public static String SYSLOG_MSG_RFC5424_0 = | |
"(?:\\d\\s)?" +// version | |
// yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) | |
"(?:(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp | |
"\\s" + // separator | |
"(?:([\\w][\\w\\d\\.@-]*)|-)" + // host name or - (null) | |
"\\s" + // separator | |
"(.*)$"; // body | |
final public static String SYSLOG_MSG_RFC3164_0 = | |
// stamp MMM d HH:mm:ss, single digit date has two spaces | |
"([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" + | |
"\\s" + // separator | |
"([\\w][\\w\\d\\.@-]*)" + // host | |
"\\s(.*)$"; // body | |
final public static int SYSLOG_TIMESTAMP_POS = 1; | |
final public static int SYSLOG_HOSTNAME_POS = 2; | |
final public static int SYSLOG_BODY_POS = 3; | |
private Mode m = Mode.START; | |
private StringBuilder prio = new StringBuilder(); | |
private ByteArrayOutputStream baos; | |
private static final Logger logger = LoggerFactory | |
.getLogger(SyslogUtils.class); | |
final public static String SYSLOG_FACILITY = "Facility"; | |
final public static String SYSLOG_SEVERITY = "Severity"; | |
final public static String EVENT_STATUS = "flume.syslog.status"; | |
final public static Integer MIN_SIZE = 10; | |
final public static Integer DEFAULT_SIZE = 2500; | |
private final boolean isUdp; | |
private boolean isBadEvent; | |
private boolean isIncompleteEvent; | |
private boolean isSyslogMultiLineBody; | |
private Integer maxSize; | |
private class SyslogFormatter { | |
public Pattern regexPattern; | |
public ArrayList<String> searchPattern = new ArrayList<String>(); | |
public ArrayList<String> replacePattern = new ArrayList<String>(); | |
public ArrayList<SimpleDateFormat> dateFormat = new ArrayList<SimpleDateFormat>(); | |
public boolean addYear; | |
} | |
private ArrayList<SyslogFormatter> formats = new ArrayList<SyslogFormatter>(); | |
private String timeStamp = null; | |
private String hostName = null; | |
private String msgBody = null; | |
public SyslogUtils() { | |
this(false); | |
} | |
public SyslogUtils(boolean isUdp) { | |
this(DEFAULT_SIZE, isUdp, SyslogSourceConfigurationConstants.DEFAULT_IS_SYSLOG_MULTILINE_BODY); | |
} | |
public SyslogUtils(boolean isUdp, boolean isMultilineBody) { | |
this(DEFAULT_SIZE, isUdp, isMultilineBody); | |
} | |
public SyslogUtils(Integer eventSize, boolean isUdp, boolean isMultilineBody){ | |
this.isUdp = isUdp; | |
isBadEvent = false; | |
isIncompleteEvent = false; | |
this.isSyslogMultiLineBody = isMultilineBody; | |
maxSize = (eventSize < MIN_SIZE) ? MIN_SIZE : eventSize; | |
baos = new ByteArrayOutputStream(eventSize); | |
initHeaderFormats(); | |
} | |
// extend the default header formatter | |
public void addFormats(Map<String, String> formatProp) { | |
if (formatProp.isEmpty() || !formatProp.containsKey( | |
SyslogSourceConfigurationConstants.CONFIG_REGEX)) { | |
return; | |
} | |
SyslogFormatter fmt1 = new SyslogFormatter(); | |
fmt1.regexPattern = Pattern.compile( formatProp.get( | |
SyslogSourceConfigurationConstants.CONFIG_REGEX) ); | |
if (formatProp.containsKey( | |
SyslogSourceConfigurationConstants.CONFIG_SEARCH)) { | |
fmt1.searchPattern.add(formatProp.get( | |
SyslogSourceConfigurationConstants.CONFIG_SEARCH)); | |
} | |
if (formatProp.containsKey( | |
SyslogSourceConfigurationConstants.CONFIG_REPLACE)) { | |
fmt1.replacePattern.add(formatProp.get( | |
SyslogSourceConfigurationConstants.CONFIG_REPLACE)); | |
} | |
if (formatProp.containsKey( | |
SyslogSourceConfigurationConstants.CONFIG_DATEFORMAT)) { | |
fmt1.dateFormat.add(new SimpleDateFormat(formatProp.get( | |
SyslogSourceConfigurationConstants.CONFIG_DATEFORMAT))); | |
} | |
formats.add(0, fmt1); | |
} | |
// setup built-in formats | |
private void initHeaderFormats() { | |
// setup RFC5424 formater | |
SyslogFormatter fmt1 = new SyslogFormatter(); | |
fmt1.regexPattern = Pattern.compile(SYSLOG_MSG_RFC5424_0); | |
// 'Z' in timestamp indicates UTC zone, so replace it it with '+0000' for date formatting | |
fmt1.searchPattern.add("Z"); | |
fmt1.replacePattern.add("+0000"); | |
// timezone in RFC5424 is [+-]tt:tt, so remove the ':' for java date formatting | |
fmt1.searchPattern.add("([+-])(\\d{2})[:](\\d{2})"); | |
fmt1.replacePattern.add("$1$2$3"); | |
fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_1)); | |
fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_2)); | |
fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_3)); | |
fmt1.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC5424_4)); | |
fmt1.addYear = false; | |
// setup RFC3164 formater | |
SyslogFormatter fmt2 = new SyslogFormatter(); | |
fmt2.regexPattern = Pattern.compile(SYSLOG_MSG_RFC3164_0, Pattern.MULTILINE|Pattern.DOTALL); | |
// the single digit date has two spaces, so trim it | |
fmt2.searchPattern.add(" "); | |
fmt2.replacePattern.add(" "); | |
fmt2.dateFormat.add(new SimpleDateFormat(SYSLOG_TIMESTAMP_FORMAT_RFC3164_1)); | |
fmt2.addYear = true; | |
formats.add(fmt1); | |
formats.add(fmt2); | |
} | |
enum Mode { | |
START, PRIO, DATA | |
}; | |
public enum SyslogStatus{ | |
OTHER("Unknown"), | |
INVALID("Invalid"), | |
INCOMPLETE("Incomplete"); | |
private final String syslogStatus; | |
private SyslogStatus(String status){ | |
syslogStatus = status; | |
} | |
public String getSyslogStatus(){ | |
return this.syslogStatus; | |
} | |
} | |
// create the event from syslog data | |
Event buildEvent() { | |
byte[] body; | |
int pri = 0; | |
int sev = 0; | |
int facility = 0; | |
if(!isBadEvent){ | |
pri = Integer.parseInt(prio.toString()); | |
sev = pri % 8; | |
facility = pri / 8; | |
formatHeaders(); | |
} | |
Map <String, String> headers = new HashMap<String, String>(); | |
headers.put(SYSLOG_FACILITY, String.valueOf(facility)); | |
headers.put(SYSLOG_SEVERITY, String.valueOf(sev)); | |
if ((timeStamp != null) && timeStamp.length() > 0) { | |
headers.put("timestamp", timeStamp); | |
} | |
if ((hostName != null) && (hostName.length() > 0)) { | |
headers.put("host", hostName); | |
} | |
if(isBadEvent){ | |
logger.warn("Event created from Invalid Syslog data."); | |
headers.put(EVENT_STATUS, SyslogStatus.INVALID.getSyslogStatus()); | |
} else if(isIncompleteEvent){ | |
logger.warn("Event size larger than specified event size: {}. You should " + | |
"consider increasing your event size.", maxSize); | |
headers.put(EVENT_STATUS, SyslogStatus.INCOMPLETE.getSyslogStatus()); | |
} | |
if ((msgBody != null) && (msgBody.length() > 0)) { | |
body = msgBody.getBytes(); | |
} else { | |
body = baos.toByteArray(); | |
} | |
reset(); | |
// format the message | |
return EventBuilder.withBody(body, headers); | |
} | |
// Apply each known pattern to message | |
private void formatHeaders() { | |
String eventStr = baos.toString(); | |
for(int p=0; p < formats.size(); p++) { | |
SyslogFormatter fmt = formats.get(p); | |
Pattern pattern = fmt.regexPattern; | |
Matcher matcher = pattern.matcher(eventStr); | |
if (! matcher.matches()) { | |
continue; | |
} | |
MatchResult res = matcher.toMatchResult(); | |
for (int grp=1; grp <= res.groupCount(); grp++) { | |
String value = res.group(grp); | |
if (grp == SYSLOG_TIMESTAMP_POS) { | |
// apply available format replacements to timestamp | |
if (value != null) { | |
for (int sp=0; sp < fmt.searchPattern.size(); sp++) { | |
value = value.replaceAll(fmt.searchPattern.get(sp), fmt.replacePattern.get(sp)); | |
} | |
// Add year to timestamp if needed | |
if (fmt.addYear) { | |
value = String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + value; | |
} | |
// try the available time formats to timestamp | |
for (int dt = 0; dt < fmt.dateFormat.size(); dt++) { | |
try { | |
timeStamp = String.valueOf(fmt.dateFormat.get(dt).parse(value).getTime()); | |
break; // done. formatted the time | |
} catch (ParseException e) { | |
// Error formatting the timeStamp, try next format | |
continue; | |
} | |
} | |
} | |
} else if (grp == SYSLOG_HOSTNAME_POS) { | |
hostName = value; | |
} else if (grp == SYSLOG_BODY_POS) { | |
msgBody = value; | |
} | |
} | |
break; // we successfully parsed the message using this pattern | |
} | |
} | |
private void reset(){ | |
baos.reset(); | |
m = Mode.START; | |
prio.delete(0, prio.length()); | |
isBadEvent = false; | |
isIncompleteEvent = false; | |
hostName = null; | |
timeStamp = null; | |
msgBody = null; | |
} | |
// extract relevant syslog data needed for building Flume event | |
public Event extractEvent(ChannelBuffer in){ | |
/* for protocol debugging | |
ByteBuffer bb = in.toByteBuffer(); | |
int remaining = bb.remaining(); | |
byte[] buf = new byte[remaining]; | |
bb.get(buf); | |
HexDump.dump(buf, 0, System.out, 0); | |
*/ | |
byte b = 0; | |
Event e = null; | |
boolean doneReading = false; | |
try { | |
while (!doneReading && in.readable()) { | |
b = in.readByte(); | |
switch (m) { | |
case START: | |
if (b == '<') { | |
m = Mode.PRIO; | |
} else if(b == '\n'){ | |
//If the character is \n, it was because the last event was exactly | |
//as long as the maximum size allowed and | |
//the only remaining character was the delimiter - '\n', or | |
//multiple delimiters were sent in a row. | |
//Just ignore it, and move forward, don't change the mode. | |
//This is a no-op, just ignore it. | |
logger.debug("Delimiter found while in START mode, ignoring.."); | |
} else { | |
isBadEvent = true; | |
baos.write(b); | |
//Bad event, just dump everything as if it is data. | |
m = Mode.DATA; | |
} | |
break; | |
case PRIO: | |
if (b == '>') { | |
m = Mode.DATA; | |
} else { | |
char ch = (char) b; | |
prio.append(ch); | |
if (!Character.isDigit(ch)) { | |
isBadEvent = true; | |
//Append the priority to baos: | |
String badPrio = "<"+ prio; | |
baos.write(badPrio.getBytes()); | |
//If we hit a bad priority, just write as if everything is data. | |
m = Mode.DATA; | |
} | |
} | |
break; | |
case DATA: | |
// TCP syslog entries are separated by '\n' | |
// UDP syslog entries with multi-line body separated by '\n' or '\r' | |
// multi-line event processing handled by sink | |
if(!isSyslogMultiLineBody){ | |
if (b == '\n') { | |
e = buildEvent(); | |
doneReading = true; | |
} else { | |
baos.write(b); | |
} | |
}else{baos.write(b); | |
} | |
if(baos.size() == this.maxSize && !doneReading){ | |
isIncompleteEvent = true; | |
e = buildEvent(); | |
doneReading = true; | |
} | |
break; | |
} | |
} | |
// UDP doesn't send a newline, so just use what we received | |
if (e == null && isUdp) { | |
doneReading = true; | |
e = buildEvent(); | |
} | |
//} catch (IndexOutOfBoundsException eF) { | |
// e = buildEvent(prio, baos); | |
} catch (IOException e1) { | |
//no op | |
} finally { | |
// no-op | |
} | |
return e; | |
} | |
public Integer getEventSize() { | |
return maxSize; | |
} | |
public void setEventSize(Integer eventSize) { | |
this.maxSize = eventSize; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
*/ | |
package org.apache.flume.source; | |
import org.apache.flume.Event; | |
import org.jboss.netty.buffer.ChannelBuffer; | |
import org.jboss.netty.buffer.ChannelBuffers; | |
import org.junit.Assert; | |
import org.junit.Test; | |
import java.text.ParseException; | |
import java.text.SimpleDateFormat; | |
import java.util.Calendar; | |
import java.util.Map; | |
public class TestSyslogUtils { | |
@Test | |
public void TestHeader0() throws ParseException { | |
String stamp1 = "2012-04-13T11:11:11"; | |
String format1 = "yyyy-MM-dd'T'HH:mm:ssZ"; | |
String host1 = "ubuntu-11.cloudera.com"; | |
String data1 = "some msg"; | |
// timestamp with hh:mm format timezone with no version | |
String msg1 = "<10>" + stamp1+ "+08:00" + " " + host1 + " " + data1 + "\n"; | |
checkHeader(msg1, stamp1 + "+0800", format1, host1, data1); | |
} | |
@Test | |
public void TestHeader1() throws ParseException { | |
String stamp1 = "2012-04-13T11:11:11"; | |
String format1 = "yyyy-MM-dd'T'HH:mm:ss"; | |
String host1 = "ubuntu-11.cloudera.com"; | |
String data1 = "some msg"; | |
String msg1 = "<10>1 " + stamp1 + " " + host1 + " " + data1 + "\n"; | |
checkHeader(msg1, stamp1, format1, host1, data1); | |
} | |
@Test | |
public void TestHeader2() throws ParseException { | |
String stamp1 = "2012-04-13T11:11:11"; | |
String format1 = "yyyy-MM-dd'T'HH:mm:ssZ"; | |
String host1 = "ubuntu-11.cloudera.com"; | |
String data1 = "some msg"; | |
// timestamp with 'Z' appended, translates to UTC | |
String msg1 = "<10>1 " + stamp1+ "Z" + " " + host1 + " " + data1 + "\n"; | |
checkHeader(msg1, stamp1 + "+0000", format1, host1, data1); | |
} | |
@Test | |
public void TestHeader3() throws ParseException { | |
String stamp1 = "2012-04-13T11:11:11"; | |
String format1 = "yyyy-MM-dd'T'HH:mm:ssZ"; | |
String host1 = "ubuntu-11.cloudera.com"; | |
String data1 = "some msg"; | |
// timestamp with hh:mm format timezone | |
String msg1 = "<10>1 " + stamp1+ "+08:00" + " " + host1 + " " + data1 + "\n"; | |
checkHeader(msg1, stamp1 + "+0800", format1, host1, data1); | |
} | |
@Test | |
public void TestHeader4() throws ParseException { | |
String host1 = "ubuntu-11.cloudera.com"; | |
String data1 = "some msg"; | |
// null format timestamp (-) | |
String msg1 = "<10>1 " + "-" + " " + host1 + " " + data1 + "\n"; | |
checkHeader(msg1, null, null, host1, data1); | |
} | |
@Test | |
public void TestHeader5() throws ParseException { | |
String stamp1 = "2012-04-13T11:11:11"; | |
String format1 = "yyyy-MM-dd'T'HH:mm:ss"; | |
String host1 = "-"; | |
String data1 = "some msg"; | |
// null host | |
String msg1 = "<10>1 " + stamp1 + " " + host1 + " " + data1 + "\n"; | |
checkHeader(msg1, stamp1, format1, null, data1); | |
} | |
@Test | |
public void TestHeader6() throws ParseException { | |
String stamp1 = "2012-04-13T11:11:11"; | |
String format1 = "yyyy-MM-dd'T'HH:mm:ssZ"; | |
String host1 = "-"; | |
String data1 = "some msg"; | |
// null host | |
String msg1 = "<10>1 " + stamp1+ "Z" + " " + host1 + " " + data1 + "\n"; | |
checkHeader(msg1, stamp1 + "+0000", format1, null, data1); | |
} | |
@Test | |
public void TestHeader7() throws ParseException { | |
String stamp1 = "2012-04-13T11:11:11"; | |
String format1 = "yyyy-MM-dd'T'HH:mm:ssZ"; | |
String host1 = "-"; | |
String data1 = "some msg"; | |
// null host | |
String msg1 = "<10>1 " + stamp1+ "+08:00" + " " + host1 + " " + data1 + "\n"; | |
checkHeader(msg1, stamp1 + "+0800", format1, null, data1); | |
} | |
@Test | |
public void TestHeader8() throws ParseException { | |
String stamp1 = "2012-04-13T11:11:11.999"; | |
String format1 = "yyyy-MM-dd'T'HH:mm:ss.S"; | |
String host1 = "ubuntu-11.cloudera.com"; | |
String data1 = "some msg"; | |
String msg1 = "<10>1 " + stamp1 + " " + host1 + " " + data1 + "\n"; | |
checkHeader(msg1, stamp1, format1, host1, data1); | |
} | |
@Test | |
public void TestHeader9() throws ParseException { | |
String stamp1 = "Apr 11 13:14:04"; | |
String format1 = "yyyyMMM d HH:mm:ss"; | |
String host1 = "ubuntu-11.cloudera.com"; | |
String data1 = "some msg"; | |
// timestamp with 'Z' appended, translates to UTC | |
String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; | |
checkHeader(msg1, String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + stamp1, | |
format1, host1, data1); | |
} | |
@Test | |
public void TestHeader10() throws ParseException { | |
String stamp1 = "Apr 1 13:14:04"; | |
String format1 = "yyyyMMM d HH:mm:ss"; | |
String host1 = "ubuntu-11.cloudera.com"; | |
String data1 = "some msg"; | |
// timestamp with 'Z' appended, translates to UTC | |
String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; | |
checkHeader(msg1, String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + stamp1, | |
format1, host1, data1); | |
} | |
@Test | |
public void TestRfc3164HeaderApacheLogWithNulls() throws ParseException { | |
String stamp1 = "Apr 1 13:14:04"; | |
String format1 = "yyyyMMM d HH:mm:ss"; | |
String host1 = "ubuntu-11.cloudera.com"; | |
String data1 = "- hyphen_null_breaks_5424_pattern [07/Jun/2012:14:46:44 -0600]"; | |
String msg1 = "<10>" + stamp1 + " " + host1 + " " + data1 + "\n"; | |
checkHeader(msg1, String.valueOf(Calendar.getInstance().get(Calendar.YEAR)) + stamp1, | |
format1, host1, data1); | |
} | |
public void checkHeader(String msg1, String stamp1, String format1, String host1, String data1) throws ParseException { | |
SyslogUtils util = new SyslogUtils(false); | |
ChannelBuffer buff = ChannelBuffers.buffer(200); | |
buff.writeBytes(msg1.getBytes()); | |
Event e = util.extractEvent(buff); | |
if(e == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers2 = e.getHeaders(); | |
if (stamp1 == null) { | |
Assert.assertFalse(headers2.containsKey("timestamp")); | |
} else { | |
SimpleDateFormat formater = new SimpleDateFormat(format1); | |
Assert.assertEquals(String.valueOf(formater.parse(stamp1).getTime()), headers2.get("timestamp")); | |
} | |
if (host1 == null) { | |
Assert.assertFalse(headers2.containsKey("host")); | |
} else { | |
String host2 = headers2.get("host"); | |
Assert.assertEquals(host2,host1); | |
} | |
Assert.assertEquals(data1, new String(e.getBody())); | |
} | |
/** | |
* Test bad event format 1: Priority is not numeric | |
*/ | |
@Test | |
public void testExtractBadEvent1() { | |
String badData1 = "<10F> bad bad data\n"; | |
SyslogUtils util = new SyslogUtils(false); | |
ChannelBuffer buff = ChannelBuffers.buffer(100); | |
buff.writeBytes(badData1.getBytes()); | |
Event e = util.extractEvent(buff); | |
if(e == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers = e.getHeaders(); | |
Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), | |
headers.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); | |
} | |
/** | |
* Test bad event format 2: The first char is not < | |
*/ | |
@Test | |
public void testExtractBadEvent2() { | |
String badData1 = "hi guys! <10> bad bad data\n"; | |
SyslogUtils util = new SyslogUtils(false); | |
ChannelBuffer buff = ChannelBuffers.buffer(100); | |
buff.writeBytes(badData1.getBytes()); | |
Event e = util.extractEvent(buff); | |
if(e == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers = e.getHeaders(); | |
Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), | |
headers.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); | |
} | |
/** | |
* Good event | |
*/ | |
@Test | |
public void testExtractGoodEvent() { | |
String priority = "<10>"; | |
String goodData1 = "Good good good data\n"; | |
SyslogUtils util = new SyslogUtils(false); | |
ChannelBuffer buff = ChannelBuffers.buffer(100); | |
buff.writeBytes((priority+goodData1).getBytes()); | |
Event e = util.extractEvent(buff); | |
if(e == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers = e.getHeaders(); | |
Assert.assertEquals("1", headers.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(null, headers.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals(goodData1.trim(), new String(e.getBody()).trim()); | |
} | |
/** | |
* Bad event immediately followed by a good event | |
*/ | |
@Test | |
public void testBadEventGoodEvent(){ | |
String badData1 = "hi guys! <10F> bad bad data\n"; | |
SyslogUtils util = new SyslogUtils(false); | |
ChannelBuffer buff = ChannelBuffers.buffer(100); | |
buff.writeBytes(badData1.getBytes()); | |
String priority = "<10>"; | |
String goodData1 = "Good good good data\n"; | |
buff.writeBytes((priority+goodData1).getBytes()); | |
Event e = util.extractEvent(buff); | |
if(e == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers = e.getHeaders(); | |
Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), | |
headers.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); | |
Event e2 = util.extractEvent(buff); | |
if(e2 == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers2 = e2.getHeaders(); | |
Assert.assertEquals("1", headers2.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("2", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(null, | |
headers2.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals(goodData1.trim(), new String(e2.getBody()).trim()); | |
} | |
@Test | |
public void testGoodEventBadEvent(){ | |
String badData1 = "hi guys! <10F> bad bad data\n"; | |
String priority = "<10>"; | |
String goodData1 = "Good good good data\n"; | |
SyslogUtils util = new SyslogUtils(false); | |
ChannelBuffer buff = ChannelBuffers.buffer(100); | |
buff.writeBytes((priority+goodData1).getBytes()); | |
buff.writeBytes(badData1.getBytes()); | |
Event e2 = util.extractEvent(buff); | |
if(e2 == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers2 = e2.getHeaders(); | |
Assert.assertEquals("1", headers2.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("2", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(null, | |
headers2.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals(goodData1.trim(), new String(e2.getBody()).trim()); | |
Event e = util.extractEvent(buff); | |
if(e == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers = e.getHeaders(); | |
Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), | |
headers.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); | |
} | |
@Test | |
public void testBadEventBadEvent(){ | |
String badData1 = "hi guys! <10F> bad bad data\n"; | |
SyslogUtils util = new SyslogUtils(false); | |
ChannelBuffer buff = ChannelBuffers.buffer(100); | |
buff.writeBytes(badData1.getBytes()); | |
String badData2 = "hi guys! <20> bad bad data\n"; | |
buff.writeBytes((badData2).getBytes()); | |
Event e = util.extractEvent(buff); | |
if(e == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers = e.getHeaders(); | |
Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("0", headers.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), | |
headers.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals(badData1.trim(), new String(e.getBody()).trim()); | |
Event e2 = util.extractEvent(buff); | |
if(e2 == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers2 = e2.getHeaders(); | |
Assert.assertEquals("0", headers2.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("0", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), | |
headers2.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals(badData2.trim(), new String(e2.getBody()).trim()); | |
} | |
@Test | |
public void testGoodEventGoodEvent() { | |
String priority = "<10>"; | |
String goodData1 = "Good good good data\n"; | |
SyslogUtils util = new SyslogUtils(false); | |
ChannelBuffer buff = ChannelBuffers.buffer(100); | |
buff.writeBytes((priority+goodData1).getBytes()); | |
String priority2 = "<20>"; | |
String goodData2 = "Good really good data\n"; | |
buff.writeBytes((priority2+goodData2).getBytes()); | |
Event e = util.extractEvent(buff); | |
if(e == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers = e.getHeaders(); | |
Assert.assertEquals("1", headers.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(null, | |
headers.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals(goodData1.trim(), new String(e.getBody()).trim()); | |
Event e2 = util.extractEvent(buff); | |
if(e2 == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers2 = e2.getHeaders(); | |
Assert.assertEquals("2", headers2.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("4", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(null, | |
headers.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals(goodData2.trim(), new String(e2.getBody()).trim()); | |
} | |
@Test | |
public void testExtractBadEventLarge() { | |
String badData1 = "<10> bad bad data bad bad\n"; | |
SyslogUtils util = new SyslogUtils(5, false, false); | |
ChannelBuffer buff = ChannelBuffers.buffer(100); | |
buff.writeBytes(badData1.getBytes()); | |
Event e = util.extractEvent(buff); | |
if(e == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers = e.getHeaders(); | |
Assert.assertEquals("1", headers.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("2", headers.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(SyslogUtils.SyslogStatus.INCOMPLETE.getSyslogStatus(), | |
headers.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals("bad bad d".trim(), new String(e.getBody()).trim()); | |
Event e2 = util.extractEvent(buff); | |
if(e2 == null){ | |
throw new NullPointerException("Event is null"); | |
} | |
Map<String, String> headers2 = e2.getHeaders(); | |
Assert.assertEquals("0", headers2.get(SyslogUtils.SYSLOG_FACILITY)); | |
Assert.assertEquals("0", headers2.get(SyslogUtils.SYSLOG_SEVERITY)); | |
Assert.assertEquals(SyslogUtils.SyslogStatus.INVALID.getSyslogStatus(), | |
headers2.get(SyslogUtils.EVENT_STATUS)); | |
Assert.assertEquals("ata bad ba".trim(), new String(e2.getBody()).trim()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment