Created
December 24, 2021 19:29
-
-
Save pvillard31/79ccd8f3f8d2f0a69476e5f0e63fc131 to your computer and use it in GitHub Desktop.
Script to be used with the NiFi Registry Event Script Hook for automatic sync with the Cloudera DataFlow Catalog
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh
#
# NiFi Registry Event Script Hook: automatically syncs newly committed flow
# versions into the Cloudera DataFlow (DF) Catalog.
#
# Invocation (as read from the positional parameters below):
#   <script> EVENT_TYPE BUCKET_ID FLOW_ID FLOW_VERSION
# Only EVENT_TYPE = CREATE_FLOW_VERSION is acted on; any other event is a no-op.
#
# Prerequisites:
#   - the CDP CLI has been installed. See:
#     https://docs.cloudera.com/dataflow/cloud/cli/index.html
#   - the NiFi CLI has been configured to interact with the NiFi Registry
#     instance. See:
#     https://nifi.apache.org/docs/nifi-docs/html/toolkit-guide.html#nifi_CLI
#   - jq and mktemp are available on the PATH.
#
# The three paths below keep their original defaults but may be overridden
# from the environment.
#
# NOTE: this runs under /bin/sh, so it sticks to POSIX constructs (no
# `local`, no arrays, no `pipefail`).

set -eu

NIFICLI="${NIFICLI:-/opt/cloudera/parcels/CFM/TOOLKIT/bin/cli.sh}"
CDPCLI="${CDPCLI:-/hadoopfs/fs1/working-dir/cdpclienv/bin/cdp}"
# Local bookkeeping file: one "<flow id> <catalog CRN>" pair per line.
MAPPING="${MAPPING:-/hadoopfs/fs1/working-dir/scripts/mapping.txt}"

# Private scratch directory, created on demand by main and removed on exit.
SYNC_WORKDIR=""

# Print a message on stderr and abort.
die() {
  printf '%s\n' "$*" >&2
  exit 1
}

# Remove the scratch directory, if one was created.
cleanup() {
  if [ -n "$SYNC_WORKDIR" ]; then
    rm -rf -- "$SYNC_WORKDIR"
  fi
}

# Print the CRN recorded for a flow ID, or nothing if the flow is unknown.
# Arguments: $1 - flow identifier, $2 - path of the mapping file
# The ID must equal the first column exactly (no regex or substring matching,
# and a hit in the CRN column does not count). If the ID is listed more than
# once, the first entry wins. A missing mapping file means "nothing synced yet".
lookup_crn() {
  [ -f "$2" ] || return 0
  # Appending "" forces a string comparison even for numeric-looking IDs.
  awk -v id="$1" '($1 "") == (id "") { print $2; exit }' "$2"
}

# Entry point: handle one NiFi Registry event.
# Arguments: $1 - event type, $2 - bucket ID, $3 - flow ID, $4 - flow version
main() {
  # CREATE_FLOW_VERSION is the event when a new version of a flow is committed.
  [ "${1:-}" = "CREATE_FLOW_VERSION" ] || return 0

  sync_bucket_id=${2:?missing bucket identifier (argument 2)}
  sync_flow_id=${3:?missing flow identifier (argument 3)}
  sync_flow_version=${4:?missing flow version (argument 4)}

  # Unpredictable per-run scratch space, so that concurrent hook invocations
  # cannot overwrite each other's exported files.
  SYNC_WORKDIR=$(mktemp -d) || die "unable to create a temporary directory"
  trap cleanup EXIT
  sync_flow_json="$SYNC_WORKDIR/flow.json"
  sync_flows_json="$SYNC_WORKDIR/flows.json"
  sync_insert_json="$SYNC_WORKDIR/cdpinsert.json"

  # Export the flow definition as a JSON file using the NiFi CLI.
  "$NIFICLI" registry export-flow-version \
    --flowIdentifier "$sync_flow_id" \
    --flowVersion "$sync_flow_version" > "$sync_flow_json"

  # `// empty` turns a missing/null field into an empty string instead of the
  # literal text "null".
  sync_comment=$(jq -r '.snapshotMetadata.comments // empty' "$sync_flow_json")

  # The mapping file tells us whether this flow already exists in the Catalog.
  sync_crn=$(lookup_crn "$sync_flow_id" "$MAPPING")

  if [ -n "$sync_crn" ]; then
    # Known flow: add the new version under the recorded CRN.
    set -- --flow-crn "$sync_crn" --file "$sync_flow_json"
    [ -z "$sync_comment" ] || set -- "$@" --comments "$sync_comment"
    "$CDPCLI" df import-flow-definition-version "$@"
    return 0
  fi

  # Unknown flow: import it into the DF Catalog for the first time.
  sync_flow_name=$(jq -r '.flowContents.name // empty' "$sync_flow_json")
  [ -n "$sync_flow_name" ] \
    || die "exported flow $sync_flow_id has no .flowContents.name"

  # The flow description lives in the bucket listing, not in the export.
  # It is optional metadata, so a failed lookup only drops the description.
  sync_description=""
  if "$NIFICLI" registry list-flows --bucketIdentifier "$sync_bucket_id" \
      -ot json > "$sync_flows_json"; then
    sync_description=$(jq -r --arg id "$sync_flow_id" \
      '.[] | select(.identifier == $id) | .description // empty' \
      "$sync_flows_json") || sync_description=""
  else
    printf 'warning: could not list flows of bucket %s; importing without description\n' \
      "$sync_bucket_id" >&2
  fi

  set -- --file "$sync_flow_json" --name "$sync_flow_name"
  [ -z "$sync_description" ] || set -- "$@" --description "$sync_description"
  [ -z "$sync_comment" ] || set -- "$@" --comments "$sync_comment"
  "$CDPCLI" df import-flow-definition "$@" > "$sync_insert_json"

  # Extract the CRN of the newly added flow. Never record an empty/null CRN:
  # it would make every later version sync of this flow fail.
  sync_crn=$(jq -r '.crn // empty' "$sync_insert_json")
  [ -n "$sync_crn" ] \
    || die "import of flow $sync_flow_id returned no CRN; mapping not updated"

  # Remember the flow so the next version takes the "known flow" path.
  printf '%s %s\n' "$sync_flow_id" "$sync_crn" >> "$MAPPING"
}

main "$@"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment