Created
May 18, 2018 13:15
-
-
Save adamkewley/1965dbff0ec31a2169411d2712b6cff4 to your computer and use it in GitHub Desktop.
High-level script for testing specs via a Jobson API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
import argparse | |
import base64 | |
import http | |
import http.client | |
import json | |
import logging | |
import os | |
import sys | |
import unittest | |
from _datetime import datetime | |
from itertools import repeat | |
from time import sleep | |
import yaml | |
BUILD_OUTPUT_FOLDER = "target/" | |
def project_file(path): | |
""" | |
Returns the supplied path resolved relative to the project root. | |
""" | |
return os.path.join(os.path.dirname(__file__), path) | |
def subdirs_in(path): | |
dir_names = next(os.walk(path))[1] | |
return map(lambda dir_name: os.path.join(path, dir_name), dir_names) | |
class SystemTests(unittest.TestCase): | |
seconds_between_polling_requests = 3 | |
safety_limit_on_request_polling = 2000 | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
logging.basicConfig(level=logging.INFO) | |
self.logger = logging.getLogger("jobson-build") | |
def run_all(self, host, port, login, password): | |
self.test_can_request_list_of_jobs(host, port, login, password) | |
self.test_can_request_list_of_specs(host, port, login, password) | |
self.test_can_run_all_test_yaml_files(host, port, login, password) | |
def test_can_request_list_of_jobs(self, host, port, username, password): | |
http_conn = http.client.HTTPConnection(host, port) | |
try: | |
http_conn.request("GET", "/v1/jobs", None, self._generate_req_headers(username, password)) | |
resp = http_conn.getresponse() | |
self.assertEqual(resp.status, 200) | |
finally: | |
http_conn.close() | |
def _generate_req_headers(self, username, password): | |
return { | |
"Content-Type": "application/json", | |
"Authorization": "Basic " + base64.b64encode((username + ":" + password).encode("utf-8")).decode("utf-8"), | |
} | |
def test_can_request_list_of_specs(self, host, port, username, password): | |
http_conn = http.client.HTTPConnection(host, port) | |
try: | |
http_conn.request("GET", "/v1/specs", None, self._generate_req_headers(username, password)) | |
resp = http_conn.getresponse() | |
self.assertEqual(resp.status, 200) | |
finally: | |
http_conn.close() | |
def test_can_run_all_test_yaml_files(self, host, port, username, password): | |
self.logger.info("Attempting to run tests.yml files in each spec directory") | |
specs_folder = project_file(os.path.join(BUILD_OUTPUT_FOLDER, "specs")) | |
if not os.path.exists(specs_folder): | |
raise AssertionError("%s: does not exist (has the deployment been built)?" % specs_folder) | |
a_test_suite_failed = False | |
for spec_folder in subdirs_in(specs_folder): | |
try: | |
self._try_run_test_file(spec_folder, host, port, username, password) | |
except AssertionError as ex: | |
a_test_suite_failed = True | |
self.logger.error("A test in {} failed: {}".format(spec_folder, ex)) | |
if a_test_suite_failed: | |
raise AssertionError("One or more test suites failed") | |
def _try_run_test_file(self, spec_folder, host, port, username, password): | |
spec_id = os.path.basename(spec_folder) | |
maybe_tests_file = os.path.join(spec_folder, "tests.yml") | |
if os.path.exists(maybe_tests_file): | |
self.logger.info("%s: has a tests.yml file. Running tests" % spec_id) | |
self._run_test_file(spec_id, maybe_tests_file, host, port, username, password) | |
else: | |
self.logger.info("%s: does not have a tests.yml file. Skipping." % spec_id) | |
def _run_test_file(self, spec_id, test_file_path, host, port, username, password): | |
with open(test_file_path, 'r') as f: | |
tests_data = yaml.load(f) | |
a_test_failed = False | |
for test_name, test_details in tests_data["tests"].items(): | |
try: | |
self._run_test_file_test(spec_id, test_name, test_details, host, port, username, password) | |
except AssertionError as ex: | |
a_test_failed = True | |
self.logger.error("{}: {} failed: {}.".format(spec_id, test_name, ex)) | |
self.logger.error("Moving on to next test") | |
if a_test_failed: | |
raise AssertionError("One or more tests failed") | |
def _run_test_file_test(self, spec_id, test_name, test_details, host, port, username, password): | |
http_conn = http.client.HTTPConnection(host, port) | |
try: | |
body = json.dumps({ | |
"spec": spec_id, | |
"name": "systemtest_%s_%s" % (spec_id, test_name), | |
"inputs": test_details["inputs"], | |
}) | |
self.logger.info("%s: %s: Sending job request" % (spec_id, test_name)) | |
http_conn.request("POST", "/v1/jobs", body, self._generate_req_headers(username, password)) | |
resp = http_conn.getresponse() | |
if "expectations" in test_details: | |
result_expectations = test_details["expectations"] | |
else: | |
msg = "{}: {}: Does not have an expectations key".format(spec_id, test_name) | |
self.logger.error(msg) | |
raise RuntimeError(msg) | |
self._handle_test_expectations(spec_id, test_name, http_conn, result_expectations, resp, username, password) | |
finally: | |
http_conn.close() | |
def _handle_test_expectations(self, spec_id, test_name, http_conn, expectations, submission_response, username, | |
password): | |
status = submission_response.status | |
if expectations.get("isAccepted", True): | |
self.assertEqual(status, 200, "%s: %s: Should have been accepted but was rejected with status %s" % ( | |
spec_id, test_name, status)) | |
job_id = self._read_response_body_as_json(submission_response)["id"] | |
self.logger.info( | |
"%s: %s: Job request was accepted (as expected) and given an ID of '%s'" % (spec_id, test_name, job_id)) | |
self._handle_after_submission_test_expectations(spec_id, test_name, http_conn, expectations, job_id, | |
username, password) | |
else: | |
self.assertNotEqual(status, 200, | |
"%s: %s: Should have been rejected but was accepted" % (spec_id, test_name)) | |
self.logger.info("%s: %s: Job request was rejected (as expected)" % (spec_id, test_name)) | |
def _read_response_body_as_json(self, response): | |
return json.loads(response.read().decode("utf-8")) | |
def _handle_after_submission_test_expectations(self, spec_id, test_name, http_conn, expectations, job_id, username, | |
password): | |
self.logger.info("%s: %s: Polling the job for its final status" % (spec_id, test_name)) | |
submission_time = datetime.now() | |
final_job_details = self._poll_for_final_job_details(http_conn, job_id, username, password) | |
final_status = self._extract_latest_status(final_job_details) | |
end_time = datetime.now() | |
self.logger.info("%s: %s: Final status was '%s'" % (spec_id, test_name, final_status)) | |
self.assertEqual(expectations.get("finalStatus", "finished"), final_status) | |
outputs_expectations = expectations.get("outputs", None) | |
if outputs_expectations is not None: | |
self.logger.info("%s: %s: Has output expectations. Testing that the job produced the expected outputs" % ( | |
spec_id, test_name)) | |
self._handle_outputs_expectations(spec_id, test_name, http_conn, outputs_expectations, job_id, username, | |
password) | |
def _poll_for_final_job_details(self, http_conn, job_id, username, password): | |
for _ in repeat(self.safety_limit_on_request_polling): | |
job_details = self._get_job_details(http_conn, job_id, username, password) | |
status = self._extract_latest_status(job_details) | |
if status in ["finished", "fatal-error", "aborted"]: | |
return job_details | |
sleep(self.seconds_between_polling_requests) | |
def _get_job_details(self, http_conn, job_id, username, password): | |
http_conn.request("GET", "/v1/jobs/" + job_id, None, self._generate_req_headers(username, password)) | |
resp = http_conn.getresponse() | |
self.assertEqual(resp.status, 200) | |
return json.loads(resp.read().decode("utf-8")) | |
def _extract_latest_status(self, job_details): | |
return job_details["timestamps"][-1]["status"] | |
def _handle_outputs_expectations(self, spec_id, test_name, http_conn, outputs_expectations, job_id, username, | |
password): | |
http_conn.request("GET", "/v1/jobs/%s/outputs" % job_id, None, self._generate_req_headers(username, password)) | |
job_outputs_resp = http_conn.getresponse() | |
self.assertEqual(job_outputs_resp.status, 200) | |
job_outputs_metadata = self._read_response_body_as_json(job_outputs_resp) | |
for output_expectation in outputs_expectations: | |
matcher = self._get_output_id_matcher(output_expectation["id"]) | |
self._handle_output_expectation(spec_id, test_name, http_conn, output_expectation, job_id) | |
def _get_output_id_matcher(self, expected_id): | |
if expected_id.startswith("/"): | |
() | |
else: | |
() | |
def _handle_output_expectation(self, spec_id, test_name, http_conn, output_expectation, output_id): | |
() | |
# MAIN | |
parser = argparse.ArgumentParser(description="Run jobson system tests") | |
parser.add_argument( | |
"host", | |
metavar="HOST", | |
type=str, | |
help="The host running the server (e.g. localhost)", | |
nargs=1) | |
parser.add_argument( | |
"port", | |
metavar="PORT", | |
type=int, | |
help="The port the Jobson API is listening on (e.g. 8080)", | |
nargs=1) | |
parser.add_argument( | |
"login", | |
metavar="LOGIN", | |
type=str, | |
help="The login to use to access the API", | |
nargs=1) | |
parser.add_argument( | |
"password", | |
metavar="PASSWORD", | |
type=str, | |
help="The password to use the access the API", | |
nargs=1) | |
args = parser.parse_args(sys.argv[1:]) | |
host = args.host[0] | |
port = args.port[0] | |
login = args.login[0] | |
password = args.password[0] | |
SystemTests().run_all(host, port, login, password) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment