Created
February 9, 2021 21:43
-
-
Save benob/72d3a2fdefca3a586aa3dd6400892080 to your computer and use it in GitHub Desktop.
self-signed certificate verifying client in nim
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import openssl | |
import ssl_config | |
from ssl_certs import scanSSLCertificates | |
import nativesockets | |
import uri | |
import os | |
import strutils | |
# A TCP socket together with the OpenSSL objects layered on top of it.
type
  Socket = ref object
    fd: SocketHandle ## native socket descriptor
    handle: SslPtr   ## SSL connection object, assigned by `sslHandShake`
    context: SslCtx  ## SSL context, assigned by `addSslContext`
proc addSslContext(socket: Socket) =
  ## Creates the OpenSSL context for `socket` and configures the cipher list,
  ## ECDH, the verification mode, the trusted CA certificates and auto-retry.
  ##
  ## Problems are reported with `echo`; nothing is raised.
  socket.context = SSL_CTX_new(SSLv23_method())
  if socket.context.isNil:
    # every call below would dereference the missing context
    echo "failed to create ssl context"
    return
  if socket.context.SSL_CTX_set_cipher_list(CiphersIntermediate) != 1:
    echo "failed to set cipher list"
  if socket.context.SSL_CTX_set_ecdh_auto(1) != 1:
    echo "failed to set ecdh auto"
  # https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set_verify.html
  # as per documentation, in client mode, SSL_VERIFY_NONE allows certificate to be verified with SSL_get_verify_result
  socket.context.SSL_CTX_set_verify(SSL_VERIFY_NONE, nil)
  # SSL_CTX_load_verify_locations returns 1 on success and 0 on failure, so
  # keep scanning until one candidate loads. The unused CApath must be nil:
  # an empty string is treated as a (bad) directory and makes the call fail.
  var loaded = false
  for fn in scanSSLCertificates():
    if socket.context.SSL_CTX_load_verify_locations(cstring(fn), nil) == 1:
      loaded = true
      break
  if not loaded:
    echo "failed to load CA certificates"
  discard socket.context.SSLCTXSetMode(SSL_MODE_AUTO_RETRY)
proc newSocket(): Socket =
  ## Allocates a `Socket` backed by a fresh IPv4 TCP descriptor and attaches
  ## an SSL context to it.
  let sock = Socket(
    fd: createNativeSocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, false))
  sock.addSslContext()
  sock
proc connect(socket: Socket, address: string, port: Port) =
  ## Resolves `address` (IPv4 only) and connects `socket.fd` to the first
  ## resolved address that accepts the connection.
  ##
  ## Each candidate address is echoed before it is tried. When no candidate
  ## can be connected a failure message is echoed; nothing is raised here.
  let aiList = getAddrInfo(address, port, AF_INET)
  var
    it = aiList
    success = false
  while it != nil:
    echo getAddrString(it.ai_addr)
    if connect(socket.fd, it.ai_addr, it.ai_addrlen.SockLen) == 0'i32:
      success = true
      break
    it = it.ai_next
  freeaddrinfo(aiList)
  if not success:
    # previously the outcome was tracked but never reported
    echo "failed to connect to ", address
proc sslHandShake(socket: Socket, serverName: string) =
  ## Creates the SSL object for `socket`, binds it to the socket descriptor,
  ## sets `serverName` as the TLS host name extension and runs the client
  ## handshake. Failures are echoed and otherwise ignored.
  let ssl = SSL_new(socket.context)
  socket.handle = ssl
  let fdAttached = SSL_set_fd(ssl, socket.fd) == 1
  if not fdAttached:
    echo "could not set fd"
  discard SSL_set_tlsext_host_name(ssl, serverName)
  let connected = SSL_connect(ssl) > 0
  if not connected:
    echo "failed to ssl connect"
proc close(socket: Socket) =
  ## Releases the SSL object and SSL context owned by `socket`, then closes
  ## the underlying socket descriptor.
  ##
  ## The freed fields are reset to nil so a second call does not free twice.
  if not socket.handle.isNil:
    SSL_free(socket.handle)
    socket.handle = nil
  if not socket.context.isNil:
    SSL_CTX_free(socket.context)
    socket.context = nil
  socket.fd.close()
# convert certificate to string to view it
proc X509_print(bp: BIO, x: PX509): cint {.cdecl, dynlib: "libssl.so", importc.}

proc `$`(certificate: PX509): string =
  ## Renders `certificate` as text using OpenSSL's `X509_print`.
  ##
  ## Returns "(nil)" for a nil certificate and an empty string when the
  ## certificate cannot be printed. Output is capped at 100_000 bytes.
  if certificate.isNil:
    return "(nil)"
  let bp = bioNew(bioSMem())
  if bp.isNil:
    return ""
  result = newString(100_000)
  var written = 0.cint
  if X509_print(bp, certificate) != 0.cint:
    written = BIO_read(bp, result[0].addr, result.len.cint)
  discard BIO_free(bp)
  # keep only the bytes actually produced instead of the whole zeroed buffer;
  # BIO_read reports <= 0 when nothing could be read
  result.setLen(max(written.int, 0))
# check if certificate is self-signed
# approach 1: duplicate self-certificate checking from libssl
# https://stackoverflow.com/questions/56763385/determine-if-ssl-certificate-is-self-signed-using-python
proc X509_check_akid(issuer: PX509, akid: pointer): cint {.cdecl, dynlib: "libssl.so", importc.}
proc X509_get_ext_d2i(x: PX509, nid: cint, crit: ptr[cint], idx: ptr[cint]): pointer {.cdecl, dynlib: "libssl.so", importc.}
proc AUTHORITY_KEYID_free(akid: pointer) {.cdecl, dynlib: "libssl.so", importc.}
const NID_authority_key_identifier = 90
proc X509_NAME_cmp(a: PX509_NAME, b: PX509_NAME): cint {.cdecl, dynlib: "libssl.so", importc.}

proc isSelfSignedOld(x: PX509): bool =
  ## Reports whether certificate `x` looks self-signed: its subject and issuer
  ## names are equal and its authority key identifier (if any) matches itself.
  ##
  ## Returns false for a nil certificate.
  if x.isNil:
    return false
  if X509_NAME_cmp(X509_get_subject_name(x), X509_get_issuer_name(x)) != 0:
    return false
  let akid = X509_get_ext_d2i(x, NID_authority_key_identifier, nil, nil)
  # missing check that uses flags from x509 structure:
  ##define ku_reject(x, usage) \
  # (((x)->ex_flags & EXFLAG_KUSAGE) && !((x)->ex_kusage & (usage)))
  result = X509_check_akid(x, akid) == X509_V_OK # and not ku_reject(x, KU_KEY_CERT_SIGN)
  # the decoded extension is owned by the caller; freeing nil is a no-op
  AUTHORITY_KEYID_free(akid)
# approach2: use SSL_get_verify_result
# works even when using SSL_VERIFY_PEER
proc isSelfSigned(handle: SslPtr): bool =
  ## True when the verification result recorded on `handle` is one of the two
  ## "self signed certificate" error codes.
  case handle.SSL_get_verify_result()
  of X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
     X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN:
    result = true
  else:
    result = false
# verify validity if SSL_VERIFY_NONE was set
proc verifyCertificate(handle: SslPtr): bool =
  ## True when the peer presented a certificate and OpenSSL's verification
  ## result for it is `X509_V_OK`.
  ##
  ## `SSL_get_verify_result` also yields `X509_V_OK` when no certificate was
  ## presented at all, so the presence of a peer certificate is checked first.
  let peer = handle.SSL_get_peer_certificate()
  if peer.isNil:
    return false
  # SSL_get_peer_certificate hands out its own reference; release it
  X509_free(peer)
  result = handle.SSL_get_verify_result() == X509_V_OK
# Entry point: connect to the URL given as first command line argument,
# print the peer certificate and report how it verifies.
if paramCount() < 1:
  quit("usage: pass the url to check as first argument, e.g. https://example.com:443")
let url = parseUri(paramStr(1))
# hostname and port are both required below; fail with a clear message
# instead of an exception from parseInt("")
if url.hostname.len == 0 or url.port.len == 0:
  quit("url must contain a scheme, a host and an explicit port, e.g. https://example.com:443")
let socket = newSocket()
socket.connect(url.hostname, Port(url.port.parseInt))
socket.sslHandShake(url.hostname)
let certificate = socket.handle.SSL_get_peer_certificate()
echo $certificate
if socket.handle.verifyCertificate():
  echo "certificate verified by CA"
elif socket.handle.isSelfSigned():
  echo "self-signed certificate, you will have to use a TOFU list"
else:
  echo "certificate could not be verified"
# SSL_get_peer_certificate returned an owned reference
if not certificate.isNil:
  X509_free(certificate)
socket.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment