Last active
December 14, 2021 23:59
-
-
Save e7p/66dde9002fcc0cb197f3bcab7c3ce975 to your computer and use it in GitHub Desktop.
CovPass QR-Code analyzer port in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import base45 | |
import zlib | |
import cose.messages.sign1message | |
from cose.keys import EC2Key | |
import cbor2 | |
import datetime | |
import base64 | |
import json | |
from cryptography.hazmat.primitives.serialization import load_pem_public_key | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives.asymmetric import ec | |
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature | |
from cryptography.x509 import load_pem_x509_certificate | |
from cryptography.hazmat.backends.openssl.ec import _EllipticCurvePublicKey | |
# get signed certificate list here: https://de.dscg.ubirch.com/trustList/DSC/ | |
# adapted from de.rki.covpass.sdk.cert | |
class QRCoder:
    """Helpers that turn the text content of a QR code into decoded objects."""

    @staticmethod
    def decodeRawCose(qrContent):
        """Return the raw COSE bytes contained in *qrContent*.

        An optional ``HC1:`` prefix is stripped, the remainder is
        Base45-decoded and the result is zlib-decompressed.
        """
        # removeprefix() is a no-op when the prefix is absent.
        return zlib.decompress(base45.b45decode(qrContent.removeprefix("HC1:")))

    @staticmethod
    def decodeCose(qr):
        """Decode the QR content *qr* into a COSE ``Sign1Message``."""
        return cose.messages.sign1message.Sign1Message.decode(QRCoder.decodeRawCose(qr))

    @staticmethod
    def decodeCovCert(qr, validator=None):
        """Decode and validate the certificate encoded in the QR content *qr*.

        Args:
            qr: Text content of the QR code.
            validator: Object providing ``decodeAndValidate(cose_sign1)``.
                When omitted, the module-level ``certValidator`` is used,
                which only exists when this file is run as a script.

        Returns:
            Whatever ``validator.decodeAndValidate`` returns.

        Raises:
            NameError: If no validator is passed and no module-level
                ``certValidator`` has been defined.
        """
        if validator is None:
            # Backward-compatible fallback to the global set up by the
            # script entry point.
            validator = certValidator
        return validator.decodeAndValidate(QRCoder.decodeCose(qr))
class CBORWebToken:
    """Subset of the claims of a CBOR-encoded web token.

    Attributes set by :meth:`decode`:
        issuer: Value stored under claim key 1.
        validFrom: Claim key 6 converted with ``datetime.fromtimestamp``.
        validUntil: Claim key 4 converted with ``datetime.fromtimestamp``.
        rawCbor: The complete decoded CBOR structure.
    """

    @staticmethod
    def decode(data):
        """Decode the CBOR bytes *data* into a :class:`CBORWebToken`."""
        claims = cbor2.loads(data)
        token = CBORWebToken()
        token.issuer = claims[1]
        # fromtimestamp() yields naive datetimes in local time.
        issued_at = datetime.datetime.fromtimestamp(claims[6])
        expires_at = datetime.datetime.fromtimestamp(claims[4])
        token.validFrom = issued_at
        token.validUntil = expires_at
        token.rawCbor = claims
        return token
class IllegalStateException(Exception):
    """Raised when a certificate contains no entry of any kind."""
class Name:
    """Person name read from the keys ``gn``, ``fn``, ``gnt`` and ``fnt``."""

    # Attribute name -> key in the decoded CBOR mapping.
    _FIELDS = (
        ("givenName", "gn"),
        ("familyName", "fn"),
        ("givenNameTransliterated", "gnt"),
        ("familyNameTransliterated", "fnt"),
    )

    @staticmethod
    def decode(cbor_data):
        """Build a :class:`Name` from *cbor_data*; absent keys become ``None``."""
        name = Name()
        for attribute, key in Name._FIELDS:
            setattr(name, attribute, cbor_data.get(key))
        return name
class Vaccination:
    """Vaccination entry of a certificate.

    Attributes are filled by :meth:`decode` from the keys ``tg``, ``vp``,
    ``mp``, ``ma``, ``dn``, ``sd``, ``dt``, ``co``, ``is`` and ``ci``.
    """

    PRODUCT_COMIRNATY = "EU/1/20/1528"
    PRODUCT_MODERNA = "EU/1/20/1507"
    PRODUCT_VAXZEVRIA = "EU/1/21/1529"
    PRODUCT_JANSSEN = "EU/1/20/1525"

    @staticmethod
    def decode(cbor_data):
        """Build a :class:`Vaccination` from the mapping *cbor_data*.

        ``dt`` is optional and parsed as an ISO date; all other keys are
        required.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If ``dt`` is present but not an ISO date.
        """
        v = Vaccination()
        v.targetDisease = cbor_data["tg"]
        v.vaccineCode = cbor_data["vp"]
        v.product = cbor_data["mp"]
        v.manufacturer = cbor_data["ma"]
        v.doseNumber = cbor_data["dn"]
        v.totalSerialDoses = cbor_data["sd"]
        v.occurrence = datetime.date.fromisoformat(cbor_data["dt"]) if "dt" in cbor_data else None
        v.country = cbor_data["co"]
        v.certificateIssuer = cbor_data["is"]
        v.id = cbor_data["ci"]
        return v

    def isComplete(self):
        """Return True when the dose number equals the total number of doses."""
        return self.doseNumber == self.totalSerialDoses

    def hasFullProtection(self):
        """Return True when the series is complete and over 14 days old.

        Returns False when the vaccination date is unknown, because the
        waiting period cannot be evaluated without it.
        """
        if self.occurrence is None:
            return False
        return self.isComplete() and self.occurrence + datetime.timedelta(days=14) < datetime.date.today()
class Test:
    """Test entry of a certificate.

    Attributes are filled by :meth:`decode` from the keys ``tg``, ``tt``,
    ``nm``, ``ma``, ``sc``, ``tr``, ``tc``, ``co``, ``is`` and ``ci``.
    """

    PCR_TEST = "LP6464-4"
    ANTIGEN_TEST = "LP217198-3"
    POSITIVE_RESULT = "260373001"
    NEGATIVE_RESULT = "260415000"
    PCR_TEST_EXPIRY_TIME_HOURS = 72
    ANTIGEN_TEST_EXPIRY_TIME_HOURS = 48

    @staticmethod
    def _parseSampleCollection(value):
        """Parse the ``sc`` value into a date or datetime.

        Plain ISO dates still yield ``datetime.date``. Full ISO timestamps,
        which ``date.fromisoformat`` rejects, yield ``datetime.datetime``.
        """
        if isinstance(value, datetime.date):
            # Already a date/datetime object; nothing to parse.
            return value
        try:
            return datetime.date.fromisoformat(value)
        except ValueError:
            # datetime.fromisoformat() only accepts a trailing "Z" from
            # Python 3.11 on, so normalise it to an explicit UTC offset.
            if value.endswith("Z"):
                value = value[:-1] + "+00:00"
            return datetime.datetime.fromisoformat(value)

    @staticmethod
    def decode(cbor_data):
        """Build a :class:`Test` from the mapping *cbor_data*.

        ``nm``, ``ma`` and ``sc`` are optional and default to ``None``;
        all other keys are required.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If ``sc`` is neither an ISO date nor an ISO timestamp.
        """
        t = Test()
        t.targetDisease = cbor_data["tg"]
        t.testType = cbor_data["tt"]
        # NOTE(review): the EU DCC schema is understood to make "nm" and "ma"
        # optional, each applying to only one test type - confirm.
        t.testName = cbor_data.get("nm")
        t.manufacturer = cbor_data.get("ma")
        t.sampleCollection = Test._parseSampleCollection(cbor_data["sc"]) if "sc" in cbor_data else None
        t.testResult = cbor_data["tr"]
        t.testingCentre = cbor_data["tc"]
        t.country = cbor_data["co"]
        t.certificateIssuer = cbor_data["is"]
        t.id = cbor_data["ci"]
        return t

    def isPositive(self):
        """Return True when the test result equals ``POSITIVE_RESULT``."""
        return self.testResult == Test.POSITIVE_RESULT
class Recovery:
    """Recovery entry of a certificate.

    Attributes are filled by :meth:`decode` from the keys ``tg``, ``fr``,
    ``df``, ``du``, ``co``, ``is`` and ``ci``.
    """

    @staticmethod
    def decode(cbor_data):
        """Build a :class:`Recovery` from the mapping *cbor_data*.

        The date keys ``fr``, ``df`` and ``du`` are optional and become
        ``None`` when absent; the remaining keys are required.
        """

        def optional_date(key):
            # Parse an ISO date only if the key is present at all.
            if key not in cbor_data:
                return None
            return datetime.date.fromisoformat(cbor_data[key])

        recovery = Recovery()
        recovery.targetDisease = cbor_data["tg"]
        recovery.firstResult = optional_date("fr")
        recovery.validFrom = optional_date("df")
        recovery.validUntil = optional_date("du")
        recovery.country = cbor_data["co"]
        recovery.certificateIssuer = cbor_data["is"]
        recovery.id = cbor_data["ci"]
        return recovery
class CovCertificate:
    """Decoded certificate: holder name, birth date and its entries.

    ``issuer``, ``validFrom`` and ``validUntil`` start out empty in
    :meth:`decode`.
    """

    @staticmethod
    def _parseBirthDate(value):
        """Return *value* as a date, or unchanged if it is not a full ISO date."""
        try:
            return datetime.date.fromisoformat(value)
        except ValueError:
            # Values that are not a complete ISO date (for example a year
            # only) are kept as the raw string instead of aborting decoding.
            return value

    @staticmethod
    def _decodeEntries(cbor_data, key, entryClass):
        """Decode the list under *key*, or return None if absent or null."""
        entries = cbor_data.get(key)
        if entries is None:
            return None
        return [entryClass.decode(entry) for entry in entries]

    @staticmethod
    def decode(cbor_data):
        """Build a :class:`CovCertificate` from the mapping *cbor_data*.

        Reads ``nam``, ``dob`` and ``ver`` (required) and the entry lists
        ``v``, ``t`` and ``r`` (each may be missing or null).
        """
        cc = CovCertificate()
        cc.issuer = ""
        cc.validFrom = None
        cc.validUntil = None
        cc.name = Name.decode(cbor_data["nam"])
        cc.birthDate = CovCertificate._parseBirthDate(cbor_data["dob"])
        # According to latest EU specification the lists should not be nullable.
        # But some countries use null values here, so we have to support it.
        cc.vaccinations = CovCertificate._decodeEntries(cbor_data, "v", Vaccination)
        cc.tests = CovCertificate._decodeEntries(cbor_data, "t", Test)
        cc.recoveries = CovCertificate._decodeEntries(cbor_data, "r", Recovery)
        cc.version = cbor_data["ver"]
        return cc

    def getDgcEntry(self):
        """Return the first vaccination, else first test, else first recovery.

        Raises:
            IllegalStateException: If all three lists are empty or None.
        """
        if self.vaccinations:
            return self.vaccinations[0]
        elif self.tests:
            return self.tests[0]
        elif self.recoveries:
            return self.recoveries[0]
        else:
            raise IllegalStateException("CovCertificates without any DGCEntries are not allowed.")

    def _givenAndFamilyName(self):
        """Return (given, family), preferring the non-transliterated forms."""
        given = self.name.givenName if self.name.givenName else self.name.givenNameTransliterated
        family = self.name.familyName if self.name.familyName else self.name.familyNameTransliterated
        return given, family

    def getFullName(self):
        """Return "<given> <family>", omitting any part that is missing."""
        given, family = self._givenAndFamilyName()
        # Skip missing parts; str.join() would raise TypeError on None.
        return " ".join(part for part in (given, family) if part)

    def getFullNameReverse(self):
        """Return "<family> <given>", omitting any part that is missing."""
        given, family = self._givenAndFamilyName()
        return " ".join(part for part in (family, given) if part)

    def getValidDate(self):
        """Return the occurrence date of the first vaccination plus 15 days.

        Only meaningful for certificates that contain a vaccination with a
        known occurrence date; otherwise the lookup or the addition raises.
        """
        return self.vaccinations[0].occurrence + datetime.timedelta(days=15)
class ExpiredCwtException(Exception):
    """Raised when the web token's ``validUntil`` lies in the past."""
class BadCoseSignatureException(Exception):
    """Intended to signal that a COSE signature could not be verified."""
class NoMatchingExtendedKeyUsageException(Exception):
    """Raised when the signer certificate's OIDs do not permit the entry type."""
class CertValidator:
    """Validates COSE-signed certificates against trusted signer certificates."""

    vaccinationCertOids = {
        "1.3.6.1.4.1.1847.2021.1.2",
        "1.3.6.1.4.1.0.1847.2021.1.2",
    }
    testCertOids = {
        "1.3.6.1.4.1.1847.2021.1.1",
        "1.3.6.1.4.1.0.1847.2021.1.1",
    }
    recoveryCertOids = {
        "1.3.6.1.4.1.1847.2021.1.3",
        "1.3.6.1.4.1.0.1847.2021.1.3",
    }
    allCertOids = vaccinationCertOids | testCertOids | recoveryCertOids

    def __init__(self, trusted, cbor=None):
        """Index the trusted certificates by their decoded key identifier.

        Args:
            trusted: Iterable of objects with a Base64 ``kid`` attribute and
                a ``certificate`` attribute.
            cbor: Unused; kept for backward compatibility.
        """
        # Drop repeated objects but keep the input order deterministic.
        uniqueCerts = list(dict.fromkeys(trusted))
        self.trustedCerts = set(uniqueCerts)
        # Several certificates may share one kid, so keep all of them.
        self.kidToCerts = {}
        for trustedCert in uniqueCerts:
            self.kidToCerts.setdefault(base64.b64decode(trustedCert.kid), []).append(trustedCert)

    def findByKid(self, kid):
        """Return the list of trusted certificates whose key id equals *kid*."""
        return list(self.kidToCerts.get(kid, []))

    @staticmethod
    def decodeCovCert(cwt):
        """Decode the certificate stored at ``rawCbor[-260][1]`` of *cwt*."""
        return CovCertificate.decode(cwt.rawCbor[-260][1])

    @staticmethod
    def checkCertOid(cert, dgcEntry):
        """Check that *cert* is allowed to sign entries of *dgcEntry*'s type.

        Returns True when the certificate has no Extended Key Usage
        extension or lists none of the known OIDs; otherwise the OID set
        matching the entry type must be present.
        """
        from cryptography import x509

        # The known OIDs are values inside the Extended Key Usage extension,
        # not OIDs of extensions, so the extension content has to be read.
        try:
            keyUsage = cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage).value
        except x509.ExtensionNotFound:
            return True
        extendedKeyUsageIntersect = {oid.dotted_string for oid in keyUsage} & CertValidator.allCertOids
        if not extendedKeyUsageIntersect:
            return True
        if isinstance(dgcEntry, Vaccination):
            return bool(extendedKeyUsageIntersect & CertValidator.vaccinationCertOids)
        elif isinstance(dgcEntry, Test):
            return bool(extendedKeyUsageIntersect & CertValidator.testCertOids)
        else:
            return bool(extendedKeyUsageIntersect & CertValidator.recoveryCertOids)

    @staticmethod
    def _applyCwtMetadata(covCertificate, cwt):
        """Copy issuer and validity period from *cwt* onto *covCertificate*."""
        covCertificate.issuer = cwt.issuer
        covCertificate.validFrom = cwt.validFrom
        covCertificate.validUntil = cwt.validUntil
        return covCertificate

    @staticmethod
    def decodeAndValidateCertificate(cwt, cert):
        """Decode the certificate in *cwt* and check it against signer *cert*.

        Raises:
            NoMatchingExtendedKeyUsageException: If *cert* may not sign the
                certificate's entry type.
        """
        covCertificate = CertValidator.decodeCovCert(cwt)
        if not CertValidator.checkCertOid(cert, covCertificate.getDgcEntry()):
            raise NoMatchingExtendedKeyUsageException
        return CertValidator._applyCwtMetadata(covCertificate, cwt)

    def decodeAndValidate(self, cose_sign1, strict=False):
        """Decode *cose_sign1* and try to verify its signature.

        Args:
            cose_sign1: COSE Sign1 message whose payload is the web token.
            strict: When True, an unverifiable signature raises
                ``BadCoseSignatureException``. The default prints a warning
                and returns the unverified certificate.

        Raises:
            ExpiredCwtException: If the token's ``validUntil`` is in the past.
            NoMatchingExtendedKeyUsageException: If the verifying signer
                certificate may not sign this entry type.
            BadCoseSignatureException: Only with ``strict=True``.
        """
        cwt = CBORWebToken.decode(cose_sign1.payload)
        # Both sides are naive local-time datetimes here.
        if cwt.validUntil < datetime.datetime.now():
            raise ExpiredCwtException
        try:
            kid = cose_sign1.phdr[cose.headers.KID]
        except KeyError:
            kid = cose_sign1.uhdr[cose.headers.KID]
        # Without a kid match, fall back to trying every trusted certificate.
        candidates = self.findByKid(kid) or self.trustedCerts
        # X.509 validity bounds are naive UTC datetimes, so compare in UTC.
        nowUtc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        for trustedCert in candidates:
            certificate = trustedCert.certificate
            if certificate.not_valid_before > nowUtc or certificate.not_valid_after < nowUtc:
                continue
            # Validate the COSE signature
            public_key = certificate.public_key()
            if not isinstance(public_key, ec.EllipticCurvePublicKey):
                continue
            if not isinstance(public_key.curve, ec.SECP256R1):
                # The COSE key below is declared P-256 with 32-byte coordinates.
                continue
            numbers = public_key.public_numbers()
            x = numbers.x.to_bytes(32, "big")
            y = numbers.y.to_bytes(32, "big")
            cose_sign1.key = EC2Key(crv='P_256', x=x, y=y, optional_params={'ALG': 'Es256'})
            try:
                if cose_sign1.verify_signature():
                    return CertValidator.decodeAndValidateCertificate(cwt, certificate)
            except cose.exceptions.CoseIllegalAlgorithm:
                continue
        if strict:
            raise BadCoseSignatureException
        # SECURITY: lenient mode returns UNVERIFIED data. No signer is known
        # at this point, so the key-usage check is skipped as well.
        print("WARN: Could not verify signature successfully!")
        return CertValidator._applyCwtMetadata(CertValidator.decodeCovCert(cwt), cwt)
class TrustedCert:
    """A trusted signer certificate with its country and key identifier."""

    def __init__(self, country, kid, certificate):
        """Store *country*, *kid* and *certificate* unchanged as attributes."""
        self.country, self.kid, self.certificate = country, kid, certificate
class DscListDecoder:
    """Verifies and decodes a signed list of signer certificates."""

    def __init__(self, publicKey):
        """Remember *publicKey*, the key used to verify the list signature."""
        self.publicKey = publicKey

    def decodeDscList(self, data):
        """Verify the signature of *data* and return the parsed JSON list.

        *data* is a Base64 signature followed by a JSON document; everything
        before the first ``{`` is treated as the signature.

        Raises:
            ValueError: If *data* contains no ``{``.
            Whatever ``publicKey.verify`` raises when the signature does not
            match.
        """
        jsonStart = data.index("{")
        encodedSignature = data[:jsonStart]
        signature = base64.b64decode(encodedSignature)
        # The signature is two equal-sized big-endian integers (r, s) that
        # are re-encoded into the DER form expected by verify().
        half = len(signature) // 2
        r = int.from_bytes(signature[:half], byteorder="big", signed=False)
        s = int.from_bytes(signature[half:], byteorder="big", signed=False)
        signature = encode_dss_signature(r, s)
        trustedList = data[jsonStart:].strip()
        self.publicKey.verify(signature, trustedList.encode(), ec.ECDSA(hashes.SHA256()))
        return json.loads(trustedList)

    def toTrustedCerts(self, dscList):
        """Yield a ``TrustedCert`` for every parseable list entry.

        Entries whose ``rawData`` cannot be parsed as a certificate are
        reported on stdout and skipped.
        """
        for dscEntry in dscList["certificates"]:
            pem = f"-----BEGIN CERTIFICATE-----\n{dscEntry['rawData']}\n-----END CERTIFICATE-----".encode()
            try:
                # Malformed input raises ValueError; it never returns None.
                certificate = load_pem_x509_certificate(pem)
            except ValueError:
                print(f"DSC list contains invalid X509Certificate for kid {dscEntry['kid']}")
                continue
            yield TrustedCert(dscEntry["country"], dscEntry["kid"], certificate)
# Display names for vaccine manufacturer codes.
manufacturerDict = {
    "ORG-100001699": "AstraZeneca AB",
    "ORG-100030215": "Biontech Manufacturing GmbH",
    "ORG-100031184": "Moderna Biotech Spain S.L.",
    "ORG-100006270": "Curevac AG",
    "ORG-100013793": "CanSino Biologics",
    "ORG-100020693": "China Sinopharm International Corp. - Beijing location",
    "ORG-100010771": "Sinopharm Weiqida Europe Pharmaceutical s.r.o. - Prague location",
    "ORG-100024420": "Sinopharm Zhijun (Shenzhen) Pharmaceutical Co. Ltd. - Shenzhen location",
    "ORG-100032020": "Novavax CZ AS",
    "Gamaleya-Research-Institute": "Gamaleya Research Institute",
    "Vector-Institute": "Vector Institute",
    "Sinovac-Biotech": "Sinovac Biotech",
    "Bharat-Biotech": "Bharat Biotech",
}

# Display names for vaccine product codes.
productDict = {
    "EU/1/20/1528": "Comirnaty",
    "EU/1/20/1507": "COVID-19 Vaccine Moderna",
    "EU/1/21/1529": "Vaxzevria",
    "EU/1/20/1525": "COVID-19 Vaccine Janssen",
    "CVnCoV": "CVnCoV",
    "Sputnik-V": "Sputnik-V",
    "Convidecia": "Convidecia",
    "EpiVacCorona": "EpiVacCorona",
    "BBIBP-CorV": "BBIBP-CorV",
    "Inactivated-SARS-CoV-2-Vero-Cell": "Inactivated SARS-CoV-2 (Vero Cell)",
    "CoronaVac": "CoronaVac",
    "Covaxin": "Covaxin (also known as BBV152 A, B, C)",
}

# Display names for vaccine codes.
vaccineDict = {
    "1119349007": "SARS-CoV-2 mRNA vaccine",
    "1119305005": "SARS-CoV-2 antigen vaccine",
    "J07BX03": "covid-19 vaccines",
}

# Display names for test type codes.
testDict = {
    "LP6464-4": "Nucleic acid amplification with probe detection (PCR)",
    "LP217198-3": "Rapid immunoassay (Antigen)",
}

# Display names for test manufacturer / device codes.
testManufacturerDict = {
    "1232": "Abbott Rapid Diagnostics, Panbio COVID-19 Ag Test",
    "1304": "AMEDA Labordiagnostik GmbH, AMP Rapid Test SARS-CoV-2 Ag",
    "1065": "Becton Dickinson, Veritor System Rapid Detection of SARS-CoV-2",
    "1331": "Beijing Lepu Medical Technology Co., Ltd, SARS-CoV-2 Antigen Rapid Test Kit",
    "1484": "Beijing Wantai Biological Pharmacy Enterprise Co., Ltd, Wantai SARS-CoV-2 Ag Rapid Test (FIA)",
    "1242": "Bionote, Inc, NowCheck COVID-19 Ag Test",
    "1223": "BIOSYNEX SWISS SA, BIOSYNEX COVID-19 Ag BSS",
    "1173": "CerTest Biotec, S.L., CerTest SARS-CoV-2 Card test",
    "1244": "GenBody, Inc, Genbody COVID-19 Ag Test",
    "1360": "Guangdong Wesail Biotech Co., Ltd, COVID-19 Ag Test Kit",
    "1363": "Hangzhou Clongene Biotech Co., Ltd, Covid-19 Antigen Rapid Test Kit",
    "1767": "Healgen Scientific Limited Liability Company, Coronavirus Ag Rapid Test Cassette",
    "1333": "Joinstar Biomedical Technology Co., Ltd, COVID-19 Rapid Antigen Test (Colloidal Gold)",
    "1268": "LumiraDX UK Ltd, LumiraDx SARS-CoV-2 Ag Test",
    "1180": "MEDsan GmbH, MEDsan SARS-CoV-2 Antigen Rapid Test",
    "1481": "MP Biomedicals Germany GmbH, Rapid SARS-CoV-2 Antigen Test Card",
    "1162": "Nal von minden GmbH, NADAL COVID-19 Ag Test",
    "1271": "Precision Biosensor, Inc, Exdia COVID-19 Ag",
    "1341": "Qingdao Hightop Biotech Co., Ltd, SARS-CoV-2 Antigen Rapid Test (Immunochromatography)",
    "1097": "Quidel Corporation, Sofia SARS Antigen FIA",
    "1489": "Safecare Biotech (Hangzhou) Co. Ltd, COVID-19 Antigen Rapid Test Kit (Swab)",
    "344": "SD BIOSENSOR Inc, STANDARD F COVID-19 Ag FIA",
    "345": "SD BIOSENSOR Inc, STANDARD Q COVID-19 Ag Test",
    "1218": "Siemens Healthineers, CLINITEST Rapid Covid-19 Antigen Test",
    "1278": "Xiamen Boson Biotech Co. Ltd, Rapid SARS-CoV-2 Antigen Test Card",
    "1343": "Zhejiang Orient Gene Biotech, Coronavirus Ag Rapid Test Cassette (Swab)",
}

# Display names for target disease codes.
diseaseDict = {
    "840539006": "COVID-19",
}
def printEntry(entry):
    """Print a human-readable report of a certificate entry to stdout.

    Vaccination and Test entries get their own layout; any other object is
    printed as a recovery entry. Codes found in the module lookup tables
    are shown by display name, unknown codes are printed unchanged.
    """

    def display(table, code):
        # Fall back to the raw code when the table has no entry for it.
        return table.get(code, code)

    kind = type(entry)
    if kind == Vaccination:
        title = "= Vaccination Certificate ="
    elif kind == Test:
        title = "= Test Certificate ="
    else:
        title = "= Recovery Certificate ="

    # Every layout starts with the same three lines.
    print(title)
    print(" Certificate Issuer:", entry.certificateIssuer)
    print(" Country: ", entry.country)

    if kind == Vaccination:
        print(" Dose: ", entry.doseNumber, "/", entry.totalSerialDoses)
        print(" Occurrence: ", entry.occurrence)
        print(" Full protection: ", entry.hasFullProtection())
        print(" Manufacturer: ", display(manufacturerDict, entry.manufacturer))
        print(" Product: ", display(productDict, entry.product))
        print(" Target Disease: ", display(diseaseDict, entry.targetDisease))
        print(" Vaccine Code: ", display(vaccineDict, entry.vaccineCode))
    elif kind == Test:
        print(" Target Disease: ", display(diseaseDict, entry.targetDisease))
        print(" Test Type: ", display(testDict, entry.testType))
        print(" Test Name: ", entry.testName)
        print(" Manufacturer: ", display(testManufacturerDict, entry.manufacturer))
        print(" Sample Collection: ", entry.sampleCollection)
        if entry.isPositive():
            resultText = "Positive"
        else:
            resultText = "Negative"
        print(" Test Result: ", resultText)
        print(" Testing Centre: ", entry.testingCentre)
    else:
        print(" Target Disease: ", display(diseaseDict, entry.targetDisease))
        print(" First Result: ", entry.firstResult)
        print(" Valid From: ", entry.validFrom)
        print(" Valid Until: ", entry.validUntil)

    # ... and ends with the identifier line.
    print(" ID: ", entry.id)
# Script entry point: load the trust list, read the QR content from the file
# named by the first argument (or stdin) and print the decoded certificate.
if __name__ == "__main__":
    # Context managers make sure every file handle is closed again.
    with open("covpass-sdk/dsc-list-signing-key.pem", "rb") as keyFile:
        key = load_pem_public_key(keyFile.read())
    dscListDecoder = DscListDecoder(key)
    with open("covpass-sdk/dsc-list.json") as dscListFile:
        dscList = dscListDecoder.decodeDscList(dscListFile.read())
    trustedCerts = list(dscListDecoder.toTrustedCerts(dscList))
    # Must stay a module-level name: QRCoder.decodeCovCert looks it up.
    certValidator = CertValidator(trustedCerts)
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as qrFile:
            qrData = qrFile.read().rstrip()
    else:
        qrData = sys.stdin.read().rstrip()
    covCertificate = QRCoder.decodeCovCert(qrData)
    print("Name: ", covCertificate.getFullName())
    print("Birthday: ", covCertificate.birthDate)
    print("Valid from: ", covCertificate.validFrom)
    print("Valid until:", covCertificate.validUntil)
    print("Version: ", covCertificate.version)
    print("Issuer: ", covCertificate.issuer)
    numVacc = len(covCertificate.vaccinations) if covCertificate.vaccinations else 0
    numTest = len(covCertificate.tests) if covCertificate.tests else 0
    numReco = len(covCertificate.recoveries) if covCertificate.recoveries else 0
    print("Content: ", numVacc, "Vaccinations,", numTest, "Tests,", numReco, "Recoveries")
    entry = covCertificate.getDgcEntry()
    printEntry(entry)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment