Skip to content

Instantly share code, notes, and snippets.

@Teqqles
Created March 1, 2018 18:50
Show Gist options
  • Select an option

  • Save Teqqles/baf20ca0add766eaec7bc867890f7aad to your computer and use it in GitHub Desktop.

Select an option

Save Teqqles/baf20ca0add766eaec7bc867890f7aad to your computer and use it in GitHub Desktop.
For use against the output of the `dcos spark status` CLI command.
import json
import re
import click
import sys
# noinspection PyMethodMayBeStatic
class SparkJobStatus(object):
    """Summarise the driver and task state found in a Spark status report.

    The text given to the constructor is searched for a JSON object. Its
    ``driverState`` field becomes ``driver_status`` and the ``state: ...``
    entry embedded in its ``message`` field becomes ``task_status``. When
    either piece of information is missing, the matching ``*_NO_STATUS``
    placeholder is used instead.
    """

    # Placeholders reported when the corresponding state cannot be found.
    DRIVER_NO_STATUS = "DRIVER_NO_STATUS"
    TASK_NO_STATUS = "TASK_NO_STATUS"

    def __init__(self, source: str):
        """Parse *source* and record the driver and task status.

        Args:
            source: Raw text that is expected to contain a JSON object
                (presumably the output of ``dcos spark status`` -- confirm
                against the caller).
        """
        status = json.loads(self.__extract_status_jsons(source))
        self.driver_status = status.get("driverState", self.DRIVER_NO_STATUS)
        self.task_status = self.__extract_task_status(status)

    def __extract_status_jsons(self, source: str) -> str:
        """Return the text of the first decodable JSON object in *source*.

        A real JSON decoder is used instead of a regular expression so that
        nested objects are not cut off at their first closing brace and so
        that the object does not need to be followed by a newline (shell
        command substitution strips trailing newlines).

        Returns:
            The JSON object's text, or ``"{}"`` when *source* contains no
            decodable object.
        """
        decoder = json.JSONDecoder()
        start = source.find("{")
        while start != -1:
            try:
                _, end = decoder.raw_decode(source, start)
            except ValueError:
                # Not a JSON object at this brace; try the next candidate.
                start = source.find("{", start + 1)
                continue
            return source[start:end]
        return "{}"

    def get_status(self) -> str:
        """Return the combined status as ``"<driver>/<task>"``."""
        return "{}/{}".format(self.driver_status, self.task_status)

    def __extract_task_status(self, status: dict) -> str:
        """Return the first ``state: <value>`` entry of ``status["message"]``.

        Returns:
            The captured state, or ``TASK_NO_STATUS`` when the message is
            absent, is not a string, or carries no ``state:`` entry.
        """
        message = status.get("message", "")
        if not isinstance(message, str):
            # e.g. a JSON null; there is nothing to search.
            return self.TASK_NO_STATUS
        # The value ends at whitespace or at a backslash, so both real and
        # still-escaped ("\\n") line breaks terminate it.
        match = re.search(r"state:\s([^\s\\]+)", message)
        return match.group(1) if match else self.TASK_NO_STATUS

    def get_exit_code(self) -> int:
        """Return 1 when the combined status contains ``FAIL``, else 0."""
        return 1 if "FAIL" in self.get_status() else 0
@click.command()
@click.argument('src', nargs=1)
def run_bot(src):
    """Print the status parsed from SRC and exit with its exit code.

    SRC is the raw status text given as the single command-line argument.
    The process exits with the value of ``SparkJobStatus.get_exit_code()``.
    """
    job = SparkJobStatus(src)
    summary = job.get_status()
    print(summary)
    exit_code = job.get_exit_code()
    sys.exit(exit_code)
# Run the command-line entry point only when executed as a script,
# not when this module is imported.
if __name__ == '__main__':
    run_bot()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment