Skip to content

Instantly share code, notes, and snippets.

@pipern
Last active November 12, 2020 14:56
Show Gist options
  • Save pipern/2d67fbc0a4225cf1b42d1fff832c2c54 to your computer and use it in GitHub Desktop.
Save pipern/2d67fbc0a4225cf1b42d1fff832c2c54 to your computer and use it in GitHub Desktop.
NiFi CEF record reader
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* Author: Nick Piper / CGI Inc. 2020-11-12 */
/*
Can be tested with
mvn -T 2.0C -Dtest=org.apache.nifi.record.script.ScriptedReaderTest test
Requires the following Java libraries to be provided in the 'modules' folder; they can
be taken from nifi-standard-nar-1.11.4.nar:
bval-core-1.1.2.jar
bval-jsr-1.1.2.jar
javax.el-api-3.0.0.jar
macnificent-0.2.0.jar
ParCEFone-1.2.6.jar
validation-api-1.1.0.Final.jar
*/
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.MalformedRecordException
import org.apache.nifi.serialization.RecordReader
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSchema
import com.fluenda.parcefone.event.CEFHandlingException
import com.fluenda.parcefone.event.CommonEvent
import com.fluenda.parcefone.parser.CEFParser
import javax.validation.Validation
import javax.validation.Validator
/**
 * Record reader that turns a stream of CEF text, one event per line, into
 * NiFi records.
 *
 * Each record carries the CEF header fields as top-level values and the
 * extension key/value pairs as a string map under 'extension'.
 */
class GroovyCEFRecordReader implements RecordReader {
    // Line-oriented view over the incoming stream; created in the constructor.
    def bufferedreader
    // CEF parser instance; created in the constructor.
    def parser
    // Fixed schema shared by every record this reader produces.
    def recordSchema = new SimpleRecordSchema(
        [new RecordField('version', RecordFieldType.INT.dataType),
         new RecordField('deviceVendor', RecordFieldType.STRING.dataType),
         new RecordField('deviceProduct', RecordFieldType.STRING.dataType),
         new RecordField('deviceVersion', RecordFieldType.STRING.dataType),
         new RecordField('deviceEventClassId', RecordFieldType.STRING.dataType),
         new RecordField('name', RecordFieldType.STRING.dataType),
         new RecordField('severity', RecordFieldType.STRING.dataType),
         new RecordField('extension', RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.dataType))]
    )

    /**
     * Wraps the given stream for line-by-line reading and prepares the parser.
     *
     * @param inputStream raw bytes of the CEF content; decoded as UTF-8
     */
    GroovyCEFRecordReader(final InputStream inputStream) {
        final Validator validator = Validation.buildDefaultValidatorFactory().getValidator()
        // Decode with an explicit charset so the result does not depend on the
        // JVM's platform default encoding.
        bufferedreader = new BufferedReader(
            new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8))
        parser = new CEFParser(validator)
    }

    /**
     * Reads the next line and converts it to a record.
     *
     * @param coerceTypes not used by this implementation
     * @param dropUnknown not used by this implementation
     * @return the parsed record, or null once the stream is exhausted
     * @throws IOException if reading from the underlying stream fails
     * @throws MalformedRecordException if the line cannot be parsed as CEF
     */
    Record nextRecord(boolean coerceTypes, boolean dropUnknown) throws IOException, MalformedRecordException {
        // readLine() returns null only at end of stream. The previous ready()
        // check merely reports whether a read would block, so a slow stream
        // could be mistaken for an exhausted one.
        final String line = bufferedreader.readLine()
        if (line == null) {
            return null
        }
        try {
            def event = parser.parse(line, true)
            if (!event) {
                throw new MalformedRecordException("Failed to parse line as CEF");
            }
            def header = event.getHeader()
            // 'version' is emitted as the constant 0 rather than taken from the parsed event.
            // NOTE(review): confirm whether the header exposes a version value that should be used instead.
            return new MapRecord(recordSchema,
                ['version': 0,
                 'deviceVendor': header.get("deviceVendor"),
                 'deviceProduct': header.get("deviceProduct"),
                 'deviceVersion': header.get("deviceVersion"),
                 'deviceEventClassId': header.get("deviceEventClassId"),
                 'name': header.get("name"),
                 'severity': header.get("severity"),
                 'extension': event.getExtension(true)])
        } catch (final java.lang.NumberFormatException e) {
            // TODO why does parser.parse() sometimes throw this? Reading through its implementation implies it returns null
            throw new MalformedRecordException("Failed to parse line as CEF", e);
        }
    }

    /**
     * @return the fixed schema used for every record from this reader
     */
    RecordSchema getSchema() throws MalformedRecordException {
        return recordSchema
    }

    /**
     * Closes the underlying reader.
     *
     * @throws IOException if closing fails
     */
    void close() throws IOException {
        bufferedreader.close()
    }
}
/**
 * Controller service that supplies a {@link GroovyCEFRecordReader} for each
 * input stream it is asked to read.
 */
class GroovyCEFRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {

    // Groovy property; not read anywhere inside this class.
    ConfigurationContext configurationContext

    /**
     * Builds a CEF reader over the supplied stream.
     *
     * Only {@code inputStream} is consulted; the remaining arguments are
     * accepted to satisfy the factory signature and are otherwise ignored.
     *
     * @param variables   not used
     * @param inputStream content to be read as CEF lines
     * @param inputLength not used
     * @param logger      not used
     * @return a new reader bound to {@code inputStream}
     */
    RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
        final RecordReader cefReader = new GroovyCEFRecordReader(inputStream)
        return cefReader
    }
}
// Instantiate the factory and assign it to the undeclared script variable `reader`.
// NOTE(review): presumably the hosting script engine looks this variable up by name — confirm.
reader = new GroovyCEFRecordReaderFactory()
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy
index 8be0b6f2d6..8c1244e961 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy
@@ -26,6 +26,7 @@ import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper
import org.apache.nifi.script.ScriptingComponentHelper
import org.apache.nifi.script.ScriptingComponentUtils
import org.apache.nifi.serialization.RecordReader
+import org.apache.nifi.serialization.MalformedRecordException
import org.apache.nifi.util.MockComponentLog
import org.apache.nifi.util.MockPropertyValue
import org.apache.nifi.util.TestRunners
@@ -190,6 +191,98 @@ class ScriptedReaderTest {
}
assertNull(recordReader.nextRecord())
+
+ }
+
+ @Test
+ void testCEFRecordReaderGroovyScript() {
+
+ def properties = [:] as Map<PropertyDescriptor, String>
+ recordReaderFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor ->
+ properties.put(descriptor, descriptor.getDefaultValue())
+ }
+
+ // Mock the ConfigurationContext for setup(...)
+ def configurationContext = mock(ConfigurationContext)
+ when(configurationContext.getProperties()).thenReturn(properties)
+ when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
+ .thenReturn(new MockPropertyValue('Groovy'))
+ when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
+ .thenReturn(new MockPropertyValue('target/test/resources/groovy/record_reader_cef.groovy'))
+ when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
+ .thenReturn(new MockPropertyValue(null))
+ when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
+ .thenReturn(new MockPropertyValue('target/test/resources/groovy/modules'))
+
+ def logger = new MockComponentLog('ScriptedReader', '')
+ def initContext = mock(ControllerServiceInitializationContext)
+ when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
+ when(initContext.getLogger()).thenReturn(logger)
+
+ recordReaderFactory.initialize initContext
+ recordReaderFactory.onEnabled configurationContext
+
+ Map<String, String> schemaVariables = [:]
+
+ byte[] contentBytes = '''CEF:0|Symantec|Symantec Endpoint Protection Mobile |4.1.0|incident_opened|Secure Traffic Decryption|10|externalId=123456789 deviceExternalId=987654321 cn1=123456 [email protected] cs2=Network threats cs3=AIRPORT FREE WIFI msg=The "AIRPORT FREE WIFI" hotspot has attempted to decrypt secure communications cs4=iOS cs5=Device tagged as compliant cs6=Trojan src=172.31.20.236 shost=ip-172-31-20-236 end=2016-02-13 06:16:02 UTC
+CEF:0|Symantec|Symantec Endpoint Protection Mobile |4.2.0|incident_opened|Secure Traffic Decryption|10|externalId=123456789 deviceExternalId=987654321 cn1=123456 [email protected] cs2=Network threats cs3=AIRPORT FREE WIFI msg=The "AIRPORT FREE WIFI" hotspot has attempted to decrypt secure communications cs4=iOS cs5=Device tagged as compliant cs6=Trojan src=172.31.20.236 shost=ip-172-31-20-236 end=2016-02-13 06:16:02 UTC
+CEF:0|Symantec|Symantec Endpoint Protection Mobile |4.3.0|incident_opened|Secure Traffic Decryption|10|externalId=123456789 deviceExternalId=987654321 cn1=123456 [email protected] cs2=Network threats cs3=AIRPORT FREE WIFI msg=The "AIRPORT FREE WIFI" hotspot has attempted to decrypt secure communications cs4=iOS cs5=Device tagged as compliant cs6=Trojan src=172.31.20.236 shost=ip-172-31-20-236 end=2016-02-13 06:16:02 UTC
+CEF:0|Symantec|Symantec Endpoint Protection Mobile |4.4.0|incident_opened|Secure Traffic Decryption|10|externalId=123456789 deviceExternalId=987654321 cn1=123456 [email protected] cs2=Network threats cs3=AIRPORT FREE WIFI msg=The "AIRPORT FREE WIFI" hotspot has attempted to decrypt secure communications cs4=iOS cs5=Device tagged as compliant cs6=Trojan src=172.31.20.236 shost=ip-172-31-20-236 end=2016-02-13 06:16:02 UTC
+'''.bytes
+
+ InputStream inStream = new ByteArrayInputStream(contentBytes)
+
+ RecordReader recordReader = recordReaderFactory.createRecordReader(schemaVariables, inStream, contentBytes.length, logger)
+ assertNotNull(recordReader)
+
+ 4.times {
+ def record = recordReader.nextRecord()
+ assertNotNull(record)
+ assertEquals(0, record.getAsInt('version'))
+ assertEquals("Symantec", record.getAsString('deviceVendor'))
+ // assertEquals("iOS", record.getAsString('extension/cs4'))
+ }
+ assertNull(recordReader.nextRecord())
+
+ }
+
+ @Test(expected = MalformedRecordException.class)
+ void testCEFRecordReaderGroovyScriptMalformed() {
+
+ def properties = [:] as Map<PropertyDescriptor, String>
+ recordReaderFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor ->
+ properties.put(descriptor, descriptor.getDefaultValue())
+ }
+
+ // Mock the ConfigurationContext for setup(...)
+ def configurationContext = mock(ConfigurationContext)
+ when(configurationContext.getProperties()).thenReturn(properties)
+ when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
+ .thenReturn(new MockPropertyValue('Groovy'))
+ when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
+ .thenReturn(new MockPropertyValue('target/test/resources/groovy/record_reader_cef.groovy'))
+ when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
+ .thenReturn(new MockPropertyValue(null))
+ when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
+ .thenReturn(new MockPropertyValue('target/test/resources/groovy/modules'))
+
+ def logger = new MockComponentLog('ScriptedReader', '')
+ def initContext = mock(ControllerServiceInitializationContext)
+ when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
+ when(initContext.getLogger()).thenReturn(logger)
+
+ recordReaderFactory.initialize initContext
+ recordReaderFactory.onEnabled configurationContext
+
+ Map<String, String> schemaVariables = [:]
+
+ byte[] contentBytes = '''Symantec|Symantec Endpoint Protection Mobile |4.1.0|incident_opened|Secure Traffic Decryption|10|externalId=123456789 deviceExternalId=987654321 cn1=123456 [email protected] cs2=Network threats cs3=AIRPORT FREE WIFI msg=The "AIRPORT FREE WIFI" hotspot has attempted to decrypt secure communications cs4=iOS cs5=Device tagged as compliant cs6=Trojan src=172.31.20.236 shost=ip-172-31-20-236 end=2016-02-13 06:16:02 UTC'''.bytes
+
+ InputStream inStream = new ByteArrayInputStream(contentBytes)
+
+ RecordReader recordReader = recordReaderFactory.createRecordReader(schemaVariables, inStream, contentBytes.length, logger)
+ assertNotNull(recordReader)
+ recordReader.nextRecord()
}
class MockScriptedReader extends ScriptedReader implements AccessibleScriptingComponentHelper {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment