Skip to content

Instantly share code, notes, and snippets.

@fabiojose
Last active March 6, 2018 19:12
Show Gist options
  • Save fabiojose/6cb168b8bf9720dee12882b18deee136 to your computer and use it in GitHub Desktop.
Save fabiojose/6cb168b8bf9720dee12882b18deee136 to your computer and use it in GitHub Desktop.
Apache NIFI - ExecuteScript - Merge JSON
import org.apache.nifi.processor.FlowFileFilter
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder
// Merges the JSON content of two queued flowfiles that carry the same
// 'project_name' and 'build_number' attributes into a single new flowfile.
// Uses the `session`, `log` and `REL_SUCCESS` variables that the ExecuteScript
// processor binds into the script.

// Take the first flowfile from the queue; nothing to do if the queue is empty.
def ff0 = session.get()
if(!ff0)return

// Attributes that identify which other flowfile belongs with ff0.
def project_name = ff0.getAttribute('project_name')
def build_number = ff0.getAttribute('build_number')
log.info(project_name)
log.info(build_number)

// Pull every remaining queued flowfile whose two attributes match ff0's.
def metrics = session.get(new FlowFileFilter(){
    public FlowFileFilterResult filter(FlowFile ff){
        if(project_name == ff.getAttribute('project_name') && build_number == ff.getAttribute('build_number')){
            return FlowFileFilterResult.ACCEPT_AND_CONTINUE
        }
        return FlowFileFilterResult.REJECT_AND_CONTINUE
    }
})

// Exactly one partner is required. Groovy truth treats both null and an empty
// list as false, so the count is computed without dereferencing a null result,
// and the message reports the real count (zero matches is not "more than one").
def matchCount = metrics ? metrics.size() : 0
if(matchCount != 1){
    log.error('Expected exactly one matching flowfile but found: ' + matchCount)
    // rollback(true) penalizes the flowfiles taken in this session before
    // returning them to their queue.
    session.rollback(true)
    return
}

// Merged result: entries of the partner (b) first, then entries of ff0 (a),
// so a's value wins whenever both documents define the same key.
def metricNew = [:]
def slurper = new JsonSlurper()
session.read(ff0).withStream{inputStreamA ->
    def metricA = slurper.parse(inputStreamA)
    session.read(metrics[0]).withStream{inputStreamB ->
        def metricB = slurper.parse(inputStreamB)
        metricNew.putAll(metricB)
    }
    metricNew.putAll(metricA)
}

// Write the merged JSON into a brand-new flowfile.
def ffOut = session.create()
ffOut = session.write(ffOut, {outputStream ->
    outputStream.withWriter("UTF-8"){writer->
        new JsonBuilder(metricNew).writeTo(writer)
    }
} as OutputStreamCallback )
ffOut = session.putAttribute(ffOut, "mime.type", "application/json")

// The two inputs are consumed; only the merged flowfile continues downstream.
session.remove(ff0)
session.remove(metrics)
session.transfer(ffOut, REL_SUCCESS)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment