Skip to content

Instantly share code, notes, and snippets.

@joeblackwaslike
Last active July 10, 2023 20:04
Show Gist options
  • Save joeblackwaslike/b262a76813ed3e578e292f353a4b4e51 to your computer and use it in GitHub Desktop.
Save joeblackwaslike/b262a76813ed3e578e292f353a4b4e51 to your computer and use it in GitHub Desktop.

Usage is simple

import json

from joserfc.jwk import KeySet
from joserfc.jws import JWSRegistry

from idp import IdentityProvider

default_registry = JWSRegistry(algorithms=["RS256", "ES256"])
keyset = KeySet.generate_key_set("EC", "P-256", parameters={"use": "sig"}, count=1)

signer = IdentityProvider(
    keyset=keyset,
    issuer="https://api.magic.link",
    audience="https://auth.magic.link",
    registry=default_registry,
)

pub_keyset = keyset.as_dict()

verifier = IdentityProvider(
    keyset=pub_keyset,
    issuer="https://api.magic.link",
    audience="https://auth.magic.link",
    registry=default_registry,
)

json.dumps(pub_keyset)
"""
{"keys": [{"crv": "P-256", "x": "RfHAtStGItpu2DUyifoOx2Q28GYNNw_6beO3hoIF2C4", "y": "eIKACTusYm6THys1YkB7mISu30gebWi1E-N6CsvPYXo", "d": "iaCwJ0d6HjAHyzSkCGIfa_hAXLj4d5P-QbLeojJ9lvM", "use": "sig", "kty": "EC", "kid": "lbTHbNZ0ToR8q22VAng_f0ohecrYoOWrt8BXj0A9sLc"}]}
"""

Sign/encode new JWT

header = signer.build_header()
"""
{'alg': 'ES256',
 'typ': 'JWT',
 'kid': 'lbTHbNZ0ToR8q22VAng_f0ohecrYoOWrt8BXj0A9sLc'}
"""

base_claims = signer.build_identity_claims(subject="device_profile_id", authorizing_party="auth_user_id")
"""
{'sub': 'device_profile_id', 'azp': 'auth_user_id'}
"""

claims = signer.build_claims(subject="user_id:001", scope="openid", expiry=3600, **base_claims)
"""
{'subject': 'user_id:001',
 'sub': 'device_profile_id',
 'azp': 'auth_user_id',
 'iss': 'https://api.magic.link',
 'aud': 'https://auth.magic.link',
 'scope': 'openid',
 'jti': '9799e77c-8011-41db-8921-646496e0872c',
 'iat': 1689018283,
 'exp': 1689021883}
"""

token = signer.encode(header, claims)
"""
'eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6ImxiVEhiTlowVG9SOHEyMlZBbmdfZjBvaGVjcllvT1dydDhCWGowQTlzTGMifQ.eyJzdWJqZWN0IjoidXNlcl9pZDowMDEiLCJzdWIiOiJkZXZpY2VfcHJvZmlsZV9pZCIsImF6cCI6ImF1dGhfdXNlcl9pZCIsImlzcyI6Imh0dHBzOi8vYXBpLm1hZ2ljLmxpbmsiLCJhdWQiOiJodHRwczovL2F1dGgubWFnaWMubGluayIsInNjb3BlIjoib3BlbmlkIiwianRpIjoiOTc5OWU3N2MtODAxMS00MWRiLTg5MjEtNjQ2NDk2ZTA4NzJjIiwiaWF0IjoxNjg5MDE4MjgzLCJleHAiOjE2ODkwMjE4ODN9.OYoORXx_HURQJiAZmW9XZXvqlu_nhkpF9_d2xPHaaGCd6SJg5a31Go6DpN-HW2l0S1z0Jy44zuHOnMvqPDfgkw'
"""

## Verify/Decode jwt
```python
decoded = verifier.decode(token)
claims_registry = verifier.build_claims_registry()
verifier.validate(token, claims_registry)

decoded.header
"""
{'alg': 'ES256',
 'typ': 'JWT',
 'kid': 'lbTHbNZ0ToR8q22VAng_f0ohecrYoOWrt8BXj0A9sLc'}
"""

decoded.claims
"""
{'subject': 'user_id:001',
 'sub': 'device_profile_id',
 'azp': 'auth_user_id',
 'iss': 'https://api.magic.link',
 'aud': 'https://auth.magic.link',
 'scope': 'openid',
 'jti': '9799e77c-8011-41db-8921-646496e0872c',
 'iat': 1689018283,
 'exp': 1689021883}
"""
```
import base64
import os.path
from contextlib import contextmanager
from datetime import datetime
from datetime import timezone
from hashlib import sha256
from typing import Optional
from uuid import uuid4
import yaml
from joserfc import jwt
from joserfc.jwk import KeySet
from joserfc.jws import JWSRegistry
from joserfc.jwt import JWTClaimsRegistry
class IdentityProvider:
    """Build, sign, decode and validate JWTs for one issuer/audience pair.

    The instance stores a key (or key set), the issuer, an optional audience
    and a JWS registry. The ``build_*`` helpers only assemble plain dicts;
    ``encode``/``decode`` hand those to ``joserfc.jwt`` and ``validate``
    checks the decoded claims against a ``JWTClaimsRegistry``.
    """

    # Algorithms allowed when the caller does not pass a registry.
    DEFAULT_ALGORITHMS = ("RS256", "ES256")

    def __init__(
        self,
        keyset: "KeySet",
        issuer: str,
        audience: Optional[str] = None,
        registry: "Optional[JWSRegistry]" = None,
    ):
        """Store the signing/verification material and token defaults.

        Args:
            keyset: A ``KeySet``, or a single key object exposing
                ``key_type`` and ``kid`` (see ``build_header``).
            issuer: Value written to, and required in, the ``iss`` claim.
            audience: Value written to the ``aud`` claim; ``None`` disables
                the audience requirement in ``claims_options``.
            registry: JWS registry passed to ``jwt.encode``/``jwt.decode``.
                When omitted, one restricted to ``DEFAULT_ALGORITHMS`` is
                created.
        """
        if registry is None:
            # Built here rather than in the signature so that defining the
            # class does not depend on a module-level registry object.
            registry = JWSRegistry(algorithms=list(self.DEFAULT_ALGORITHMS))
        self.keyset = keyset
        self.issuer = issuer
        self.audience = audience
        self.registry = registry

    @property
    def claims_options(self):
        """Return the claim requirements derived from issuer and audience.

        ``iss`` is always essential. ``aud`` is only required when an
        audience is configured, so that an instance without an audience does
        not demand a claim equal to ``None``.
        """
        options = {"iss": {"essential": True, "value": self.issuer}}
        if self.audience is not None:
            options["aud"] = {"essential": True, "values": [self.audience]}
        return options

    @property
    def public_jwkset(self):
        """Return the key set as a dict restricted to public key material."""
        # private=False keeps private parameters (e.g. "d") out of the export.
        return self.keyset.as_dict(private=False)

    @contextmanager
    def as_audience(self, audience: str):
        """Temporarily replace ``self.audience`` inside a ``with`` block.

        The previous audience is restored on exit, even if the body raises.
        """
        previous = self.audience
        self.audience = audience
        try:
            yield
        finally:
            self.audience = previous

    def build_header(self, type: str = "jwt", **extra_header):
        """Assemble a JOSE header, filling in only the fields not supplied.

        Args:
            type: Value for ``typ``; ``"dpop+jwt"`` (case-insensitive)
                switches the key reference from ``kid`` to an embedded
                ``jwk``.
            **extra_header: Header fields that take precedence over the
                computed defaults.

        Returns:
            The header dict.

        Raises:
            ValueError: If ``self.keyset`` is a ``KeySet`` without any
                private key.
        """
        header = extra_header
        if isinstance(self.keyset, KeySet):
            # Sign with the first private key found in the set.
            key = next((k for k in self.keyset.keys if k.is_private), None)
            if key is None:
                raise ValueError("No private key in keyset")
        else:
            key = self.keyset

        key_type = key.key_type
        if "alg" not in header:
            header["alg"] = "ES256" if key_type == "EC" else "RS256"
        if "typ" not in header:
            header["typ"] = type

        if type.lower() == "dpop+jwt":
            if "jwk" not in header:
                # The embedded key travels with the token, so it must hold
                # only the public parameters.
                header["jwk"] = key.as_dict(private=False)
        elif "kid" not in header:
            header["kid"] = key.kid
        return header

    def build_claims(
        self, expiry: int = 3600, scope: Optional[str] = None, now=None, **extra_claims
    ):
        """Assemble the claims for a token issued by this provider.

        ``iss`` and ``aud`` are always set from the instance. ``scope`` is
        set when a truthy value is given, and ``jti``, ``iat`` and ``exp``
        are only filled in when absent from ``extra_claims``.

        Args:
            expiry: Lifetime in seconds added to ``now`` to compute ``exp``.
            scope: Optional value for the ``scope`` claim.
            now: Issue time as a Unix timestamp; defaults to the current UTC
                time. ``0`` is honoured as an explicit timestamp.
            **extra_claims: Additional claims to include.

        Returns:
            The claims dict.
        """
        claims = extra_claims
        if now is None:
            now = int(datetime.now(timezone.utc).timestamp())
        claims["iss"] = self.issuer
        claims["aud"] = self.audience
        if scope:
            claims["scope"] = scope
        if "jti" not in claims:
            claims["jti"] = str(uuid4())
        if "iat" not in claims:
            claims["iat"] = now
        if "exp" not in claims:
            claims["exp"] = now + expiry
        return claims

    def build_identity_claims(self, subject: str, authorizing_party: Optional[str] = None):
        """Return ``{"sub": subject}``, plus ``azp`` when a party is given."""
        claims = {"sub": subject}
        if authorizing_party:
            claims["azp"] = authorizing_party
        return claims

    def build_dpop_claims(
        self, http_method: str, http_uri: str, access_token: Optional[str] = None
    ):
        """Return the DPoP-specific claims for one HTTP request.

        Args:
            http_method: HTTP method; stored upper-cased as ``htm``.
            http_uri: Request URI; stored unchanged as ``htu``.
            access_token: When given, its SHA-256 digest is stored as
                ``ath``, base64url-encoded without padding.

        Returns:
            A dict with ``htm``, ``htu``, a fresh ``jti`` and optionally
            ``ath``.
        """
        claims = {
            "htm": http_method.upper(),
            "htu": http_uri,
            "jti": str(uuid4()),
        }
        if access_token:
            digest = sha256(access_token.encode()).digest()
            # A 32-byte digest always encodes with one trailing "=", which
            # the unpadded base64url form omits.
            claims["ath"] = base64.urlsafe_b64encode(digest).rstrip(b"=").decode()
        return claims

    def encode(self, header, claims, key=None):
        """Sign ``claims`` under ``header`` and return the encoded token.

        ``key`` overrides ``self.keyset`` for this call only.
        """
        if key is None:
            key = self.keyset
        return jwt.encode(header, claims, key, registry=self.registry)

    def decode(self, token):
        """Decode ``token`` using ``self.keyset`` and ``self.registry``.

        Claim values are not checked here; use ``validate`` for that.
        """
        return jwt.decode(token, self.keyset, registry=self.registry)

    def build_claims_registry(self, now: Optional[int] = None, leeway: int = 0, **claims_options):
        """Create a ``JWTClaimsRegistry`` for validating decoded claims.

        Args:
            now: Reference timestamp forwarded to the registry.
            leeway: Clock-skew allowance forwarded to the registry.
            **claims_options: Per-claim options; entries here override the
                ``iss``/``aud`` defaults from ``claims_options``.
        """
        # Merge first so a caller-supplied "iss"/"aud" replaces the default
        # instead of colliding as a duplicate keyword argument.
        options = {**self.claims_options, **claims_options}
        return JWTClaimsRegistry(now=now, leeway=leeway, **options)

    def validate(self, token, claims_registry=None):
        """Decode ``token`` and validate its claims.

        Args:
            token: The encoded JWT.
            claims_registry: Registry to validate with; defaults to
                ``build_claims_registry()``.

        Returns:
            Whatever ``claims_registry.validate`` returns for the decoded
            claims.
        """
        if claims_registry is None:
            claims_registry = self.build_claims_registry()
        decoded = self.decode(token)
        return claims_registry.validate(decoded.claims)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment