# credits source : https://gist.github.com/rueedlinger/76af36d04a0798a8e1f43ed16595bd97 import sys import os import json import argparse from base64 import b64encode PYTHON_MAJOR_VERSION = sys.version_info.major DEFAULT_HOST = 'localhost' DEFAULT_PORT = '8084' BASE_PATH = '/connectors' KAFKA_CONNECT_API_CREDENTIALS_USER = 'admin' KAFKA_CONNECT_API_CREDENTIALS_PWD = 'admin' if PYTHON_MAJOR_VERSION == 2: import httplib else: import http.client as httplib if 'KAFKA_CONNECT_REST' in os.environ: KAFKA_CONNECT_REST = os.environ['KAFKA_CONNECT_REST'] else: KAFKA_CONNECT_REST = DEFAULT_HOST + ':' + str(DEFAULT_PORT) # Authorization token: we need to base 64 encode it # and then decode it to acsii as python 3 stores it as a byte string def basic_auth(username, password): return { "Authorization": "Basic {}".format( b64encode(bytes(f"{username}:{password}", "utf-8")).decode("ascii") ) } class ConnectError(Exception): def __init__(self, method, path, http_status, reason): self.method = method self.path = path self.http_status = http_status self.reason = reason class HttpUtil: def __init__(self, http_connection, headers): self.http_connection = http_connection self.headers = headers def get(self, path): self.http_connection.request(method='GET', url=path, body=None, headers=self.headers) response = self.http_connection.getresponse() if response.status != 200: raise ConnectError(method='GET', path=path, http_status=response.status, reason=response.reason) return json.loads(response.read()) def post(self, path): self.http_connection.request('POST', url=path, body=None, headers=self.headers) response = self.http_connection.getresponse() response.read() if response.status != 204: raise ConnectError(method='POST', path=path, http_status=response.status, reason=response.reason) return {'http_status': response.status, 'reason': response.reason, 'path': path, 'method': 'POST'} def put(self, path): self.http_connection.request('PUT', url=path, body=None, headers=self.headers) response = self.http_connection.getresponse() response.read() if response.status != 202: raise ConnectError(method='PUT', path=path, http_status=response.status, reason=response.reason) return {'http_status': response.status, 'reason': response.reason, 'path': path, 'method': 'PUT'} if __name__ == '__main__': parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(help='Functions', dest='cmd') cmd_status = subparsers.add_parser('status', help='show the status') cmd_status.add_argument('connector_id', metavar='<connector_id>', nargs='?', help='the id of the connector') cmd_status = subparsers.add_parser('restart', help='restart connector') cmd_status.add_argument('connector_id', metavar='<connector_id>', help='the id of the connector') cmd_status = subparsers.add_parser('pause', help='pause connector') cmd_status.add_argument('connector_id', metavar='<connector_id>', help='the id of the connector') cmd_status = subparsers.add_parser('resume', help='resume connector') cmd_status.add_argument('connector_id', metavar='<connector_id>', help='the id of the connector') args = parser.parse_args() # conn = httplib.HTTPConnection(KAFKA_CONNECT_REST) conn = httplib.HTTPSConnection(KAFKA_CONNECT_REST) http_util = HttpUtil(conn, basic_auth(username=KAFKA_CONNECT_API_CREDENTIALS_USER, password=KAFKA_CONNECT_API_CREDENTIALS_PWD)) try: if args.cmd == 'status': if args.connector_id: status = http_util.get(BASE_PATH + '/' + args.connector_id + '/status') print(status['name'] + ': ' + status['connector']['state']) for tasks in status['tasks']: print(' task ' + str(tasks['id']) + ': ' + tasks['state']) else: connectors = http_util.get(BASE_PATH) for connector in connectors: status = http_util.get(BASE_PATH + '/' + connector + '/status') print(status['name'] + ': ' + status['connector']['state']) for tasks in status['tasks']: print(' task ' + str(tasks['id']) + ': ' + tasks['state']) elif args.cmd == 'pause': http_util.put(BASE_PATH + '/' + args.connector_id + '/pause') elif args.cmd == 'resume': http_util.put(BASE_PATH + '/' + args.connector_id + '/resume') elif args.cmd == 'restart': resp = http_util.get(BASE_PATH + '/' + args.connector_id) http_util.post(BASE_PATH + '/' + args.connector_id + '/restart') for task in resp['tasks']: http_util.post(BASE_PATH + '/' + args.connector_id + '/tasks/' + str(task['task']) + '/restart') else: parser.print_help() except ConnectError as ex: print('Got error %s (%s) for request %s %s%s ' % (ex.http_status, ex.reason, ex.method, KAFKA_CONNECT_REST, ex.path)) print(' command: ' + args.cmd) if args.connector_id: print(' connector_id: ' + str(args.connector_id))