Created
April 25, 2017 15:57
-
-
Save mattyb149/24b20977b411f5c40e82cf08d961b8a0 to your computer and use it in GitHub Desktop.
A NiFi 1.2.0 template with sample data and script to illustrate the use of ScriptedReader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" ?> | |
<template encoding-version="1.1"> | |
<description></description> | |
<groupId>a5a9dae4-015b-1000-7a1c-eaf2db85b188</groupId> | |
<name>ScriptedReaderTest</name> | |
<snippet> | |
<controllerServices> | |
<id>bb8e2ece-1712-3f89-0000-000000000000</id> | |
<parentGroupId>f3e4227c-c306-3c57-0000-000000000000</parentGroupId> | |
<bundle> | |
<artifact>nifi-record-serialization-services-nar</artifact> | |
<group>org.apache.nifi</group> | |
<version>1.2.0-SNAPSHOT</version> | |
</bundle> | |
<comments></comments> | |
<descriptors> | |
<entry> | |
<key>Schema Write Strategy</key> | |
<value> | |
<name>Schema Write Strategy</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Schema Access Strategy</key> | |
<value> | |
<name>Schema Access Strategy</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Schema Registry</key> | |
<value> | |
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService> | |
<name>Schema Registry</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Schema Name</key> | |
<value> | |
<name>Schema Name</name> | |
</value> | |
</entry> | |
<entry> | |
<key>schema-text</key> | |
<value> | |
<name>schema-text</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Date Format</key> | |
<value> | |
<name>Date Format</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Time Format</key> | |
<value> | |
<name>Time Format</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Timestamp Format</key> | |
<value> | |
<name>Timestamp Format</name> | |
</value> | |
</entry> | |
<entry> | |
<key>CSV Format</key> | |
<value> | |
<name>CSV Format</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Value Separator</key> | |
<value> | |
<name>Value Separator</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Include Header Line</key> | |
<value> | |
<name>Include Header Line</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Quote Character</key> | |
<value> | |
<name>Quote Character</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Escape Character</key> | |
<value> | |
<name>Escape Character</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Comment Marker</key> | |
<value> | |
<name>Comment Marker</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Null String</key> | |
<value> | |
<name>Null String</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Trim Fields</key> | |
<value> | |
<name>Trim Fields</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Quote Mode</key> | |
<value> | |
<name>Quote Mode</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Record Separator</key> | |
<value> | |
<name>Record Separator</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Include Trailing Delimiter</key> | |
<value> | |
<name>Include Trailing Delimiter</name> | |
</value> | |
</entry> | |
</descriptors> | |
<name>CSVRecordSetWriter</name> | |
<persistsState>false</persistsState> | |
<properties> | |
<entry> | |
<key>Schema Write Strategy</key> | |
<value>full-schema-attribute</value> | |
</entry> | |
<entry> | |
<key>Schema Access Strategy</key> | |
<value>schema-name</value> | |
</entry> | |
<entry> | |
<key>Schema Registry</key> | |
<value>16091e45-f03a-3b17-0000-000000000000</value> | |
</entry> | |
<entry> | |
<key>Schema Name</key> | |
<value>code</value> | |
</entry> | |
<entry> | |
<key>schema-text</key> | |
<value>${avro.schema}</value> | |
</entry> | |
<entry> | |
<key>Date Format</key> | |
<value>yyyy-MM-dd</value> | |
</entry> | |
<entry> | |
<key>Time Format</key> | |
<value>HH:mm:ss</value> | |
</entry> | |
<entry> | |
<key>Timestamp Format</key> | |
<value>yyyy-MM-dd HH:mm:ss</value> | |
</entry> | |
<entry> | |
<key>CSV Format</key> | |
<value>custom</value> | |
</entry> | |
<entry> | |
<key>Value Separator</key> | |
<value>,</value> | |
</entry> | |
<entry> | |
<key>Include Header Line</key> | |
<value>true</value> | |
</entry> | |
<entry> | |
<key>Quote Character</key> | |
<value>"</value> | |
</entry> | |
<entry> | |
<key>Escape Character</key> | |
<value>\</value> | |
</entry> | |
<entry> | |
<key>Comment Marker</key> | |
</entry> | |
<entry> | |
<key>Null String</key> | |
</entry> | |
<entry> | |
<key>Trim Fields</key> | |
<value>true</value> | |
</entry> | |
<entry> | |
<key>Quote Mode</key> | |
<value>MINIMAL</value> | |
</entry> | |
<entry> | |
<key>Record Separator</key> | |
<value>\n</value> | |
</entry> | |
<entry> | |
<key>Include Trailing Delimiter</key> | |
<value>false</value> | |
</entry> | |
</properties> | |
<state>ENABLED</state> | |
<type>org.apache.nifi.csv.CSVRecordSetWriter</type> | |
</controllerServices> | |
<controllerServices> | |
<id>db4be0ae-793d-3d98-0000-000000000000</id> | |
<parentGroupId>f3e4227c-c306-3c57-0000-000000000000</parentGroupId> | |
<bundle> | |
<artifact>nifi-scripting-nar</artifact> | |
<group>org.apache.nifi</group> | |
<version>1.2.0-SNAPSHOT</version> | |
</bundle> | |
<comments></comments> | |
<descriptors> | |
<entry> | |
<key>Script Engine</key> | |
<value> | |
<name>Script Engine</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Script File</key> | |
<value> | |
<name>Script File</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Script Body</key> | |
<value> | |
<name>Script Body</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Module Directory</key> | |
<value> | |
<name>Module Directory</name> | |
</value> | |
</entry> | |
</descriptors> | |
<name>ScriptedReader</name> | |
<persistsState>false</persistsState> | |
<properties> | |
<entry> | |
<key>Script Engine</key> | |
<value>Groovy</value> | |
</entry> | |
<entry> | |
<key>Script File</key> | |
</entry> | |
<entry> | |
<key>Script Body</key> | |
<value>import groovy.json.JsonSlurper | |
import org.apache.nifi.controller.* | |
import org.apache.nifi.flowfile.FlowFile | |
import org.apache.nifi.logging.ComponentLog | |
import org.apache.nifi.schema.access.SchemaNotFoundException | |
import org.apache.nifi.serialization.* | |
import org.apache.nifi.serialization.record.* | |
/**
 * RecordReader that turns the children of an XML document into MapRecords.
 *
 * The whole input stream is parsed with XmlSlurper when the reader is constructed;
 * every child of the document root whose tag equals recordTag becomes one record.
 * Each record holds one entry per field name of the supplied schema, and every value
 * is the string form of the child element of that name.
 */
class GroovyXmlRecordReader implements RecordReader {

    def recordIterator
    def recordSchema

    /**
     * @param recordTag   tag name of the elements (directly under the root) that represent records
     * @param schema      schema whose field names select which child elements are read
     * @param inputStream XML content; it is consumed here but not closed by this class
     */
    GroovyXmlRecordReader(final String recordTag, final RecordSchema schema, final InputStream inputStream) {
        recordSchema = schema
        def document = new XmlSlurper().parse(inputStream)

        // Build all records up front, then hand them out one at a time through an iterator
        def records = []
        document[recordTag].each { element ->
            // Field name -> string value, keyed in the order the schema lists its fields
            def values = [:]
            recordSchema.fieldNames.each { fieldName ->
                values[fieldName] = element[fieldName].toString()
            }
            records << new MapRecord(recordSchema, values)
        }
        recordIterator = records.iterator()
    }

    /** Returns the next record, or null once every record has been handed out. */
    Record nextRecord() throws IOException, MalformedRecordException {
        if (recordIterator?.hasNext()) {
            return recordIterator.next()
        }
        return null
    }

    /** Returns the schema that was passed to the constructor. */
    RecordSchema getSchema() throws MalformedRecordException {
        return recordSchema
    }

    /** Nothing to release: the reader keeps no reference to the input stream. */
    void close() throws IOException { }
}
/**
 * Controller-service factory that builds a GroovyXmlRecordReader for each flow file.
 *
 * Two flow file attributes drive it:
 *   schema.text - a JSON array of single-entry objects, each mapping a field name to a
 *                 RecordFieldType name, e.g. [{"id": "int"}, {"name": "string"}]
 *   record.tag  - the tag name of the XML elements that represent records
 *
 * An IOException is thrown when either attribute is missing or empty, or when a type
 * name in schema.text is not recognised.
 */
class GroovyXmlRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {

    // Not assigned or read anywhere in this script; presumably injected by the hosting
    // ScriptedReader service - NOTE(review): confirm against the ScriptedReader implementation
    ConfigurationContext configurationContext

    RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
        // Expecting 'schema.text' to be a JSON array of objects whose single key is the field name
        // and whose value names a RecordFieldType
        def schemaText = flowFile.getAttribute('schema.text')
        if (!schemaText) throw new IOException('No schema set in schema.text')

        // Validate 'record.tag' the same way, instead of passing null on to the reader
        def recordTag = flowFile.getAttribute('record.tag')
        if (!recordTag) throw new IOException('No record tag set in record.tag')

        def jsonSchema = new JsonSlurper().parseText(schemaText)
        def recordSchema = new SimpleRecordSchema(jsonSchema.collect {field ->
            // Only the first entry of each object is used
            def entry = field.entrySet()[0]
            def fieldType = RecordFieldType.of(entry.value)
            // NOTE(review): of() is expected to return null for a name it does not know - confirm;
            // reporting it here avoids a bare NullPointerException on .dataType
            if (fieldType == null) {
                throw new IOException('Unsupported type "' + entry.value + '" for field "' + entry.key + '" in schema.text')
            }
            new RecordField(entry.key, fieldType.dataType)
        } as List<RecordField>)
        return new GroovyXmlRecordReader(recordTag, recordSchema, inputStream)
    }
}
// Create an instance of RecordReaderFactory called "reader"; this is the entry point for ScriptedReader | |
reader = new GroovyXmlRecordReaderFactory() | |
</value> | |
</entry> | |
<entry> | |
<key>Module Directory</key> | |
</entry> | |
</properties> | |
<state>ENABLED</state> | |
<type>org.apache.nifi.record.script.ScriptedReader</type> | |
</controllerServices> | |
<controllerServices> | |
<id>16091e45-f03a-3b17-0000-000000000000</id> | |
<parentGroupId>f3e4227c-c306-3c57-0000-000000000000</parentGroupId> | |
<bundle> | |
<artifact>nifi-registry-nar</artifact> | |
<group>org.apache.nifi</group> | |
<version>1.2.0-SNAPSHOT</version> | |
</bundle> | |
<comments></comments> | |
<descriptors> | |
<entry> | |
<key>code</key> | |
<value> | |
<name>code</name> | |
</value> | |
</entry> | |
<entry> | |
<key>name</key> | |
<value> | |
<name>name</name> | |
</value> | |
</entry> | |
<entry> | |
<key>user</key> | |
<value> | |
<name>user</name> | |
</value> | |
</entry> | |
</descriptors> | |
<name>AvroSchemaRegistry</name> | |
<persistsState>false</persistsState> | |
<properties> | |
<entry> | |
<key>code</key> | |
<value>{ | |
"type": "record", | |
"name": "name", | |
"fields" : [ | |
{"name": "id", "type": "long"}, | |
{"name": "name", "type": ["null", "string"]}, | |
{"name": "code", "type": "long"} | |
] | |
}</value> | |
</entry> | |
<entry> | |
<key>name</key> | |
<value>{ | |
"type": "record", | |
"name": "name", | |
"fields" : [ | |
{"name": "id", "type": "long"}, | |
{"name": "name", "type": ["null", "string"]} | |
] | |
}</value> | |
</entry> | |
<entry> | |
<key>user</key> | |
<value>{ | |
"type": "record", | |
"name": "UserRecord", | |
"fields" : [ | |
{"name": "id", "type": "long"}, | |
{"name": "title", "type": ["null", "string"]}, | |
{"name": "first", "type": ["null", "string"]}, | |
{"name": "last", "type": ["null", "string"]}, | |
{"name": "street", "type": ["null", "string"]}, | |
{"name": "city", "type": ["null", "string"]}, | |
{"name": "state", "type": ["null", "string"]}, | |
{"name": "zip", "type": ["null", "string"]}, | |
{"name": "gender", "type": ["null", "string"]}, | |
{"name": "email", "type": ["null", "string"]}, | |
{"name": "username", "type": ["null", "string"]}, | |
{"name": "password", "type": ["null", "string"]}, | |
{"name": "phone", "type": ["null", "string"]}, | |
{"name": "cell", "type": ["null", "string"]}, | |
{"name": "ssn", "type": ["null", "string"]}, | |
{"name": "date_of_birth", "type": ["null", "string"]}, | |
{"name": "reg_date", "type": ["null", "string"]}, | |
{"name": "large", "type": ["null", "string"]}, | |
{"name": "medium", "type": ["null", "string"]}, | |
{"name": "thumbnail", "type": ["null", "string"]}, | |
{"name": "version", "type": ["null", "string"]}, | |
{"name": "nationality", "type": ["null", "string"]} | |
] | |
}</value> | |
</entry> | |
</properties> | |
<state>ENABLED</state> | |
<type>org.apache.nifi.schemaregistry.services.AvroSchemaRegistry</type> | |
</controllerServices> | |
<processors> | |
<id>a359e14e-325b-3731-0000-000000000000</id> | |
<parentGroupId>f3e4227c-c306-3c57-0000-000000000000</parentGroupId> | |
<position> | |
<x>679.5938453247036</x> | |
<y>248.81901560117888</y> | |
</position> | |
<bundle> | |
<artifact>nifi-standard-nar</artifact> | |
<group>org.apache.nifi</group> | |
<version>1.2.0-SNAPSHOT</version> | |
</bundle> | |
<config> | |
<bulletinLevel>WARN</bulletinLevel> | |
<comments></comments> | |
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> | |
<descriptors> | |
<entry> | |
<key>record-reader</key> | |
<value> | |
<identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService> | |
<name>record-reader</name> | |
</value> | |
</entry> | |
<entry> | |
<key>record-writer</key> | |
<value> | |
<identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService> | |
<name>record-writer</name> | |
</value> | |
</entry> | |
</descriptors> | |
<executionNode>ALL</executionNode> | |
<lossTolerant>false</lossTolerant> | |
<penaltyDuration>30 sec</penaltyDuration> | |
<properties> | |
<entry> | |
<key>record-reader</key> | |
<value>db4be0ae-793d-3d98-0000-000000000000</value> | |
</entry> | |
<entry> | |
<key>record-writer</key> | |
<value>bb8e2ece-1712-3f89-0000-000000000000</value> | |
</entry> | |
</properties> | |
<runDurationMillis>0</runDurationMillis> | |
<schedulingPeriod>0 sec</schedulingPeriod> | |
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> | |
<yieldDuration>1 sec</yieldDuration> | |
</config> | |
<name>ConvertRecord</name> | |
<relationships> | |
<autoTerminate>true</autoTerminate> | |
<name>failure</name> | |
</relationships> | |
<relationships> | |
<autoTerminate>false</autoTerminate> | |
<name>success</name> | |
</relationships> | |
<style></style> | |
<type>org.apache.nifi.processors.standard.ConvertRecord</type> | |
</processors> | |
<processors> | |
<id>ec5ae3df-0b62-3112-0000-000000000000</id> | |
<parentGroupId>f3e4227c-c306-3c57-0000-000000000000</parentGroupId> | |
<position> | |
<x>0.0</x> | |
<y>949.3994791857035</y> | |
</position> | |
<bundle> | |
<artifact>nifi-standard-nar</artifact> | |
<group>org.apache.nifi</group> | |
<version>1.2.0-SNAPSHOT</version> | |
</bundle> | |
<config> | |
<bulletinLevel>WARN</bulletinLevel> | |
<comments></comments> | |
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> | |
<descriptors> | |
<entry> | |
<key>Log Level</key> | |
<value> | |
<name>Log Level</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Log Payload</key> | |
<value> | |
<name>Log Payload</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Attributes to Log</key> | |
<value> | |
<name>Attributes to Log</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Attributes to Ignore</key> | |
<value> | |
<name>Attributes to Ignore</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Log prefix</key> | |
<value> | |
<name>Log prefix</name> | |
</value> | |
</entry> | |
</descriptors> | |
<executionNode>ALL</executionNode> | |
<lossTolerant>false</lossTolerant> | |
<penaltyDuration>30 sec</penaltyDuration> | |
<properties> | |
<entry> | |
<key>Log Level</key> | |
<value>info</value> | |
</entry> | |
<entry> | |
<key>Log Payload</key> | |
<value>false</value> | |
</entry> | |
<entry> | |
<key>Attributes to Log</key> | |
</entry> | |
<entry> | |
<key>Attributes to Ignore</key> | |
</entry> | |
<entry> | |
<key>Log prefix</key> | |
</entry> | |
</properties> | |
<runDurationMillis>0</runDurationMillis> | |
<schedulingPeriod>0 sec</schedulingPeriod> | |
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> | |
<yieldDuration>1 sec</yieldDuration> | |
</config> | |
<name>LogAttribute</name> | |
<relationships> | |
<autoTerminate>false</autoTerminate> | |
<name>success</name> | |
</relationships> | |
<style></style> | |
<type>org.apache.nifi.processors.standard.LogAttribute</type> | |
</processors> | |
<processors> | |
<id>6982bc85-a7b2-3400-0000-000000000000</id> | |
<parentGroupId>f3e4227c-c306-3c57-0000-000000000000</parentGroupId> | |
<position> | |
<x>672.3773977059673</x> | |
<y>0.0</y> | |
</position> | |
<bundle> | |
<artifact>nifi-standard-nar</artifact> | |
<group>org.apache.nifi</group> | |
<version>1.2.0-SNAPSHOT</version> | |
</bundle> | |
<config> | |
<bulletinLevel>WARN</bulletinLevel> | |
<comments></comments> | |
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> | |
<descriptors> | |
<entry> | |
<key>File Size</key> | |
<value> | |
<name>File Size</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Batch Size</key> | |
<value> | |
<name>Batch Size</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Data Format</key> | |
<value> | |
<name>Data Format</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Unique FlowFiles</key> | |
<value> | |
<name>Unique FlowFiles</name> | |
</value> | |
</entry> | |
<entry> | |
<key>generate-ff-custom-text</key> | |
<value> | |
<name>generate-ff-custom-text</name> | |
</value> | |
</entry> | |
<entry> | |
<key>record.tag</key> | |
<value> | |
<name>record.tag</name> | |
</value> | |
</entry> | |
<entry> | |
<key>schema.text</key> | |
<value> | |
<name>schema.text</name> | |
</value> | |
</entry> | |
</descriptors> | |
<executionNode>ALL</executionNode> | |
<lossTolerant>false</lossTolerant> | |
<penaltyDuration>30 sec</penaltyDuration> | |
<properties> | |
<entry> | |
<key>File Size</key> | |
<value>0B</value> | |
</entry> | |
<entry> | |
<key>Batch Size</key> | |
<value>1</value> | |
</entry> | |
<entry> | |
<key>Data Format</key> | |
<value>Text</value> | |
</entry> | |
<entry> | |
<key>Unique FlowFiles</key> | |
<value>false</value> | |
</entry> | |
<entry> | |
<key>generate-ff-custom-text</key> | |
<value><root> | |
<myRecord> | |
<id>1</id> | |
<name>John</name> | |
<code>100</code> | |
</myRecord> | |
<myRecord> | |
<id>2</id> | |
<name>Mary</name> | |
<code>200</code> | |
</myRecord> | |
<myRecord> | |
<id>3</id> | |
<name>Ramon</name> | |
<code>300</code> | |
</myRecord> | |
</root></value> | |
</entry> | |
<entry> | |
<key>record.tag</key> | |
<value>myRecord</value> | |
</entry> | |
<entry> | |
<key>schema.text</key> | |
<value>[ | |
{"id": "int"}, | |
{"name": "string"}, | |
{"code": "int"} | |
]</value> | |
</entry> | |
</properties> | |
<runDurationMillis>0</runDurationMillis> | |
<schedulingPeriod>30 sec</schedulingPeriod> | |
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> | |
<yieldDuration>1 sec</yieldDuration> | |
</config> | |
<name>GenerateFlowFile</name> | |
<relationships> | |
<autoTerminate>false</autoTerminate> | |
<name>success</name> | |
</relationships> | |
<style></style> | |
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type> | |
</processors> | |
</snippet> | |
<timestamp>04/25/2017 11:56:16 EDT</timestamp> | |
</template> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment