Last active
October 10, 2017 19:13
-
-
Save gglanzani/708364f4d5288844fc63d692ebe47b51 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
When new jars are deployed for NiFi's ExecuteScript processors, the processors
must be stopped and restarted before they pick up the new jars. This script
automates that restart for every running Groovy ExecuteScript processor that
has a Module Directory configured.
"""
import asyncio
import json
from http.client import HTTPResponse
from typing import Dict, Iterator, List, Optional
from urllib import error
from urllib import request
# Base URL of the NiFi REST API; every request in this script is built on it.
URL = "http://127.0.0.1:8080/nifi-api"
# Initial delay, in seconds, before a failed processor start is retried.
RETRY_SECONDS = 3
def get_execute_script_processors(group: Dict) -> Iterator[str]:
    """Recursively walk a NiFi process group and yield ExecuteScript processor ids.

    Args:
        group: A process-group status entry holding a
            ``processGroupStatusSnapshot`` mapping.

    Yields:
        The ``id`` of every processor whose ``type`` is ``ExecuteScript``:
        those of nested groups first, then the group's own.
    """
    snapshot = group.get('processGroupStatusSnapshot') or {}
    # Descend into nested process groups before this group's own processors.
    for inner in snapshot.get('processGroupStatusSnapshots') or ():
        yield from get_execute_script_processors(inner)
    # Treat a missing or null processor list as empty instead of failing
    # on iteration, mirroring the guard applied to nested groups.
    for entry in snapshot.get('processorStatusSnapshots') or ():
        info = entry["processorStatusSnapshot"]
        if info["type"] == 'ExecuteScript':
            yield info["id"]
def get_processors_id(response: Dict) -> Iterator[str]:
    """Yield ExecuteScript processor ids found in a NiFi status API response.

    Args:
        response: Decoded JSON that is read along the path
            ``processGroupStatus -> aggregateSnapshot -> processGroupStatusSnapshots``.

    Yields:
        The ids produced by ``get_execute_script_processors`` for each
        process group listed under the aggregate snapshot.
    """
    status = response.get('processGroupStatus') or {}
    aggregate = status.get('aggregateSnapshot') or {}
    # NOTE(review): only the child groups of the aggregate snapshot are walked;
    # processors listed directly on the aggregate snapshot itself are not
    # visited -- confirm this is intended.
    # A missing or null group list is treated as "no groups" rather than a crash.
    for group in aggregate.get('processGroupStatusSnapshots') or ():
        yield from get_execute_script_processors(group)
def as_dict(response: HTTPResponse) -> Dict:
    """Decode the UTF-8 JSON body of an HTTP response.

    Args:
        response: Object whose ``read()`` returns the raw body bytes.

    Returns:
        The parsed JSON document.
    """
    body = response.read().decode('utf-8')
    return json.loads(body)
def get_processors_info(ids: Iterator[str]) -> List[Dict]:
    """Get processor info by id using the NiFi API.

    Args:
        ids: Processor ids to look up under ``{URL}/processors/``.

    Returns:
        One decoded JSON document per id, in iteration order.
    """
    infos = []
    for _id in ids:
        # Close each response once its body is decoded instead of leaking it.
        with request.urlopen("{}/processors/{}".format(URL, _id)) as response:
            infos.append(as_dict(response))
    return infos
def filter_groovy(processor_props: Dict) -> bool:
    """Tell whether a processor is a running Groovy script with a Module Directory.

    Args:
        processor_props: Processor document exposing ``component.state`` and
            ``component.config.properties``.

    Returns:
        True only when ``Module Directory`` is filled, ``Script Engine`` is
        "groovy" and the state is "running" (both compared case-insensitively).
    """
    component = processor_props['component']
    properties = component['config']['properties']
    # A null or absent engine simply fails the comparison instead of raising
    # on ``.lower()``.
    engine = properties.get('Script Engine') or ''
    return bool(
        properties.get('Module Directory')
        and engine.lower() == "groovy"
        and component['state'].lower() == "running"
    )
def pipe(data, *funcs):
    """
    Pipe data through funcs, i.e.
    pipe(data, f, g, ...) == ...(g(f(data)))

    With no funcs, data is returned unchanged.
    """
    for fun in funcs:
        data = fun(data)
    return data
def post_payload(url: str, payload: Dict) -> Dict:
    """Wrapper to ease sending a JSON payload to a URL.

    Despite the name, the request is issued with the HTTP ``PUT`` method.

    Args:
        url: Target URL.
        payload: Object serialised to JSON and sent as the UTF-8 request body.

    Returns:
        The response body decoded from UTF-8 JSON.

    Raises:
        urllib.error.HTTPError: Propagated from ``urlopen`` when the server
            answers with an error status.
    """
    body = json.dumps(payload).encode('utf-8')
    # Pass the verb to the constructor rather than monkey-patching get_method.
    req = request.Request(url, data=body,
                          headers={"Content-Type": 'application/json'},
                          method="PUT")
    with request.urlopen(req) as response:
        return json.loads(response.read().decode('utf-8'))
async def restart_processor(result: Dict, retry_secs: int,
                            max_retries: Optional[int] = None):
    """Safely restart a given processor.

    Sends a ``STOPPED`` state update, then a ``RUNNING`` one using the
    revision returned by the stop request. If the start request fails with an
    HTTP error, waits and repeats the whole stop/start cycle.

    Args:
        result: Processor document providing ``id`` and ``revision``.
        retry_secs: Seconds to wait before the first retry; the delay is
            multiplied by 1.5 after every failed attempt.
        max_retries: Maximum number of retries after the first attempt.
            ``None`` (the default) retries without limit.

    Raises:
        urllib.error.HTTPError: If the stop request fails, or if the start
            request still fails once ``max_retries`` is exhausted.
    """
    _id = result['id']
    url = "{}/processors/{}".format(URL, _id)
    revision = result['revision']
    delay = retry_secs
    retries = 0
    # Iterate instead of recursing so long retry chains cannot grow the stack.
    while True:
        payload = {"component": {"state": "STOPPED", "id": _id},
                   "id": _id,
                   "revision": revision}
        # we need the response to get the revision
        revision = post_payload(url, payload)['revision']
        payload['component']['state'] = 'RUNNING'
        payload['revision'] = revision
        try:
            post_payload(url, payload)
            return
        except error.HTTPError:
            if max_retries is not None and retries >= max_retries:
                raise
            retries += 1
            # we're here if NiFi is still trying to stop the processor: we have to wait
            await asyncio.sleep(delay)
            delay *= 1.5
if __name__ == '__main__':
    # Fetch the recursive status of the root group, resolve every ExecuteScript
    # processor, and keep those accepted by filter_groovy. The filter is
    # materialised so the selection can be tested for emptiness below.
    results = list(filter(filter_groovy,
                          pipe("{}/flow/process-groups/root/status?recursive=true".format(URL),
                               request.urlopen,
                               as_dict,
                               get_processors_id,
                               get_processors_info)))

    async def _restart_all():
        "Restart all selected processors and return the exceptions raised, if any"
        # Coroutines are created inside the running loop; return_exceptions
        # lets every restart finish even when one of them fails.
        outcomes = await asyncio.gather(
            *(restart_processor(result, RETRY_SECONDS) for result in results),
            return_exceptions=True)
        return [outcome for outcome in outcomes
                if isinstance(outcome, BaseException)]

    # Nothing to do when no processor matched (asyncio.wait would raise on
    # an empty set).
    if results:
        loop = asyncio.new_event_loop()
        try:
            failures = loop.run_until_complete(_restart_all())
        finally:
            loop.close()
        if failures:
            # Surface the first failure instead of silently dropping it.
            raise failures[0]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment