Created
December 7, 2024 19:26
-
-
Save ngerakines/6d0d834f206eb7af0aa798e188d89c36 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from multiformats import multibase, multicodec | |
from cryptography.hazmat.primitives.asymmetric import ec | |
from cryptography.hazmat.primitives.serialization import ( | |
Encoding, | |
PublicFormat, | |
PrivateFormat, | |
NoEncryption, | |
) | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.exceptions import InvalidSignature | |
import dag_cbor | |
from base64 import urlsafe_b64encode, urlsafe_b64decode | |
private_key = ec.generate_private_key(ec.SECP256K1()) | |
private_key_bytes = private_key.private_bytes( | |
Encoding.PEM, PrivateFormat.PKCS8, NoEncryption() | |
) | |
def encode_public_key(private_key): | |
public_key = private_key.public_key() | |
public_key_bytes = public_key.public_bytes( | |
Encoding.X962, PublicFormat.CompressedPoint | |
) | |
multicodec_data = multicodec.wrap("secp256k1-pub", public_key_bytes) | |
return multibase.encode(multicodec_data, "base58btc") | |
def sign_record( | |
private_key, record, issuer_did, subject_did, subject_collection, **kwargs | |
): | |
if "signatures" in record: | |
del record["signatures"] | |
if "$sig" in record: | |
del record["$sig"] | |
_sig = {} | |
_sig.update(kwargs) | |
_sig.update( | |
{ | |
"issuer": issuer_did, | |
"did": subject_did, | |
"collection": subject_collection, | |
} | |
) | |
record["$sig"] = _sig | |
encoded_record = dag_cbor.encode(record) | |
signature = private_key.sign(encoded_record, ec.ECDSA(hashes.SHA256())) | |
del record["$sig"] | |
if "signatures" not in record: | |
record["signatures"] = [] | |
signature_info = {} | |
signature_info.update(kwargs) | |
signature_info.update( | |
{ | |
"issuer": issuer_did, | |
"signature": urlsafe_b64encode(signature).decode("utf-8"), | |
} | |
) | |
record["signatures"].append(signature_info) | |
return record | |
def gen_key_lookup(encoded_public_key): | |
def key_lookup(_issuer): | |
decoded_public_key = multibase.decode(encoded_public_key) | |
(codec, public_key_bytes) = multicodec.unwrap(decoded_public_key) | |
if codec.name != "secp256k1-pub": | |
return None | |
return ec.EllipticCurvePublicKey.from_encoded_point( | |
ec.SECP256K1(), public_key_bytes | |
) | |
return key_lookup | |
def verify_record(record, subject_did, subject_collection, key_lookup_fn): | |
if "signatures" not in record: | |
raise ValueError("Record does not contain any signatures") | |
signatures = record.pop("signatures") | |
for signature_info in signatures: | |
if "signature" not in signature_info: | |
raise ValueError("Signature does not contain a signature field") | |
if "issuer" not in signature_info: | |
raise ValueError("Signature does not contain a issuer field") | |
issuer_key = key_lookup_fn(signature_info["issuer"]) | |
if issuer_key is None: | |
raise ValueError("Could not find public key for issuer") | |
_sig = {} | |
_sig.update(signature_info) | |
_sig.update( | |
{ | |
"did": subject_did, | |
"collection": subject_collection, | |
} | |
) | |
del _sig["signature"] | |
combined_record = {**record, "$sig": _sig} | |
encoded_combined_record = dag_cbor.encode(combined_record) | |
signature = urlsafe_b64decode(signature_info["signature"]) | |
try: | |
issuer_key.verify( | |
signature, encoded_combined_record, ec.ECDSA(hashes.SHA256()) | |
) | |
return True | |
except InvalidSignature: | |
pass | |
return False | |
encoded_public_key = encode_public_key(private_key) | |
print("#### Private Key ####") | |
print(private_key_bytes.decode("utf8")) | |
did_document = { | |
"did": "did:web:issuer.com", | |
"verificationMethod": [ | |
{ | |
"id": "did:web:issuer.com#sigs", | |
"type": "Multikey", | |
"controller": "did:web:issuer.com", | |
"publicKeyMultibase": encoded_public_key, | |
} | |
], | |
} | |
print("#### DID Document ####") | |
print(json.dumps(did_document, indent=2)) | |
orig_record = {"$type": "contact", "name": "Alice"} | |
signing_extra = {} | |
# orig_record = {"$type": "status", "status": "Everything is fine"} | |
# signing_extra = { | |
# "signedAt": "2024-12-07T18:43:27.62Z", | |
# "revocation": "at://did:web:issuer.com/community.lexicon.attestation.revocation/jhrcw9j73e", | |
# "headlines": [ | |
# # https://www.reutersagency.com/feed/?best-types=the-big-picture&post_type=best | |
# "Global tech crackdown Mon, 20 Dec 2021 14:58:18 +0000", | |
# "COVID-19 series: The year of the vaccine Tue, 24 Aug 2021 14:09:30 +0000", | |
# ], | |
# } | |
signed_record = sign_record( | |
private_key, | |
orig_record, | |
"did:web:issuer.com", | |
"did:web:subject.com", | |
"collection", | |
**signing_extra | |
) | |
print("#### Signed Record ####") | |
print(json.dumps(signed_record, indent=2)) | |
key_lookup = gen_key_lookup(encoded_public_key) | |
verify_record(signed_record, "did:web:subject.com", "collection", key_lookup) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment