-
-
Save Et7f3/48d64b2c0483d5e9c1730817cfa11c07 to your computer and use it in GitHub Desktop.
sample for using a yubihsm2 with python requests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try: | |
from OpenSSL._util import ( | |
ffi as _ffi, | |
lib as O, | |
lib as S | |
) | |
pyopenssl = True | |
import sys | |
null = _ffi.NULL | |
# the engine loaded here has to have an ABI matching that of the openssl that | |
# is used by cryptography, that means it's probably openssl 1.1 | |
p11 = b"/tmp/openssl-p11/prefix/lib/engines-1.1/pkcs11.so" | |
except ImportError: | |
pyopenssl = False | |
import ctypes | |
O = ctypes.CDLL("/usr/lib/x86_64-linux-gnu/libcrypto.so") | |
S = ctypes.CDLL("/usr/lib/x86_64-linux-gnu/libssl.so") | |
null = None | |
p11 = b"/usr/lib/ssl/engines/libpkcs11.so" | |
O.ENGINE_load_dynamic() | |
e = O.ENGINE_by_id(b"dynamic") | |
print(e) | |
print(O.ENGINE_ctrl_cmd_string(e, b"SO_PATH", p11, 0)) | |
print(O.ENGINE_ctrl_cmd_string(e, b"LOAD", null, 0)) | |
# where to find the yubihsm_pkcs11.so | |
print(O.ENGINE_ctrl_cmd_string(e, b"MODULE_PATH", b"yubihsm_pkcs11.so", 0)) | |
# authenticate to the YubiHSM (id+password) | |
print(O.ENGINE_ctrl_cmd_string(e, b"PIN", b"0001password", 0)) | |
print(O.ENGINE_init(e)) | |
# load a key with id 0x0001 | |
key = O.ENGINE_load_private_key(e, b"0:0001", null, null) | |
import requests | |
from requests.adapters import HTTPAdapter | |
from requests.packages.urllib3.util.ssl_ import create_urllib3_context | |
from requests.packages.urllib3.poolmanager import PoolManager | |
class YubihsmAdapter(HTTPAdapter): | |
def init_poolmanager(self, connections, maxsize, block=False, *args, **kwargs): | |
context = create_urllib3_context() | |
if(pyopenssl): | |
# if pyopenssl is in use we can find the SSL_CTX in a semi-clean way | |
ctx = context._ctx._context | |
else: | |
# if the builtin ssl module is in use we have to jump blindly to the | |
# third element in the struct and rely on cpython returning the adress | |
# with id() on an object. | |
ctx = ctypes.c_void_p.from_address(id(context) + ctypes.sizeof(ctypes.c_void_p) * 2) | |
# point the SSL_CTX to a certificate matching our key | |
print(S.SSL_CTX_use_certificate_file(ctx, b"cert.pem", 1)) | |
# and load the key into the SSL_CTX | |
print(S.SSL_CTX_use_PrivateKey(ctx, key)) | |
kwargs['ssl_context'] = context | |
self.poolmanager = PoolManager( | |
num_pools=connections, maxsize=maxsize, | |
block=block, *args, **kwargs) | |
s = requests.Session() | |
# the URL "mounted" has to be longer than https:// and match whatever it is | |
# we're going to use for our requests | |
s.mount("https://127.0.0.1", YubihsmAdapter()) | |
print(s.get("https://127.0.0.1:8443", verify=False)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment