Skip to content

Instantly share code, notes, and snippets.

@gglanzani
Last active October 10, 2017 19:13
Show Gist options
  • Save gglanzani/708364f4d5288844fc63d692ebe47b51 to your computer and use it in GitHub Desktop.
Save gglanzani/708364f4d5288844fc63d692ebe47b51 to your computer and use it in GitHub Desktop.
"""
When deploying new jars used by NiFi's ExecuteScript processor, it is necessary to stop and restart all the
processors manually to have them pick up the new jars. This script solves this issue.
"""
import json
from urllib import request
from urllib import error
from typing import Dict, Iterator, List
from http.client import HTTPResponse
import asyncio
# Base URL of the NiFi REST API; every request in this script is built from it.
URL = "http://127.0.0.1:8080/nifi-api"
# Initial wait, in seconds, handed to restart_processor as its retry delay.
RETRY_SECONDS = 3
def get_execute_script_processors(group: Dict) -> Iterator[str]:
    """Recursively walk a NiFi process-group status entity for ExecuteScript ids.

    Nested process groups are visited first (depth-first); the processors that
    belong to ``group`` itself are reported afterwards.

    Args:
        group: A dict carrying a ``processGroupStatusSnapshot`` mapping, which
            may in turn hold ``processGroupStatusSnapshots`` (child groups) and
            ``processorStatusSnapshots`` (processors of this group).

    Yields:
        The ``id`` of every processor whose ``type`` is ``'ExecuteScript'``.
    """
    # `or {}` / `or ()` keep the walk going when a key is absent or null
    # instead of failing with AttributeError / TypeError.
    snapshot = group.get('processGroupStatusSnapshot') or {}
    for child in snapshot.get('processGroupStatusSnapshots') or ():
        yield from get_execute_script_processors(child)
    for entry in snapshot.get('processorStatusSnapshots') or ():
        info = entry["processorStatusSnapshot"]
        if info["type"] == 'ExecuteScript':
            yield info["id"]
def get_processors_id(response: Dict) -> Iterator[str]:
    """Yield the ids of all ExecuteScript processors in a NiFi status response.

    Args:
        response: Decoded JSON of the recursive root process-group status
            request, i.e. a dict shaped like
            ``{'processGroupStatus': {'aggregateSnapshot': {...}}}``.

    Yields:
        Processor ids found in nested groups first, then those that sit
        directly in the root group.
    """
    status = response.get('processGroupStatus') or {}
    aggregate = status.get('aggregateSnapshot') or {}
    # The aggregate snapshot has the same layout as a nested group's snapshot,
    # so wrap it like one. This way processors placed directly on the root
    # canvas are reported too, not only those inside child groups.
    root = {
        'processGroupStatusSnapshot': {
            'processGroupStatusSnapshots':
                aggregate.get('processGroupStatusSnapshots') or [],
            'processorStatusSnapshots':
                aggregate.get('processorStatusSnapshots') or [],
        }
    }
    yield from get_execute_script_processors(root)
def as_dict(response: HTTPResponse) -> Dict:
    """Decode the UTF-8 JSON body of ``response`` and close the response.

    Args:
        response: An open HTTP response (as returned by ``urlopen``).

    Returns:
        The parsed JSON document.
    """
    # Callers hand over a freshly opened response and never touch it again,
    # so release the underlying connection here once the body has been read.
    with response:
        return json.loads(response.read().decode('utf-8'))
def get_processors_info(ids: Iterator[str]) -> List[Dict]:
    """Fetch the processor entity for each id through the NiFi API.

    Args:
        ids: Processor ids to look up.

    Returns:
        One decoded JSON document per id, in the order the ids were given.
    """
    processors = []
    for processor_id in ids:
        endpoint = "{}/processors/{}".format(URL, processor_id)
        processors.append(as_dict(request.urlopen(endpoint)))
    return processors
def filter_groovy(processor_props: Dict) -> bool:
    """Tell whether a processor is a running Groovy script with a module dir.

    Args:
        processor_props: A processor entity as returned by the NiFi API; the
            values are read from ``component.config.properties`` and
            ``component.state``.

    Returns:
        True only when ``Module Directory`` is filled in, ``Script Engine`` is
        "groovy" and the state is "running" (both compared case-insensitively).
        Entities with missing or null fields simply do not match.
    """
    component = processor_props.get('component') or {}
    properties = (component.get('config') or {}).get('properties') or {}
    # Unset values may be null; fall back to '' so .lower() cannot fail.
    engine = properties.get('Script Engine') or ''
    state = component.get('state') or ''
    return bool(properties.get('Module Directory')
                and engine.lower() == "groovy"
                and state.lower() == "running")
def pipe(data, *funcs):
    """Thread ``data`` through ``funcs`` from left to right.

    ``pipe(data, f, g, h)`` is the same as ``h(g(f(data)))``; with no
    functions at all, ``data`` is returned untouched.
    """
    if not funcs:
        return data
    first, *remaining = funcs
    return pipe(first(data), *remaining)
def post_payload(url: str, payload: Dict) -> Dict:
    """Send ``payload`` as a JSON body in an HTTP PUT request to ``url``.

    Despite its name, the request method is PUT, not POST.

    Args:
        url: Target URL.
        payload: JSON-serialisable dict to send as the request body.

    Returns:
        The decoded JSON body of the response.

    Raises:
        urllib.error.HTTPError: If the server answers with an error status.
    """
    req = request.Request(
        url,
        data=json.dumps(payload).encode('utf-8'),
        headers={"Content-Type": 'application/json'},
        # Declare the verb directly instead of monkey-patching get_method.
        method="PUT",
    )
    with request.urlopen(req) as response:
        return json.loads(response.read().decode('utf-8'))
async def restart_processor(result: Dict, retry_secs: float):
    """Stop a processor and start it again, waiting while NiFi is not ready.

    Args:
        result: Processor entity holding at least ``id`` and ``revision``.
        retry_secs: Seconds to wait before the first retry; the wait grows by
            a factor of 1.5 after every failed attempt.

    Raises:
        urllib.error.HTTPError: If the stop request fails, or the start request
            fails with any status other than 409 (Conflict).
    """
    _id = result['id']
    url = "{}/processors/{}".format(URL, _id)
    revision = result['revision']
    delay = retry_secs
    while True:
        # The response of the stop request carries the revision that the
        # following start request has to present.
        stopped = post_payload(url, {"component": {"state": "STOPPED", "id": _id},
                                     "id": _id,
                                     "revision": revision})
        revision = stopped['revision']
        try:
            post_payload(url, {"component": {"state": "RUNNING", "id": _id},
                               "id": _id,
                               "revision": revision})
        except error.HTTPError as exc:
            # 409 is NiFi's "not in the appropriate state, retrying later may
            # succeed" answer, e.g. while the processor is still stopping.
            # Any other status is permanent, so retrying would loop forever.
            if exc.code != 409:
                raise
            await asyncio.sleep(delay)
            delay *= 1.5
        else:
            return
if __name__ == '__main__':
    # Fetch the recursive status of the root group, collect the ExecuteScript
    # processor ids, load each processor and keep the running Groovy ones.
    results = filter(filter_groovy,
                     pipe("{}/flow/process-groups/root/status?recursive=true".format(URL),
                          request.urlopen,
                          as_dict,
                          get_processors_id,
                          get_processors_info))
    loop = asyncio.new_event_loop()
    try:
        # Wrap the coroutines in tasks bound to this loop: asyncio.wait() no
        # longer accepts bare coroutines on recent Python versions.
        tasks = [loop.create_task(restart_processor(result, RETRY_SECONDS))
                 for result in results]
        # Nothing to restart is a valid outcome, not an error.
        if tasks:
            # return_exceptions=True lets every restart run to completion even
            # if one of them fails, so no processor is left stopped halfway.
            outcomes = loop.run_until_complete(
                asyncio.gather(*tasks, return_exceptions=True))
            failures = [outcome for outcome in outcomes
                        if isinstance(outcome, BaseException)]
            if failures:
                # Surface the first failure instead of dropping it silently.
                raise failures[0]
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment