-
-
Save vijayanandrp/4886d2089f671a5075a7f00fdb027acd to your computer and use it in GitHub Desktop.
Kafka Connect Python Script - https | user auth | Status | restart | pause | resume
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# credits source : https://gist.github.com/rueedlinger/76af36d04a0798a8e1f43ed16595bd97 | |
import sys | |
import os | |
import json | |
import argparse | |
from base64 import b64encode | |
# Major version of the running interpreter; it decides which HTTP client
# module gets imported below.
PYTHON_MAJOR_VERSION = sys.version_info.major

# Fallback location of the REST endpoint and the path all requests start with.
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = '8084'
BASE_PATH = '/connectors'

# User name and password for the REST API.
KAFKA_CONNECT_API_CREDENTIALS_USER = 'admin'
KAFKA_CONNECT_API_CREDENTIALS_PWD = 'admin'

# Python 2 calls the module ``httplib``; on Python 3 it lives at
# ``http.client``. Either way it is bound to the name ``httplib``.
if PYTHON_MAJOR_VERSION != 2:
    import http.client as httplib
else:
    import httplib

# "host:port" of the REST endpoint. The KAFKA_CONNECT_REST environment
# variable wins when it is set; otherwise the defaults above are combined.
KAFKA_CONNECT_REST = os.environ.get(
    'KAFKA_CONNECT_REST',
    '{}:{}'.format(DEFAULT_HOST, DEFAULT_PORT),
)
# Authorization token: the "user:password" pair must be base64-encoded, and the
# result is then decoded to ASCII because Python 3 returns it as a byte string.
def basic_auth(username, password):
    """Build the HTTP Basic ``Authorization`` header for a user/password pair.

    Args:
        username: User name placed before the colon.
        password: Password placed after the colon.

    Returns:
        A dict with the single key ``"Authorization"`` whose value is
        ``"Basic "`` followed by the base64 encoding of
        ``"<username>:<password>"`` (UTF-8 encoded before base64).
    """
    # str.format + .encode is used instead of an f-string / bytes(str, enc):
    # both of those are Python 3-only, while this module still selects
    # ``httplib`` for Python 2.
    credentials = "{}:{}".format(username, password).encode("utf-8")
    # b64encode returns bytes; decode so the header value is a text string.
    token = b64encode(credentials).decode("ascii")
    return {"Authorization": "Basic {}".format(token)}
class ConnectError(Exception):
    """Error describing a REST request that returned an unexpected HTTP status.

    Attributes:
        method: HTTP method of the failed request (e.g. ``'GET'``).
        path: Request path that was called.
        http_status: Status code returned by the server.
        reason: Reason phrase returned by the server.
    """

    def __init__(self, method, path, http_status, reason):
        # Hand a message to Exception so str(exc) and uncaught tracebacks
        # are informative instead of empty.
        super(ConnectError, self).__init__(
            '{} {} failed with HTTP {} ({})'.format(method, path, http_status, reason)
        )
        self.method = method
        self.path = path
        self.http_status = http_status
        self.reason = reason
class HttpUtil:
    """Thin wrapper that issues body-less REST requests over one connection.

    Every request is sent with the same headers, and each verb accepts exactly
    one success status; any other status raises ``ConnectError``.
    """

    def __init__(self, http_connection, headers):
        """Store the connection and the headers sent with every request.

        Args:
            http_connection: Object offering ``request(...)`` and
                ``getresponse()`` in the style of ``httplib`` connections.
            headers: Dict of HTTP headers attached to each request.
        """
        self.http_connection = http_connection
        self.headers = headers

    def _request(self, method, path, expected_status):
        """Send one request and return ``(response, payload)``.

        The body is always read before the status is checked. A response
        that is left unread blocks the next request on the same connection,
        so the error path must drain it too.

        Raises:
            ConnectError: If the status differs from ``expected_status``.
        """
        self.http_connection.request(method, url=path, body=None, headers=self.headers)
        response = self.http_connection.getresponse()
        payload = response.read()
        if response.status != expected_status:
            raise ConnectError(method=method, path=path,
                               http_status=response.status, reason=response.reason)
        return response, payload

    def get(self, path):
        """GET ``path`` and return the JSON-decoded body.

        Raises:
            ConnectError: If the response status is not 200.
        """
        _, payload = self._request('GET', path, 200)
        return json.loads(payload)

    def post(self, path):
        """POST to ``path`` without a body and return a summary dict.

        Returns:
            Dict with keys ``http_status``, ``reason``, ``path``, ``method``.

        Raises:
            ConnectError: If the response status is not 204.
        """
        response, _ = self._request('POST', path, 204)
        return {'http_status': response.status, 'reason': response.reason, 'path': path, 'method': 'POST'}

    def put(self, path):
        """PUT to ``path`` without a body and return a summary dict.

        Returns:
            Dict with keys ``http_status``, ``reason``, ``path``, ``method``.

        Raises:
            ConnectError: If the response status is not 202.
        """
        response, _ = self._request('PUT', path, 202)
        return {'http_status': response.status, 'reason': response.reason, 'path': path, 'method': 'PUT'}
if __name__ == '__main__': | |
parser = argparse.ArgumentParser() | |
subparsers = parser.add_subparsers(help='Functions', dest='cmd') | |
cmd_status = subparsers.add_parser('status', help='show the status') | |
cmd_status.add_argument('connector_id', metavar='<connector_id>', nargs='?', help='the id of the connector') | |
cmd_status = subparsers.add_parser('restart', help='restart connector') | |
cmd_status.add_argument('connector_id', metavar='<connector_id>', help='the id of the connector') | |
cmd_status = subparsers.add_parser('pause', help='pause connector') | |
cmd_status.add_argument('connector_id', metavar='<connector_id>', help='the id of the connector') | |
cmd_status = subparsers.add_parser('resume', help='resume connector') | |
cmd_status.add_argument('connector_id', metavar='<connector_id>', help='the id of the connector') | |
args = parser.parse_args() | |
# conn = httplib.HTTPConnection(KAFKA_CONNECT_REST) | |
conn = httplib.HTTPSConnection(KAFKA_CONNECT_REST) | |
http_util = HttpUtil(conn, basic_auth(username=KAFKA_CONNECT_API_CREDENTIALS_USER, | |
password=KAFKA_CONNECT_API_CREDENTIALS_PWD)) | |
try: | |
if args.cmd == 'status': | |
if args.connector_id: | |
status = http_util.get(BASE_PATH + '/' + args.connector_id + '/status') | |
print(status['name'] + ': ' + status['connector']['state']) | |
for tasks in status['tasks']: | |
print(' task ' + str(tasks['id']) + ': ' + tasks['state']) | |
else: | |
connectors = http_util.get(BASE_PATH) | |
for connector in connectors: | |
status = http_util.get(BASE_PATH + '/' + connector + '/status') | |
print(status['name'] + ': ' + status['connector']['state']) | |
for tasks in status['tasks']: | |
print(' task ' + str(tasks['id']) + ': ' + tasks['state']) | |
elif args.cmd == 'pause': | |
http_util.put(BASE_PATH + '/' + args.connector_id + '/pause') | |
elif args.cmd == 'resume': | |
http_util.put(BASE_PATH + '/' + args.connector_id + '/resume') | |
elif args.cmd == 'restart': | |
resp = http_util.get(BASE_PATH + '/' + args.connector_id) | |
http_util.post(BASE_PATH + '/' + args.connector_id + '/restart') | |
for task in resp['tasks']: | |
http_util.post(BASE_PATH + '/' + args.connector_id + '/tasks/' + str(task['task']) + '/restart') | |
else: | |
parser.print_help() | |
except ConnectError as ex: | |
print('Got error %s (%s) for request %s %s%s ' % (ex.http_status, ex.reason, | |
ex.method, KAFKA_CONNECT_REST, ex.path)) | |
print(' command: ' + args.cmd) | |
if args.connector_id: | |
print(' connector_id: ' + str(args.connector_id)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment