Skip to content

Instantly share code, notes, and snippets.

@johnandersen777
Created March 26, 2024 21:00
Show Gist options
  • Save johnandersen777/b8d4728a002ca0e5ffe5f78c4a8ec68b to your computer and use it in GitHub Desktop.
Save johnandersen777/b8d4728a002ca0e5ffe5f78c4a8ec68b to your computer and use it in GitHub Desktop.
diff --git a/scitt_emulator/ccf.py b/scitt_emulator/ccf.py
index 06296f8..452c801 100644
--- a/scitt_emulator/ccf.py
+++ b/scitt_emulator/ccf.py
@@ -78,13 +78,13 @@ class CCFSCITTServiceEmulator(SCITTServiceEmulator):
key = jwcrypto.jwk.JWK()
key_bytes = pathlib.Path(self._service_private_key_path).read_bytes()
key.import_from_pem(key_bytes)
- return [
- {
+ return {
+ key.thumbprint(): {
**key.export_public(as_dict=True),
"use": "sig",
"kid": key.thumbprint(),
}
- ]
+ }
def create_receipt_contents(self, countersign_tbi: bytes, entry_id: str):
# Load service private key and certificate
diff --git a/scitt_emulator/oidc.py b/scitt_emulator/oidc.py
index 75d82d0..0e0f2af 100644
--- a/scitt_emulator/oidc.py
+++ b/scitt_emulator/oidc.py
@@ -10,6 +10,7 @@ from scitt_emulator.client import HttpClient
class OIDCAuthMiddleware:
def __init__(self, app, config_path):
self.app = app
+ self.wsgi_app = app.wsgi_app  # keep the app's current WSGI callable; __call__ delegates to self.wsgi_app
self.config = {}
if config_path and config_path.exists():
self.config = json.loads(config_path.read_text())
@@ -29,7 +30,7 @@ class OIDCAuthMiddleware:
claims = self.validate_token(request.headers["Authorization"].replace("Bearer ", ""))
if "claim_schema" in self.config and claims["iss"] in self.config["claim_schema"]:
jsonschema.validate(claims, schema=self.config["claim_schema"][claims["iss"]])
- return self.app(environ, start_response)
+ return self.wsgi_app(environ, start_response)
def validate_token(self, token):
validation_error = Exception(f"Failed to validate against all issuers: {self.jwks_clients.keys()!s}")
diff --git a/scitt_emulator/rkvst.py b/scitt_emulator/rkvst.py
index add848a..3769951 100644
--- a/scitt_emulator/rkvst.py
+++ b/scitt_emulator/rkvst.py
@@ -59,7 +59,7 @@ class RKVSTSCITTServiceEmulator(SCITTServiceEmulator):
}
def keys_as_jwks(self):
- return []
+ return {}
def _event_id_to_operation_id(self, event_id: str):
return event_id.replace('/', '_')
diff --git a/scitt_emulator/scitt.py b/scitt_emulator/scitt.py
index 8486efb..1c5fc64 100644
--- a/scitt_emulator/scitt.py
+++ b/scitt_emulator/scitt.py
@@ -13,7 +13,6 @@ from pycose.messages import Sign1Message
import pycose.headers
from scitt_emulator.create_statement import CWTClaims
-from scitt_emulator.verify_statement import verify_statement
# temporary receipt header labels, see draft-birkholz-scitt-receipts
COSE_Headers_Service_Id = "service_id"
diff --git a/scitt_emulator/server.py b/scitt_emulator/server.py
index 0bd6a9f..6e7a18d 100644
--- a/scitt_emulator/server.py
+++ b/scitt_emulator/server.py
@@ -2,13 +2,19 @@
# Licensed under the MIT License.
import os
+import json
from pathlib import Path
from io import BytesIO
import random
+import jwt.api_jwt
+import jwcrypto.jwt
+import pycose.headers
+from pycose.messages import Sign1Message
from flask import Flask, request, send_file, make_response, jsonify
from scitt_emulator.tree_algs import TREE_ALGS
+from scitt_emulator.verify_statement import verify_statement
from scitt_emulator.plugin_helpers import entrypoint_style_load
from scitt_emulator.scitt import EntryNotFoundError, ClaimInvalidError, OperationNotFoundError
@@ -34,9 +40,6 @@ def create_flask_app(config):
app.config.update(dict(DEBUG=True))
app.config.update(config)
- if app.config.get("middleware", None):
- app.wsgi_app = app.config["middleware"](app.wsgi_app, app.config.get("middleware_config_path", None))
-
error_rate = app.config["error_rate"]
use_lro = app.config["use_lro"]
@@ -53,6 +56,11 @@ def create_flask_app(config):
app.scitt_service.initialize_service()
print(f"Service parameters: {app.service_parameters_path}")
+ app.jwks = {}
+ # zip() stops at the shorter input, so pad config paths with None: a middleware given without a config path must still be installed.
+ for middleware, middleware_config_path in zip(app.config.get("middleware") or [], [*(app.config.get("middleware_config_path") or []), *([None] * len(app.config.get("middleware") or []))]):
+ app.wsgi_app = middleware(app, middleware_config_path)
+
def is_unavailable():
return random.random() <= error_rate
@@ -66,13 +74,81 @@ def create_flask_app(config):
"registration_endpoint": f"/entries",
"nonce_endpoint": f"/nonce",
"registration_policy": f"/statements/TODO",
- "supported_signature_algorithms": ["ES256"],
+ "supported_signature_algorithms": ["RS256"],
"jwks": {
- "keys": app.scitt_service.keys_as_jwks(),
+ "keys": list(
+ {  # kid -> public JWK dict; merged by kid so only public key material is published
+ **app.scitt_service.keys_as_jwks(),
+ **{kid: {**key.export_public(as_dict=True), "use": "sig", "kid": kid} for kid, key in app.jwks.items()},
+ }.values()
+ )
}
}
)
+ # TODO During phase 1, this should be moved into its own service
+ # @lice, track this as a series of issues from engineering logs 2024-03-23
+ @app.route("/v1/token/issue/<string:audience>/<string:subject>", methods=["POST"])
+ def token_issue(audience: str, subject: str):
+ if is_unavailable():
+ return make_unavailable_error()
+
+ claim = request.get_data()  # request body is the COSE Sign1 statement
+
+ msg = Sign1Message.decode(claim, tag=True)
+
+ if pycose.headers.ContentType not in msg.phdr:
+ raise ClaimInvalidError("Claim does not have a content type header parameter")
+
+ verification_key = None
+ e = Exception("Failed to verify statement")
+ try:
+ verification_key = verify_statement(msg)
+ except Exception as error:
+ e = error
+ if verification_key is None:
+ return make_error("StatementVerificationFailed", str(e), 404)
+
+ key = jwcrypto.jwk.JWK.generate(kty="RSA", size=2048)  # fresh signing key per token
+ app.jwks[key.thumbprint()] = key  # registered by thumbprint so /v1/token/revoke can drop it by kid
+
+ algorithm = "RS256"
+ iss = app.config.get("fqdn") or f"http://localhost:{app.port}"  # lazy fallback: app.port is only read when no fqdn is configured
+ new_token = jwt.encode(
+ # TODO app.fqdn
+ {"iss": iss, "aud": audience, "sub": subject},
+ key.export_to_pem(private_key=True, password=None),
+ algorithm=algorithm,
+ headers={"kid": key.thumbprint()},
+ )
+
+ return jsonify(
+ {
+ "token": new_token,
+ }
+ )
+
+ # TODO During phase 1, this should be moved into its own service
+ @app.route("/v1/token/revoke", methods=["POST"])
+ def token_revoke():
+ if is_unavailable():
+ return make_unavailable_error()
+
+ token = json.loads(request.get_data())["token"]
+
+ unverified_token = jwt.api_jwt.decode_complete(
+ token,
+ options={"verify_signature": False},
+ )
+ unverified_token_header = unverified_token["header"]
+ kid = unverified_token_header.get("kid")
+
+ if kid in app.jwks:
+ del app.jwks[kid]
+ return jsonify({"status": "success", "detail": None})
+
+ return make_error("KeyIDNotActive", f"kid {kid!r} not active", 404)
+
@app.route("/entries/<string:entry_id>/receipt", methods=["GET"])
def get_receipt(entry_id: str):
if is_unavailable():
@@ -133,6 +209,7 @@ def create_flask_app(config):
def cli(fn):
parser = fn()
+ parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("-p", "--port", type=int, default=8000)
parser.add_argument("--error-rate", type=float, default=0.01)
parser.add_argument("--use-lro", action="store_true", help="Create operations for submissions")
@@ -141,9 +218,10 @@ def cli(fn):
parser.add_argument(
"--middleware",
type=lambda value: list(entrypoint_style_load(value))[0],
- default=None,
+ nargs="*",
+ default=[],
)
- parser.add_argument("--middleware-config-path", type=Path, default=None)
+ parser.add_argument("--middleware-config-path", type=Path, nargs="*", default=[])
def cmd(args):
app = create_flask_app(
@@ -156,7 +234,9 @@ def cli(fn):
"use_lro": args.use_lro
}
)
- app.run(host="0.0.0.0", port=args.port)
+ app.host = args.host
+ app.port = args.port
+ app.run(host=args.host, port=args.port)
parser.set_defaults(func=cmd)
diff --git a/tests/test_docs.py b/tests/test_docs.py
index 6ed369f..794a498 100644
--- a/tests/test_docs.py
+++ b/tests/test_docs.py
@@ -312,3 +312,196 @@ def test_docs_registration_policies(create_flask_app_notary_identity, tmp_path):
receipt_path.unlink()
assert os.path.exists(entry_id_path)
receipt_path.unlink(entry_id_path)
+
+
+def test_phase_0_relying_party_workload_identity_token_response(tmp_path):
+ workspace_path = tmp_path / "workspace"
+
+ claim_path = tmp_path / "claim.cose"
+ receipt_path = tmp_path / "claim.receipt.cbor"
+ entry_id_path = tmp_path / "claim.entry_id.txt"
+ retrieved_claim_path = tmp_path / "claim.retrieved.cose"
+
+ key = jwcrypto.jwk.JWK.generate(kty="RSA", size=2048)
+ algorithm = "RS256"
+ audience = "scitt.example.org"
+ subject = "repo:scitt-community/scitt-api-emulator:ref:refs/heads/main"
+
+ relying_party_workload_identity_token_response = client.post("/v1/token/{scitt.config.fqdn}/scitt_entry_submission_token")
+ relying_party_workload_identity_token = relying_party_workload_identity_token_response["token"]
+ # We can then use the tokens issued with the same SCITT service as the audience
+ # TODO scitt.config.fqdn TODO
+
+
+ with Service(
+ {"key": key, "algorithms": [algorithm]},
+ create_flask_app=create_flask_app_nop_scitt_scrapi,
+ ) as oidc_service:
+ os.environ["no_proxy"] = ",".join(
+ os.environ.get("no_proxy", "").split(",") + [oidc_service.host]
+ )
+ middleware_config_path = tmp_path / "oidc-middleware-config.json"
+ middleware_config_path.write_text(
+ json.dumps(
+ {
+ "issuers": [oidc_service.url],
+ "audience": audience,
+ "claim_schema": {
+ oidc_service.url: {
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "required": ["sub"],
+ "properties": {
+ "sub": {"type": "string", "enum": [subject]},
+ },
+ }
+ },
+ }
+ )
+ )
+ with Service(
+ {
+ "middleware": [OIDCAuthMiddleware],
+ "middleware_config_path": [middleware_config_path],
+ "tree_alg": "CCF",
+ "workspace": workspace_path,
+ "error_rate": 0.1,
+ "use_lro": False,
+ }
+ ) as service:
+ # create claim
+ command = [
+ "client",
+ "create-claim",
+ "--out",
+ claim_path,
+ "--subject",
+ "test",
+ "--content-type",
+ content_type,
+ "--payload",
+ payload,
+ ]
+ execute_cli(command)
+ assert os.path.exists(claim_path)
+
+ # submit claim without token
+ command = [
+ "client",
+ "submit-claim",
+ "--claim",
+ claim_path,
+ "--out",
+ receipt_path,
+ "--out-entry-id",
+ entry_id_path,
+ "--url",
+ service.url,
+ ]
+ check_error = None
+ try:
+ execute_cli(command)
+ except Exception as error:
+ check_error = error
+ assert check_error
+ assert not os.path.exists(receipt_path)
+ assert not os.path.exists(entry_id_path)
+
+ # create token without subject
+ token = jwt.encode(
+ {"iss": oidc_service.url, "aud": audience},
+ key.export_to_pem(private_key=True, password=None),
+ algorithm=algorithm,
+ headers={"kid": key.thumbprint()},
+ )
+ # submit claim with token lacking subject
+ command += [
+ "--token",
+ token,
+ ]
+ check_error = None
+ try:
+ execute_cli(command)
+ except Exception as error:
+ check_error = error
+ assert check_error
+ assert not os.path.exists(receipt_path)
+ assert not os.path.exists(entry_id_path)
+
+ # create token with subject
+ token = jwt.encode(
+ {"iss": oidc_service.url, "aud": audience, "sub": subject},
+ key.export_to_pem(private_key=True, password=None),
+ algorithm=algorithm,
+ headers={"kid": key.thumbprint()},
+ )
+ # submit claim with token containing subject
+ command[-1] = token
+ execute_cli(command)
+ assert os.path.exists(receipt_path)
+ assert os.path.exists(entry_id_path)
+
+ # We need to use httptest.oidc as the notary ID server
+ # Create entry
+ # Policy Engine Runs eval
+ # Policy engine requests /v1/token/... token for job
+ # using its relying_party_service_account_token validated by OIDC
+ # middleware.
+ # TODO Pass endpoint of relying party (in phase 0 this is the scitt
+ # loopback port) to policy_engine.cli_api
+ # Receipt is for payload of PolicyEngineRequest (this is the manifest,
+ # request.yml).
+ # Policy Engine should run keyed off subject to select workflows to
+ # trigger / run. Policy Engine MUST support !* gitignore style globs and
+ # subject exceptions to globs to not run a workflow on.
+ # If glob validating all subjects ensure PolicyEngineRequest schema
+ # receipt URN is in ignore list so it doesn't run on its TCB which it
+ # already determined was insert worthy.
+ oidc_auth_middleware_config = {
+ {
+ "issuers": ["https://{scitt_service.url}"],
+ "claim_schema": {
+ "https://token.actions.githubusercontent.com": {
+ "\$schema": "https://json-schema.org/draft/2020-12/schema",
+ "required": [
+ "job_workflow_ref",
+ "job_workflow_sha",
+ "repository_owner_id",
+ "repository_id"
+ ],
+ "properties": {
+ "job_workflow_ref": {
+ "type": "string",
+ "enum": [
+ "${WORKFLOW_REF}"
+ ]
+ },
+ "job_workflow_sha": {
+ "type": "string",
+ "enum": [
+ "${JOB_WORKFLOW_SHA}"
+ ]
+ },
+ "repository_owner_id": {
+ "type": "string",
+ "enum": [
+ "${REPOSITORY_OWNER_ID}"
+ ]
+ },
+ "repository_id": {
+ "type": "string",
+ "enum": [
+ "${REPOSITORY_ID}"
+ ]
+ }
+ }
+ }
+ },
+ "audience": "${SCITT_URL}"
+
+ }
+ oidc_auth_middleware_config_path = tempdir_path.joinpath("config.json")
+ oidc_auth_middleware_config_path.write_text(json.dump())
+ oidc = OIDCAuthMiddleware(app, )
+ claims = oidc.validate_token(relying_party_workload_identity_token)
+ if "claim_schema" in self.config and claims["iss"] in oidc.config["claim_schema"]:
+ jsonschema.validate(claims, schema=oidc.config["claim_schema"][claims["iss"]])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment