Skip to content

Instantly share code, notes, and snippets.

@zbyte64
Created June 21, 2014 01:13
Show Gist options
  • Save zbyte64/e3d311b2bcc2069aa7fa to your computer and use it in GitHub Desktop.
Save zbyte64/e3d311b2bcc2069aa7fa to your computer and use it in GitHub Desktop.
Attempt to grab peer certificate when serving
from OpenSSL import crypto, SSL
from socket import gethostname
import socket
import os
import re
from pprint import pprint
from time import gmtime, mktime
from os.path import exists, join
CERT_FILE = "myapp.crt"
KEY_FILE = "myapp.key"
cert_dir = './certs/'
def create_self_signed_cert(certname):
"""
If datacard.crt and datacard.key don't exist in cert_dir, create a new
self-signed cert and keypair and write them into that directory.
"""
cert_file = '%s.cert' % certname
key_file = '%s.pkey' % certname
if (not exists(join(cert_dir, cert_file))
or not exists(join(cert_dir, key_file))):
# create a key pair
k = crypto.PKey()
k.generate_key(crypto.TYPE_RSA, 1024)
# create a self-signed cert
cert = crypto.X509()
cert.get_subject().C = "US"
cert.get_subject().ST = "Minnesota"
cert.get_subject().L = "Minnetonka"
cert.get_subject().O = "my company"
cert.get_subject().OU = "my organization"
cert.get_subject().CN = gethostname()
cert.set_serial_number(1000)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(10*365*24*60*60)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(k)
cert.sign(k, 'sha1')
open(join(cert_dir, cert_file), "wb").write(
crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
open(join(cert_dir, key_file), "wb").write(
crypto.dump_privatekey(crypto.FILETYPE_PEM, k))
def get_context(certname):
def callback_function(conn, cert, errno, depth, result):
print("cb", certname, result)
if depth == 0 and (errno == 9 or errno == 10):
return False # or raise Exception("Certificate not yet valid or expired")
pub_key = cert.get_pubkey()
issuer = cert.get_issuer()
common_name = cert.get_subject()
print("[%s] Peer Subect:"%certname, dict(common_name.get_components()))
print("[%s] Peer Issuer:"%certname, dict(issuer.get_components()))
print("[%s] Peer PUB Key:"%certname, crypto.dump_privatekey(crypto.FILETYPE_PEM,pub_key))
return True
context = SSL.Context(SSL.TLSv1_METHOD) # Use TLS Method
context.set_options(SSL.OP_NO_SSLv2) # Don't accept SSLv2
#context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT | SSL.VERIFY_CLIENT_ONCE, callback_function)
context.set_verify(SSL.VERIFY_FAIL_IF_NO_PEER_CERT | SSL.VERIFY_CLIENT_ONCE, callback_function)
#context.set_verify(SSL.VERIFY_NONE, callback_function)
context.set_session_id("My_experimental_AJAX_Server")
create_self_signed_cert(certname)
context.use_privatekey_file (os.path.join(cert_dir, '%s.pkey' % certname))
context.use_certificate_file(os.path.join(cert_dir, '%s.cert' % certname))
#context.load_verify_locations(ca_file, ca_path)
print("generated", certname)
return context
def make_payload(headers):
return '\r\n'.join(headers) + '\r\n'
def client(ip_addr, port):
import time
context = get_context('client')
payload = make_payload([
'GET / HTTP/1.1',
'Host: %s'%ip_addr,
'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language: en-US,en;q=0.5',
'Accept-Encoding: gzip, deflate',
'DNT: 1',
'Connection: keep-alive',
])
sock = socket.socket()
ssl_sock = SSL.Connection(context, sock)
#ssl_sock.setblocking(True)
ssl_sock.connect((ip_addr, port))
ssl_sock.do_handshake()
ssl_sock.send(payload)
#TODO figure out how to read properly
print("time to read")
#ssl_sock.listen(0)
data = ssl_sock.recv(1024)
print("cl:", data)
response = ""
while len(data):
print('cl:', data)
response += data
data = sock.recv(1024)
print('[CLIENT]', response)
ssl_sock.close()
from socketserver import BaseServer
from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
from OpenSSL import SSL
import io
class SSLIO(io.IOBase):
def __init__(self, connection, mode='rwb', bufsize=1024):
self.connection = connection
self.mode = mode
self.bufsize = bufsize
self._closed = False
@property
def closed(self):
return self._closed
def close(self):
#self.connection.shutdown()
self.connection.close()
self._closed = True
def fileno(self):
raise OSError
def flush(self):
pass
def isatty(self):
return False
def readable(self):
return 'r' in self.mode and self.connection.want_read()
def seekable(self):
return False
def writable(self):
return 'w' in self.mode
def read(self, bufsize=None):
if self.closed:
return b''
if bufsize is None:
bufsize = self.bufsize
try:
return self.connection.recv(bufsize)
except SSL.ZeroReturnError:
self._closed = True
return b''
except SSL.SysCallError:
self._closed = True
return b''
def write(self, data):
return self.connection.send(data)
class SecureHTTPServer(WSGIServer):
def __init__(self, server_address, HandlerClass):
WSGIServer.__init__(self, server_address, HandlerClass)
context = get_context('server')
self.socket = SSL.Connection(context, socket.socket(self.address_family,
self.socket_type))
self.server_bind()
self.server_activate()
def shutdown_request(self, request):
try:
request.shutdown()
except:
pass
class SecureHTTPRequestHandler(WSGIRequestHandler):
def setup(self):
self.connection = self.request
self.rfile = SSLIO(self.request, "rb", self.rbufsize)
self.wfile = SSLIO(self.request, "wb", self.wbufsize)
def get_environ(self):
environ = super(SecureHTTPRequestHandler, self).get_environ()
cert = self.connection.get_peer_certificate()
print(self.connection.get_context())
print("fuck stupid devs who don't implement consistent APIs:", cert)
if not cert:
return environ
pub_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, cert.get_pubkey())
issuer = dict(cert.get_issuer().get_components())
subject = dict(cert.get_subject().get_components())
environ['TLS_PUBLIC_KEY'] = pub_key
environ['TLS_ISSUER'] = issuer
environ['TLS_SUBJECT'] = subject
return environ
def simple_app(environ, start_response):
status = '200 OK'
headers = [('Content-type', 'text/plain; charset=utf-8')]
start_response(status, headers)
ret = [("%s: %s\n" % (key, value)).encode("utf-8")
for key, value in environ.items()]
return ret
def server(port,
HandlerClass = SecureHTTPRequestHandler,
ServerClass = SecureHTTPServer):
server_address = ('', port) # (address, port)
httpd = ServerClass(server_address, HandlerClass)
httpd.set_app(simple_app)
sa = httpd.socket.getsockname()
print("Serving HTTPS on", sa[0], "port", sa[1], "...")
httpd.serve_forever()
if __name__ == '__main__':
from multiprocessing import Process
import time
server_process = Process(target=server, args=(8443,))
server_process.start()
#client_process = Process(target=client, args=('127.0.0.1', 8443))
#time.sleep(2)
#client_process.start()
#client_process.join()
server_process.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment