This NiFi flow template illustrates how incoming FlowFile attributes are carried to the InvokeHTTP output FlowFile.
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
  <description></description>
  <groupId>39379f66-0167-1000-9951-3cf7c004e310</groupId>
  <name>Merge XML Records</name>
  <snippet>
    <controllerServices>
      <id>36c4d83a-ff47-38e2-0000-000000000000</id>
      <parentGroupId>376efa9a-48fc-3e3d-0000-000000000000</parentGroupId>
      <bundle>
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
  <description></description>
  <groupId>29cd7683-0167-1000-0886-c9dc91c022a5</groupId>
  <name>AddTimestamp</name>
  <snippet>
    <connections>
      <id>09785868-682a-3058-0000-000000000000</id>
      <parentGroupId>818236f9-e91f-324b-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
#!/usr/bin/env bash
# Launch a CentOS/RHEL 7 VM with at least 4 cores / 16 GB memory / 60 GB disk
# Then run:
# curl -sSL https://gist.github.com/abajwa-hw/b5565d7e7f9beffd8dd57a970dc54266/raw | sudo -E sh
export ambari_password=${ambari_password:-StrongPassword}
export db_password=${db_password:-StrongPassword}
export nifi_password=${nifi_password:-StrongPassword}
export ambari_services="ZOOKEEPER STREAMLINE NIFI KAFKA STORM REGISTRY NIFI_REGISTRY KNOX AMBARI_METRICS"
export cluster_name=${cluster_name:-hdf}
#!/usr/bin/env bash
# Launch a CentOS/RHEL 7 VM with at least 8 vCPUs / 32 GB+ memory / 100 GB disk
# Then run:
# curl -sSL https://gist.github.com/abajwa-hw/66c62bc860a47dfb0de53dfe5cbb4415/raw | sudo -E sh
export create_image=${create_image:-false}
export ambari_version=2.7.1.0
export mpack_url="http://public-repo-1.hortonworks.com/HDF/amazonlinux2/3.x/updates/3.3.0.0/tars/hdf_ambari_mp/hdf-ambari-mpack-3.3.0.0-165.tar.gz"
export hdf_vdf="http://s3.amazonaws.com/public-repo-1.hortonworks.com/HDF/centos7/3.x/updates/3.3.0.0/HDF-3.3.0.0-165.xml"
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
  <description></description>
  <groupId>8927f4c0-0160-1000-597a-ea764ccd81a7</groupId>
  <name>MDD</name>
  <snippet>
    <connections>
      <id>a2098494-cce9-3fa4-0000-000000000000</id>
      <parentGroupId>a8352767-434f-3321-0000-000000000000</parentGroupId>
      <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
package com.operative.pipelinetracker.controller;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
Apache NiFi is a data flow tool that is focused on moving data between systems.
NiFi's focus is on capabilities such as visual command and control, data filtering, data enrichment, data provenance, and security, to name a few. With NiFi you are not writing code and deploying it as a job; you are building a living data flow through the UI, where each action takes effect immediately.
A data flow tool is often complementary, used to manage the flow of data from the sources into stream processing platforms such as SAS ESP.
Event stream processing applications typically perform real-time analytics on streams of events. These streams are continuously published into an event stream processing engine. Typical use cases for event stream processing include but are not limited to the following:
- sensor data monitoring and management
- operational systems monitoring and management
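As a minimal illustration of the real-time analytics idea above, a stream of sensor readings can be scanned for out-of-range values as they arrive. This is only a sketch: the event shape and temperature limits are hypothetical, not taken from any specific platform.

```java
import java.util.List;
import java.util.stream.Collectors;

public class SensorMonitor {
    // Hypothetical operating range for the sensor; real limits are application-specific.
    static final double MIN_TEMP = -10.0;
    static final double MAX_TEMP = 85.0;

    /** Flags readings outside the allowed range, as a stream processor might. */
    static List<Double> outOfRange(List<Double> readings) {
        return readings.stream()
                .filter(t -> t < MIN_TEMP || t > MAX_TEMP)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Double> readings = List.of(21.5, 90.2, -12.0, 30.0);
        // 90.2 exceeds MAX_TEMP and -12.0 is below MIN_TEMP, so both are flagged.
        System.out.println(outOfRange(readings)); // [90.2, -12.0]
    }
}
```

In a real deployment the filter would run inside the streaming engine over an unbounded event stream rather than over an in-memory list.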
This example flow can be used to process files with the following requirements:
- A group of files can only be processed when every file for that group is ready.
- Each filename contains a group ID (e.g. 123_456) and a type name (e.g. ab/cd/ef).
- Example set of files for group '123_456':
  - file_123_456_ab.ex1
  - file_123_456_cd.ex1
  - file_123_456_ef.ex1
  - file_123_456.ex2
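The group-completeness check described above can be sketched in plain Java. The expected type suffixes and extensions are assumptions inferred from the example filenames, not part of any NiFi API; in the actual flow this logic would typically live in a processor such as a scripted or custom component.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class GroupReadyCheck {
    // Assumed per-group type names and naming convention, based on the example above.
    private static final Set<String> EXPECTED_TYPES = Set.of("ab", "cd", "ef");

    /** Returns true only when every expected file for the group has arrived. */
    static boolean groupReady(String groupId, List<String> arrivedFilenames) {
        Set<String> present = arrivedFilenames.stream()
                .filter(f -> f.contains(groupId))
                .collect(Collectors.toSet());
        // All per-type files (file_<group>_<type>.ex1) must be present...
        boolean typesReady = EXPECTED_TYPES.stream()
                .allMatch(t -> present.contains("file_" + groupId + "_" + t + ".ex1"));
        // ...as well as the group-level .ex2 file.
        boolean summaryReady = present.contains("file_" + groupId + ".ex2");
        return typesReady && summaryReady;
    }

    public static void main(String[] args) {
        List<String> arrived = List.of(
                "file_123_456_ab.ex1", "file_123_456_cd.ex1",
                "file_123_456_ef.ex1", "file_123_456.ex2");
        System.out.println(groupReady("123_456", arrived)); // true: group complete
    }
}
```

Dropping any one of the four files from `arrived` makes `groupReady` return false, which mirrors the "hold until the whole group is ready" requirement.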