Created
October 22, 2019 09:15
-
-
Save auxten/1d4542203f30b179d014d72c83b8e20d to your computer and use it in GitHub Desktop.
Spark-submit like tool for livy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
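# Very bare-bones, spark-submit-like runner for a Livy session. Usage:
#
#   livy-shell url script_file [option=value ...]
#
# The lines of script_file are submitted to the session one at a time, each as
# a separate statement.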
# | |
# Options are set directly in the session creation request, so they must match the names of fields | |
# in the CreateInteractiveRequest structure. Option values should be python-like objects (should be | |
# parseable by python's "eval" function; naked strings are allowed). For example: | |
# | |
# kind=spark | |
# jars=[ '/foo.jar', '/bar.jar' ] | |
# conf={ foo : bar, 'spark.option' : opt_value } | |
# | |
# By default, a Spark (Scala) session is created. | |
# | |
import httplib | |
import json | |
import readline | |
import sys | |
import time | |
import urlparse | |
def check(condition, msg, *args):
    """Abort the program unless *condition* is true.

    When the condition fails, *msg* is written to stderr (after %-formatting
    with *args*, if any were given) and the process exits with status 1.
    When it holds, nothing happens.
    """
    if condition:
        return
    text = msg % args if args else msg
    sys.stderr.write("%s\n" % (text,))
    sys.exit(1)
def message(msg, *args):
    """Print *msg* to stdout, %-formatting it with *args* only when args are given.

    Without args the message is printed verbatim, so a literal '%' is safe.
    """
    text = msg % args if args else msg
    print(text)
class LiteralDict(dict):
    """A dict whose missing keys resolve to the key itself.

    Passed as the globals mapping to ``eval`` when parsing option values, so a
    bare identifier such as ``spark`` evaluates to the string ``'spark'``
    instead of raising NameError.
    """

    def __missing__(self, key):
        # dict.__getitem__ calls this for absent keys: echo the key back
        # without storing it.
        return key
def request(conn, method, uri, body):
    """Send one JSON request to the Livy server and decode the JSON reply.

    Args:
        conn: httplib connection (HTTPConnection or HTTPSConnection).
        method: HTTP verb, e.g. 'GET', 'POST' or 'DELETE'.
        uri: request path, e.g. '/sessions'.
        body: JSON-serialisable payload, or None to send no body.

    Returns:
        The decoded JSON body for a 2xx response that carries content;
        None for 204 No Content, an empty body, or a 3xx status.

    Raises:
        httplib.HTTPException: for any status outside 200-399, with args
            (status, reason, raw response body).
    """
    # Only None means "no body": an empty dict/list is still valid JSON and
    # should be sent, since the JSON Content-Type header is always set.
    payload = json.dumps(body) if body is not None else None
    headers = {'Content-Type': 'application/json'}
    conn.request(method, uri, body=payload, headers=headers)
    resp = conn.getresponse()
    # Read the whole response before returning so the connection can be used
    # for the next request.
    data = resp.read()
    if resp.status < 200 or resp.status >= 400:
        # Call syntax works on Python 2 and 3; the exception args are unchanged
        # from the old "raise E, (status, reason, data)" form.
        raise httplib.HTTPException(resp.status, resp.reason, data)
    if resp.status < 300 and resp.status != httplib.NO_CONTENT and data:
        return json.loads(data)
    return None
def get(conn, uri):
    """Issue a GET for *uri* on *conn* and return request()'s decoded result."""
    verb = 'GET'
    return request(conn, verb, uri, None)
def post(conn, uri, body):
    """POST *body* (JSON-encoded by request()) to *uri* and return the decoded reply."""
    verb = 'POST'
    return request(conn, verb, uri, body)
def delete(conn, uri):
    """Issue a bodiless DELETE for *uri* and return request()'s decoded result."""
    verb = 'DELETE'
    return request(conn, verb, uri, None)
def create_session(conn, options=None):
    """Create a new Livy session and return the server's JSON reply for it.

    The request body starts as ``{"kind": "spark"}`` and is extended with one
    field per ``key=value`` option. Each value is evaluated with ``eval``
    against a LiteralDict, so bare words become strings. A malformed option
    (no '=' or an empty key) aborts the program via check().

    Args:
        conn: httplib connection to the Livy server.
        options: iterable of ``key=value`` strings. Defaults to the
            command-line arguments after the script file (``sys.argv[3:]``).
    """
    if options is None:
        options = sys.argv[3:]
    # Named ``payload`` rather than ``request`` so it does not shadow the
    # module-level request() helper.
    payload = {"kind": "spark"}
    for opt in options:
        key, sep, value = opt.partition('=')
        # Same acceptance rule as before: '=' must be present and not first.
        check(bool(key) and bool(sep), "Invalid option: %s.", opt)
        # SECURITY NOTE: eval() executes arbitrary Python; option values must
        # come from a trusted source (here, the user's own command line).
        payload[key] = eval(value, LiteralDict())
    return post(conn, "/sessions", payload)
def wait_for_idle(sid, conn=None):
    """Poll session *sid* every 5 seconds until it leaves its start-up states.

    Args:
        sid: numeric Livy session id.
        conn: connection to poll on. Defaults to the module-level ``conn``
            created by the script entry point, which the one-argument form of
            this function has always used implicitly.

    Raises:
        Exception: if the session settles in any state other than 'idle'.
    """
    if conn is None:
        conn = globals()["conn"]
    # Livy reports 'not_started' before 'starting'; both mean "not ready yet"
    # rather than "failed".
    pending_states = ('not_started', 'starting')
    uri = "/sessions/%d" % (sid, )
    session = get(conn, uri)
    while session['state'] in pending_states:
        message("Session not ready yet (%s)", session['state'])
        time.sleep(5)
        session = get(conn, uri)
    if session['state'] != 'idle':
        raise Exception("Session failed to start.")
def monitor_statement(conn, sid, s):
    """Poll a submitted statement once per second until it finishes, then print the outcome.

    Args:
        conn: httplib connection to the Livy server.
        sid: numeric session id the statement belongs to.
        s: statement dict as returned by Livy; must contain 'id' and 'state'.

    While the statement is still pending, "(waiting for result...)" is printed
    on every 11th poll.
    """
    polls = 0
    while True:
        state = s['state']
        if state == 'available':
            _print_statement_output(s['output'])
            return
        if state == 'error':
            # NOTE(review): falls back to 'output' in case the server omits an
            # 'error' key (the old s['error'] raised KeyError) — confirm
            # against the Livy version in use.
            message("%s", s.get('error', s.get('output')))
            return
        if state == 'cancelled':
            # A cancelled statement never becomes 'available'; without this
            # branch the loop would poll forever.
            message("Statement was cancelled.")
            return
        if polls == 10:
            message("(waiting for result...)")
            polls = 0
        else:
            polls += 1
        time.sleep(1)
        s = get(conn, "/sessions/%d/statements/%s" % (sid, s['id']))


def _print_statement_output(output):
    """Print the 'output' object of a finished statement (ok, error or unknown status)."""
    status = output['status']
    if status == 'ok':
        text = output['data'].get('text/plain', None)
        if text is None:
            message("Success (non-text result).")
        elif text.rstrip():
            # Whitespace-only text results are not printed.
            message("%s", text)
    elif status == 'error':
        ename = output['ename']
        evalue = output['evalue']
        traceback = "\n".join(output.get('traceback', []))
        message("%s: %s", ename, evalue)
        if traceback:
            message("%s", traceback)
    else:
        message("Statement finished with unknown status '%s'.", status)
def run_shell(conn, sid, script=None):
    """Submit each non-blank line of *script* to session *sid* and print its result.

    Each stripped line is posted as one statement, echoed as "cmd: ...", and
    then monitored until it completes. Blank lines are skipped; the loop ends
    at the end of the input.

    Args:
        conn: httplib connection to the Livy server.
        sid: numeric session id.
        script: iterable of lines (e.g. an open file). Defaults to the
            module-level ``sc`` file opened by the script entry point, which
            the two-argument form of this function has always read implicitly.
    """
    if script is None:
        script = globals()["sc"]
    for line in script:
        cmd = line.strip()
        if not cmd:
            # Skip blank lines instead of treating them like EOF, so the rest
            # of the script still runs.
            continue
        statement = post(conn, "/sessions/%d/statements" % (sid, ), {'code': cmd})
        message("cmd: %s", cmd)
        monitor_statement(conn, sid, statement)
def open_connection(url):
    """Build an httplib connection object for the host:port in a parsed *url*.

    An HTTPS connection is used when the scheme is exactly "https"; every other
    scheme gets a plain HTTP connection.
    """
    connection_class = httplib.HTTPSConnection if url.scheme == "https" else httplib.HTTPConnection
    return connection_class(url.netloc)
#
# main()
#
# Command line: url script_file [option=value ...]
# Both url and script_file are required, so argv needs at least 3 entries;
# the old "> 1" test let a missing script_file crash with IndexError below.
check(len(sys.argv) > 2, "Missing arguments.")
url = urlparse.urlparse(sys.argv[1])
conn = open_connection(url)
# Opened before the session is created, so a bad path fails before any
# server-side session exists.
sc = open(sys.argv[2])
sid = -1
try:
    message("Creating new session...")
    session = create_session(conn)
    sid = int(session['id'])
    message("New session (id = %d, kind = %s), waiting for idle state...", sid, session['kind'])
    wait_for_idle(sid)
    message("Session ready.")
    run_shell(conn, sid)
except EOFError:
    # End of input is treated as a normal finish.
    pass
finally:
    # The script file was previously never closed.
    sc.close()
    conn.close()
    if sid != -1:
        # Delete the session on a fresh connection (presumably because the
        # original one may be unusable after an error).
        conn = open_connection(url)
        try:
            delete(conn, "/sessions/%d" % (sid, ))
        finally:
            conn.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment