Created
August 28, 2018 14:26
-
-
Save miohtama/da7c7f1d790241688355e12a82345703 to your computer and use it in GitHub Desktop.
Python client for Civic sign in and proof of identity
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import datetime | |
import hashlib | |
import hmac | |
import json | |
import uuid | |
import logging | |
import time | |
from requests import Session | |
import binascii | |
from urllib.parse import quote_plus | |
import ecdsa | |
import jwt | |
import requests | |
from cryptography.hazmat.primitives.asymmetric.ec import derive_private_key, SECP256R1, SECP256K1 | |
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
from cryptography.hazmat.backends import default_backend | |
from ecdsa import SECP256k1 | |
from websauna.system.core.utils import get_secrets | |
logger = logging.getLogger(__name__) | |
#: Share the same HTTPS connection to Civic servers across the threads in this process | |
_session = requests.Session() | |
def make_civic_sip(request): | |
"""Create a Civic client instance. | |
Load private keys from Websaunas secrets subsystem. | |
""" | |
secrets = get_secrets(request.registry) | |
app_id = secrets.get("civic_kyc.app_id") | |
private_key_hex = secrets.get("civic_kyc.private_signing_key") | |
app_secret = secrets.get("civic_kyc.secret") | |
return CivicSIP(_session, app_id, private_key_hex, app_secret) | |
class CivicSIP: | |
"""Civic SIP client. | |
.. note :: | |
This is a proof of concept level code, not really meant to be used as an example. | |
Example login:: | |
if not spoof_civic: | |
# How to call Civic for real | |
token = request.POST["token"] | |
logger.info("Received Civic token data %s", token) | |
# Use Civic SIP to exchange the token to a full data | |
sip = make_civic_sip(request) | |
civic_encoded_token = sip.exchange_token(token) | |
decrypted = sip.verify_and_decrypt(civic_encoded_token) | |
civic_user_data = CivicSIP.extract_user_data(decrypted) | |
else: | |
# Integration test, hardcoded data, when you try to mock up Civic for your website tests | |
civic_user_data = {'email': '[email protected]', 'phone_number': '+1 555 111 2222'} | |
Example proof of identity:: | |
token = self.request.POST["token"] # Get the token from JavaScript callback | |
sip = make_civic_sip(self.request) | |
civic_encoded_token = sip.exchange_token(token) | |
decrypted = sip.verify_and_decrypt(civic_encoded_token) | |
extracted = CivicSIP.extract_kyc_data(decrypted) | |
""" | |
def __init__(self, session: Session, app_id, private_key_hex, app_secret, base_url='https://api.civic.com/sip', env="prod"): | |
""" | |
:param session: requests.Session to use to maintain HTTPS keep-alive | |
:param app_id: From integrate.civic.com | |
:param private_key_hex: From integrate.civic.com | |
:param app_secret: From integrate.civic.com | |
:param base_url: You may want to change for Civic internal/beta versions | |
:param env: You may want to change for Civic internal/beta versions | |
""" | |
self.session = session | |
self.base_url = base_url | |
self.env = env | |
self.app_id = app_id | |
self.private_key_hex = private_key_hex | |
self.app_secret = app_secret | |
self.token_expiration = 3*60 | |
@staticmethod | |
def create_token(issuer, audience, subject, expires_in, payload, private_key_hex, iat=None, jti=None, exp=None): | |
"""Mimicked from jwt.js.""" | |
if not exp: | |
exp = datetime.datetime.utcnow() + datetime.timedelta(seconds=expires_in) | |
if not jti: | |
jti = str(uuid.uuid4()) | |
content = { | |
"jti": jti, | |
"iat": iat, | |
"exp": exp, | |
"iss": issuer, | |
"aud": audience, | |
"sub": subject, | |
"data": payload, | |
} | |
# Took some time to figure out this, so please you who are reading this appreciate it a bit | |
exp = int(private_key_hex, 16) | |
key = derive_private_key(exp, SECP256R1(), default_backend()) | |
headers = {"alg":"ES256","typ":"JWT"} | |
encoded = jwt.encode(content, key, headers=headers, algorithm='ES256') | |
return encoded | |
@staticmethod | |
def create_civic_ext(app_secret, body: dict=None, body_str: str=None): | |
"""No idea what Civic coders have been smoking in Palo Alto.""" | |
if body: | |
# Must NOT contain a space inside JSON blob after :, as that is how Civic calculated the mac | |
body_str = json.dumps(body, separators=(',', ':')) | |
else: | |
assert body_str | |
dig = hmac.new(app_secret.encode("ascii"), body_str.encode("ascii"), digestmod=hashlib.sha256).digest() | |
return base64.b64encode(dig).decode() | |
@staticmethod | |
def decrypt_civic(tx_msg, key): | |
"""From basicCrypto.js | |
Some sort of Civic home made AES encryption trick. | |
https://cryptography.io/en/latest/hazmat/primitives/symmetric-encryption/#cryptography.hazmat.primitives.ciphers.Cipher | |
""" | |
iv_start = 0 | |
msg_start = 32 | |
key_b = binascii.unhexlify(key) | |
iv = binascii.unhexlify(tx_msg[iv_start:msg_start]) | |
encrypted_msg_part = base64.b64decode(tx_msg[msg_start:]) | |
backend = default_backend() | |
cipher = Cipher(algorithms.AES(key_b), modes.CBC(iv), backend=backend) | |
decryptor = cipher.decryptor() | |
decrypted = decryptor.update(encrypted_msg_part) + decryptor.finalize() | |
decrypted = decrypted.decode("utf-8") | |
# Some extra characters by AES padding? | |
# '[{"label":"documents.genericId.type",...\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b' | |
# We need to manually find } character | |
for i in range(1, 64): | |
c = decrypted[-i] | |
if c in ("}", "]"): | |
endpoint = i | |
break | |
else: | |
endpoint = 0 | |
decrypted = decrypted[0:-endpoint+1] | |
return json.loads(decrypted) | |
def make_authorization_header(self, path, method, body): | |
""" | |
Creates the authorization header as an extended Civic JWT Token. | |
The token format: Civic requestToken.extToken where requestToken certifies the service path, method and audience, and extToken certifies the request body. | |
The token is signed by the application secret. | |
""" | |
payload = { | |
"method": method, | |
"path": path, | |
} | |
token = CivicSIP.create_token(self.app_id, self.base_url, self.app_id, time.time() + self.token_expiration, payload, self.private_key_hex) | |
extension = CivicSIP.create_civic_ext(self.app_secret, body) | |
token = token.decode("ascii") | |
return "Civic {}.{}".format(token, extension) | |
def exchange_token(self, jwt_token: dict, path="scopeRequest/authCode"): | |
"""Exchange authorization code in the form of a JWT Token for the user data requested in the scope request.""" | |
endpoint = self.base_url + "/" + self.env + "/" + path | |
body = { | |
"authToken": jwt_token, | |
} | |
body_str = json.dumps(body) | |
headers = { | |
"Accept": "application/json", | |
"Content-type": "application/json", | |
"Authorization": self.make_authorization_header(path=path, method="POST", body=body) | |
} | |
logger.info("Posting to %s, headers %s, body %s", endpoint, headers, body_str) | |
resp = self.session.post(endpoint, headers=headers, data=body_str) | |
resp.raise_for_status() | |
data = resp.json() | |
# {'alg': 'aes', 'userId': '90d9d100b9748ad8c218e9b0bd2e0231ea97fa7deb126e63407bfb733e93d8c8', 'encrypted': True, 'data': 'eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9...'} | |
civic_encoded = data["data"] | |
return civic_encoded | |
def verify_and_decrypt(self, civic_encoded_token: str) -> dict: | |
"""Make sense about Civic response payload.""" | |
#: TODO Add SECP256R1 verification | |
decoded = jwt.decode(civic_encoded_token, verify=False) | |
decrypted = CivicSIP.decrypt_civic(decoded["data"], self.app_secret) | |
return decrypted | |
@staticmethod | |
def extract_user_data(decoded_data: dict) -> dict: | |
"""Extract user id, email and phone number. | |
:param data: Decoded and decrypted | |
E.g. {'codeToken': 'fb2c86cd-033a-461a-963f-4fc113236764'}, 'iat': 1519833580.009, 'sub': 'HJ1KSVE_f', 'aud': 'https://api.civic.com/sip/', 'exp': 1519835380.009, 'jti': '4e1bbeea-6626-4fff-9f0d-20dd0c791c5d', 'iss': 'civic-sip-hosted-service'} | |
""" | |
extracted = { | |
} | |
for entry in decoded_data: | |
if entry["label"] == "contact.personal.email": | |
extracted["email"] = entry["value"] | |
elif entry["label"] == "contact.personal.phoneNumber": | |
extracted["phone_number"] = entry["value"] | |
return extracted | |
@staticmethod | |
def extract_kyc_data(decoded_data: dict) -> dict: | |
"""Extract photo, etc. data""" | |
# You may or may not want to blacklist the inline image data as it is a big blob | |
# blacklist = ("documents.genericId.image",) | |
blacklist = (), | |
extracted = { | |
} | |
for tuple in decoded_data: | |
label = tuple["label"] | |
# Putting 300kb of image data to funny places like JSONB and log files is no good | |
if label in blacklist: | |
continue | |
extracted[label] = { | |
"value": tuple["value"], | |
"is_valid": tuple["isValid"], | |
"is_owner": tuple["isOwner"], | |
} | |
return extracted |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment