Last active
November 12, 2020 14:56
-
-
Save pipern/2d67fbc0a4225cf1b42d1fff832c2c54 to your computer and use it in GitHub Desktop.
NiFi CEF record reader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
/* Author: Nick Piper / CGI Inc. 2020-11-12 */ | |
/* | |
Can test with | |
mvn -T 2.0C -Dtest=org.apache.nifi.record.script.ScriptedReaderTest test | |
Requires java libraries to be provided in 'modules' folder, which can | |
be taken from nifi-standard-nar-1.11.4.nar | |
bval-core-1.1.2.jar | |
bval-jsr-1.1.2.jar | |
javax.el-api-3.0.0.jar | |
macnificent-0.2.0.jar | |
ParCEFone-1.2.6.jar | |
validation-api-1.1.0.Final.jar | |
*/ | |
import org.apache.nifi.controller.AbstractControllerService | |
import org.apache.nifi.controller.ConfigurationContext | |
import org.apache.nifi.logging.ComponentLog | |
import org.apache.nifi.schema.access.SchemaNotFoundException | |
import org.apache.nifi.serialization.MalformedRecordException | |
import org.apache.nifi.serialization.RecordReader | |
import org.apache.nifi.serialization.RecordReaderFactory | |
import org.apache.nifi.serialization.SimpleRecordSchema | |
import org.apache.nifi.serialization.record.MapRecord | |
import org.apache.nifi.serialization.record.Record | |
import org.apache.nifi.serialization.record.RecordField | |
import org.apache.nifi.serialization.record.RecordFieldType | |
import org.apache.nifi.serialization.record.RecordSchema | |
import com.fluenda.parcefone.event.CEFHandlingException | |
import com.fluenda.parcefone.event.CommonEvent | |
import com.fluenda.parcefone.parser.CEFParser | |
import javax.validation.Validation | |
import javax.validation.Validator | |
class GroovyCEFRecordReader implements RecordReader { | |
def bufferedreader | |
def parser | |
def recordSchema = new SimpleRecordSchema( | |
[new RecordField('version', RecordFieldType.INT.dataType), | |
new RecordField('deviceVendor', RecordFieldType.STRING.dataType), | |
new RecordField('deviceProduct', RecordFieldType.STRING.dataType), | |
new RecordField('deviceVersion', RecordFieldType.STRING.dataType), | |
new RecordField('deviceEventClassId', RecordFieldType.STRING.dataType), | |
new RecordField('name', RecordFieldType.STRING.dataType), | |
new RecordField('severity', RecordFieldType.STRING.dataType), | |
new RecordField('extension', RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.dataType))] | |
) | |
GroovyCEFRecordReader(final InputStream inputStream) { | |
final Validator validator = Validation.buildDefaultValidatorFactory().getValidator(); | |
bufferedreader = new BufferedReader(new InputStreamReader(inputStream)); | |
parser = new CEFParser(validator) | |
} | |
Record nextRecord(boolean coerceTypes, boolean dropUnknown) throws IOException, MalformedRecordException { | |
def result | |
if(bufferedreader.ready()) { | |
try { | |
result = parser.parse(bufferedreader.readLine(), true) | |
if (!result) { | |
throw new MalformedRecordException("Failed to parse line as CEF"); | |
} | |
return new MapRecord(recordSchema, | |
['version': 0, | |
'deviceVendor': result.getHeader().get("deviceVendor"), | |
'deviceProduct': result.getHeader().get("deviceProduct"), | |
'deviceVersion': result.getHeader().get("deviceVersion"), | |
'deviceEventClassId': result.getHeader().get("deviceEventClassId"), | |
'name': result.getHeader().get("name"), | |
'severity': result.getHeader().get("severity"), | |
'extension': result.getExtension(true)]) | |
} catch (final java.lang.NumberFormatException e) { | |
// TODO why does parser.parse() sometimes through this? Reading through it's implementation implies it returns null | |
throw new MalformedRecordException("Failed to parse line as CEF", e); | |
} | |
} else { | |
// bufferedreader not ready | |
return null | |
} | |
} | |
RecordSchema getSchema() throws MalformedRecordException { | |
return recordSchema | |
} | |
void close() throws IOException { | |
bufferedreader.close() | |
} | |
} | |
class GroovyCEFRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory { | |
ConfigurationContext configurationContext | |
RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { | |
return new GroovyCEFRecordReader(inputStream) | |
} | |
} | |
reader = new GroovyCEFRecordReaderFactory() | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy | |
index 8be0b6f2d6..8c1244e961 100644 | |
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy | |
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy | |
@@ -26,6 +26,7 @@ import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper | |
import org.apache.nifi.script.ScriptingComponentHelper | |
import org.apache.nifi.script.ScriptingComponentUtils | |
import org.apache.nifi.serialization.RecordReader | |
+import org.apache.nifi.serialization.MalformedRecordException | |
import org.apache.nifi.util.MockComponentLog | |
import org.apache.nifi.util.MockPropertyValue | |
import org.apache.nifi.util.TestRunners | |
@@ -190,6 +191,98 @@ class ScriptedReaderTest { | |
} | |
assertNull(recordReader.nextRecord()) | |
+ | |
+ } | |
+ | |
+ @Test | |
+ void testCEFRecordReaderGroovyScript() { | |
+ | |
+ def properties = [:] as Map<PropertyDescriptor, String> | |
+ recordReaderFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor -> | |
+ properties.put(descriptor, descriptor.getDefaultValue()) | |
+ } | |
+ | |
+ // Mock the ConfigurationContext for setup(...) | |
+ def configurationContext = mock(ConfigurationContext) | |
+ when(configurationContext.getProperties()).thenReturn(properties) | |
+ when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE)) | |
+ .thenReturn(new MockPropertyValue('Groovy')) | |
+ when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE)) | |
+ .thenReturn(new MockPropertyValue('target/test/resources/groovy/record_reader_cef.groovy')) | |
+ when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY)) | |
+ .thenReturn(new MockPropertyValue(null)) | |
+ when(configurationContext.getProperty(ScriptingComponentUtils.MODULES)) | |
+ .thenReturn(new MockPropertyValue('target/test/resources/groovy/modules')) | |
+ | |
+ def logger = new MockComponentLog('ScriptedReader', '') | |
+ def initContext = mock(ControllerServiceInitializationContext) | |
+ when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()) | |
+ when(initContext.getLogger()).thenReturn(logger) | |
+ | |
+ recordReaderFactory.initialize initContext | |
+ recordReaderFactory.onEnabled configurationContext | |
+ | |
+ Map<String, String> schemaVariables = [:] | |
+ | |
+ byte[] contentBytes = '''CEF:0|Symantec|Symantec Endpoint Protection Mobile |4.1.0|incident_opened|Secure Traffic Decryption|10|externalId=123456789 deviceExternalId=987654321 cn1=123456 [email protected] cs2=Network threats cs3=AIRPORT FREE WIFI msg=The "AIRPORT FREE WIFI" hotspot has attempted to decrypt secure communications cs4=iOS cs5=Device tagged as compliant cs6=Trojan src=172.31.20.236 shost=ip-172-31-20-236 end=2016-02-13 06:16:02 UTC | |
+CEF:0|Symantec|Symantec Endpoint Protection Mobile |4.2.0|incident_opened|Secure Traffic Decryption|10|externalId=123456789 deviceExternalId=987654321 cn1=123456 [email protected] cs2=Network threats cs3=AIRPORT FREE WIFI msg=The "AIRPORT FREE WIFI" hotspot has attempted to decrypt secure communications cs4=iOS cs5=Device tagged as compliant cs6=Trojan src=172.31.20.236 shost=ip-172-31-20-236 end=2016-02-13 06:16:02 UTC | |
+CEF:0|Symantec|Symantec Endpoint Protection Mobile |4.3.0|incident_opened|Secure Traffic Decryption|10|externalId=123456789 deviceExternalId=987654321 cn1=123456 [email protected] cs2=Network threats cs3=AIRPORT FREE WIFI msg=The "AIRPORT FREE WIFI" hotspot has attempted to decrypt secure communications cs4=iOS cs5=Device tagged as compliant cs6=Trojan src=172.31.20.236 shost=ip-172-31-20-236 end=2016-02-13 06:16:02 UTC | |
+CEF:0|Symantec|Symantec Endpoint Protection Mobile |4.4.0|incident_opened|Secure Traffic Decryption|10|externalId=123456789 deviceExternalId=987654321 cn1=123456 [email protected] cs2=Network threats cs3=AIRPORT FREE WIFI msg=The "AIRPORT FREE WIFI" hotspot has attempted to decrypt secure communications cs4=iOS cs5=Device tagged as compliant cs6=Trojan src=172.31.20.236 shost=ip-172-31-20-236 end=2016-02-13 06:16:02 UTC | |
+'''.bytes | |
+ | |
+ InputStream inStream = new ByteArrayInputStream(contentBytes) | |
+ | |
+ RecordReader recordReader = recordReaderFactory.createRecordReader(schemaVariables, inStream, contentBytes.length, logger) | |
+ assertNotNull(recordReader) | |
+ | |
+ 4.times { | |
+ def record = recordReader.nextRecord() | |
+ assertNotNull(record) | |
+ assertEquals(0, record.getAsInt('version')) | |
+ assertEquals("Symantec", record.getAsString('deviceVendor')) | |
+ // assertEquals("iOS", record.getAsString('extension/cs4')) | |
+ } | |
+ assertNull(recordReader.nextRecord()) | |
+ | |
+ } | |
+ | |
+ @Test(expected = MalformedRecordException.class) | |
+ void testCEFRecordReaderGroovyScriptMalformed() { | |
+ | |
+ def properties = [:] as Map<PropertyDescriptor, String> | |
+ recordReaderFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor -> | |
+ properties.put(descriptor, descriptor.getDefaultValue()) | |
+ } | |
+ | |
+ // Mock the ConfigurationContext for setup(...) | |
+ def configurationContext = mock(ConfigurationContext) | |
+ when(configurationContext.getProperties()).thenReturn(properties) | |
+ when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE)) | |
+ .thenReturn(new MockPropertyValue('Groovy')) | |
+ when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE)) | |
+ .thenReturn(new MockPropertyValue('target/test/resources/groovy/record_reader_cef.groovy')) | |
+ when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY)) | |
+ .thenReturn(new MockPropertyValue(null)) | |
+ when(configurationContext.getProperty(ScriptingComponentUtils.MODULES)) | |
+ .thenReturn(new MockPropertyValue('target/test/resources/groovy/modules')) | |
+ | |
+ def logger = new MockComponentLog('ScriptedReader', '') | |
+ def initContext = mock(ControllerServiceInitializationContext) | |
+ when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()) | |
+ when(initContext.getLogger()).thenReturn(logger) | |
+ | |
+ recordReaderFactory.initialize initContext | |
+ recordReaderFactory.onEnabled configurationContext | |
+ | |
+ Map<String, String> schemaVariables = [:] | |
+ | |
+ byte[] contentBytes = '''Symantec|Symantec Endpoint Protection Mobile |4.1.0|incident_opened|Secure Traffic Decryption|10|externalId=123456789 deviceExternalId=987654321 cn1=123456 [email protected] cs2=Network threats cs3=AIRPORT FREE WIFI msg=The "AIRPORT FREE WIFI" hotspot has attempted to decrypt secure communications cs4=iOS cs5=Device tagged as compliant cs6=Trojan src=172.31.20.236 shost=ip-172-31-20-236 end=2016-02-13 06:16:02 UTC'''.bytes | |
+ | |
+ InputStream inStream = new ByteArrayInputStream(contentBytes) | |
+ | |
+ RecordReader recordReader = recordReaderFactory.createRecordReader(schemaVariables, inStream, contentBytes.length, logger) | |
+ assertNotNull(recordReader) | |
+ recordReader.nextRecord() | |
} | |
class MockScriptedReader extends ScriptedReader implements AccessibleScriptingComponentHelper { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment