Created
June 21, 2014 01:13
-
-
Save zbyte64/e3d311b2bcc2069aa7fa to your computer and use it in GitHub Desktop.
Attempt to grab peer certificate when serving
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from OpenSSL import crypto, SSL | |
from socket import gethostname | |
import socket | |
import os | |
import re | |
from pprint import pprint | |
from time import gmtime, mktime | |
from os.path import exists, join | |
# Default certificate / key file names.  They are not referenced by the code
# visible in this file, which derives its file names from a ``certname``.
CERT_FILE = "myapp.crt"
KEY_FILE = "myapp.key"
# Directory in which generated certificates and private keys are stored.
cert_dir = './certs/'
def create_self_signed_cert(certname):
    """
    Make sure a self-signed certificate and key pair exist for ``certname``.

    If ``<certname>.cert`` or ``<certname>.pkey`` is missing from ``cert_dir``,
    a new RSA key pair and a matching self-signed X.509 certificate are
    generated and both files are (re)written in PEM format.  When both files
    already exist, nothing is done.

    :param certname: base name (without extension) of the files in ``cert_dir``.
    """
    cert_path = join(cert_dir, '%s.cert' % certname)
    key_path = join(cert_dir, '%s.pkey' % certname)
    if exists(cert_path) and exists(key_path):
        return

    # The target directory is not guaranteed to exist yet.
    os.makedirs(cert_dir, exist_ok=True)

    # create a key pair
    # 2048 bits: 1024-bit RSA keys are refused by current OpenSSL security
    # levels ("ee key too small").
    k = crypto.PKey()
    k.generate_key(crypto.TYPE_RSA, 2048)

    # create a self-signed cert
    cert = crypto.X509()
    subject = cert.get_subject()
    subject.C = "US"
    subject.ST = "Minnesota"
    subject.L = "Minnetonka"
    subject.O = "my company"
    subject.OU = "my organization"
    subject.CN = gethostname()
    cert.set_serial_number(1000)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # valid for ~10 years
    cert.set_issuer(subject)  # self-signed: issuer == subject
    cert.set_pubkey(k)
    # SHA-256: SHA-1 signatures are rejected as too weak by current OpenSSL.
    cert.sign(k, 'sha256')

    with open(cert_path, "wb") as cert_out:
        cert_out.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))

    # Create the private key file readable by the owner only.
    key_fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(key_fd, "wb") as key_out:
        key_out.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k))
def get_context(certname):
    """
    Build a pyOpenSSL ``SSL.Context`` that presents the ``certname`` identity.

    The certificate/key pair is created on demand by
    ``create_self_signed_cert``.  The context asks the peer for a certificate
    and runs every certificate of the peer's chain through a callback that
    prints its details.

    :param certname: base name of the certificate/key files in ``cert_dir``.
    :returns: the configured ``SSL.Context``.
    """
    # X509 verification error codes (OpenSSL x509_vfy.h).
    cert_not_yet_valid = 9
    cert_has_expired = 10

    def callback_function(conn, cert, errno, depth, result):
        """Print the peer certificate; reject only an out-of-date leaf cert."""
        print("cb", certname, result)
        if depth == 0 and errno in (cert_not_yet_valid, cert_has_expired):
            return False  # or raise Exception("Certificate not yet valid or expired")
        pub_key = cert.get_pubkey()
        issuer = cert.get_issuer()
        common_name = cert.get_subject()
        print("[%s] Peer Subect:" % certname, dict(common_name.get_components()))
        print("[%s] Peer Issuer:" % certname, dict(issuer.get_components()))
        # The PKey taken from a certificate holds only the public half, so it
        # has to be serialised with dump_publickey, not dump_privatekey.
        print("[%s] Peer PUB Key:" % certname,
              crypto.dump_publickey(crypto.FILETYPE_PEM, pub_key))
        # NOTE: every other verification error (e.g. self-signed, unknown
        # issuer) is accepted here; no trust chain is enforced.
        return True

    # Negotiate the best protocol both sides support instead of pinning the
    # obsolete TLS 1.0, while still refusing the broken SSLv2 / SSLv3.
    context = SSL.Context(SSL.SSLv23_METHOD)
    context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
    # VERIFY_PEER is mandatory: without it OpenSSL ignores the other two flags
    # and a server never requests the client's certificate.
    context.set_verify(
        SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT | SSL.VERIFY_CLIENT_ONCE,
        callback_function)
    # The session id context must be a byte string on Python 3.
    context.set_session_id(b"My_experimental_AJAX_Server")

    create_self_signed_cert(certname)
    context.use_privatekey_file(os.path.join(cert_dir, '%s.pkey' % certname))
    context.use_certificate_file(os.path.join(cert_dir, '%s.cert' % certname))
    # No CA locations are loaded (context.load_verify_locations(ca_file, ca_path));
    # acceptance is decided solely by callback_function above.
    print("generated", certname)
    return context
def make_payload(headers):
    """
    Join request/header lines into a complete HTTP message head.

    Every line is terminated with CRLF and the head is closed with the empty
    line that HTTP requires; without that blank line the receiving server
    keeps waiting for more headers.

    :param headers: iterable of lines (request line first), without line ends.
    :returns: the message head as a ``str``.
    """
    return ''.join(line + '\r\n' for line in headers) + '\r\n'
def client(ip_addr, port):
    """
    Send one HTTPS GET request to ``ip_addr:port`` and print the response.

    The connection uses the ``'client'`` identity from ``get_context``.  The
    response is read from the TLS connection until the peer closes it, then
    printed as raw bytes.

    :param ip_addr: host name or address to connect to (also sent as ``Host``).
    :param port: TCP port of the server.
    """
    context = get_context('client')
    payload = make_payload([
        'GET / HTTP/1.1',
        'Host: %s' % ip_addr,
        'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language: en-US,en;q=0.5',
        'Accept-Encoding: gzip, deflate',
        'DNT: 1',
        # The response is read until EOF, so ask the server to close the
        # connection once it has answered.
        'Connection: close',
    ])
    # TLS connections carry bytes, not text.
    if isinstance(payload, str):
        payload = payload.encode('latin-1')

    ssl_sock = SSL.Connection(context, socket.socket())
    chunks = []
    try:
        ssl_sock.connect((ip_addr, port))
        ssl_sock.do_handshake()
        ssl_sock.sendall(payload)
        print("time to read")
        while True:
            try:
                # Always read through the TLS layer; the raw socket would
                # only yield encrypted records.
                data = ssl_sock.recv(1024)
            except (SSL.ZeroReturnError, SSL.SysCallError):
                # Clean TLS shutdown or abrupt EOF: the response is complete.
                break
            if not data:
                break
            print('cl:', data)
            chunks.append(data)
    finally:
        ssl_sock.close()
    response = b''.join(chunks)
    print('[CLIENT]', response)
from socketserver import BaseServer | |
from wsgiref.simple_server import WSGIRequestHandler, WSGIServer | |
from OpenSSL import SSL | |
import io | |
class SSLIO(io.IOBase):
    """
    Minimal file-like adapter around a pyOpenSSL-style connection object.

    The wrapped ``connection`` must provide ``recv``, ``sendall`` and
    ``close``.  Several adapters may share one connection (one for reading,
    one for writing), so ``close`` is safe to call repeatedly.
    """

    # Chunk size used by read() when neither the call nor the constructor
    # supplies a positive size.
    DEFAULT_BUFSIZE = 1024

    def __init__(self, connection, mode='rwb', bufsize=1024):
        """
        :param connection: object exposing ``recv``/``sendall``/``close``.
        :param mode: file-style mode string; ``'r'`` enables reading and
            ``'w'`` enables writing as reported by readable()/writable().
        :param bufsize: default number of bytes requested by ``read()``.
        """
        self._closed = False
        self.connection = connection
        self.mode = mode
        self.bufsize = bufsize

    @property
    def closed(self):
        """True once close() was called or the peer ended the stream."""
        return self._closed

    def close(self):
        """Close the underlying connection; later calls are no-ops."""
        if self._closed:
            return
        # Mark closed first so a failing connection.close() is not retried
        # when the object is finalised.
        self._closed = True
        self.connection.close()

    def fileno(self):
        """Always raises: this stream is not backed by a usable descriptor."""
        # io.UnsupportedOperation derives from OSError, so callers catching
        # OSError keep working.
        raise io.UnsupportedOperation("fileno")

    def flush(self):
        """No-op: writes are handed to the connection immediately."""
        pass

    def isatty(self):
        return False

    def readable(self):
        """Report whether the stream was opened for reading."""
        # This is a property of the mode only; the connection's want_read()
        # describes transient TLS state and is unrelated to readability.
        return 'r' in self.mode

    def seekable(self):
        return False

    def writable(self):
        """Report whether the stream was opened for writing."""
        return 'w' in self.mode

    def read(self, bufsize=None):
        """
        Read up to ``bufsize`` bytes with a single ``recv`` call.

        :param bufsize: maximum number of bytes; ``None`` or a negative value
            selects the constructor's ``bufsize`` (or ``DEFAULT_BUFSIZE`` when
            that is not positive).
        :returns: the bytes received, or ``b''`` at end of stream / when closed.
        """
        if self.closed:
            return b''
        if bufsize is None or bufsize < 0:
            own = self.bufsize
            bufsize = own if own is not None and own > 0 else self.DEFAULT_BUFSIZE
        elif bufsize == 0:
            return b''
        try:
            return self.connection.recv(bufsize)
        except SSL.ZeroReturnError:
            # The peer shut the TLS session down cleanly.
            self._closed = True
            return b''
        except SSL.SysCallError:
            # The transport ended without a TLS shutdown; treat as EOF.
            self._closed = True
            return b''

    def write(self, data):
        """
        Send all of ``data`` and return the number of bytes written.

        ``sendall`` is used so that callers never observe a partial write.
        """
        self.connection.sendall(data)
        return len(data)
class SecureHTTPServer(WSGIServer):
    """WSGI server whose listening socket is wrapped in a pyOpenSSL connection."""

    def __init__(self, server_address, HandlerClass):
        """
        :param server_address: ``(host, port)`` tuple to listen on.
        :param HandlerClass: request handler class instantiated per connection.
        """
        # Defer bind/listen: the base class would otherwise bind a plain
        # socket first, which then gets leaked and makes the second bind on
        # the same port fail.
        WSGIServer.__init__(self, server_address, HandlerClass,
                            bind_and_activate=False)
        context = get_context('server')
        # Wrap the socket the base class already created for the configured
        # address family / socket type.
        self.socket = SSL.Connection(context, self.socket)
        try:
            self.server_bind()
            self.server_activate()
        except Exception:
            self.server_close()
            raise

    def shutdown_request(self, request):
        """Shut the TLS session down (best effort) and close the request."""
        try:
            # SSL.Connection.shutdown() takes no "how" argument, unlike
            # socket.shutdown() used by the base implementation.
            request.shutdown()
        except (SSL.Error, OSError):
            # The peer may already be gone; closing below is all that is left.
            pass
        finally:
            self.close_request(request)
class SecureHTTPRequestHandler(WSGIRequestHandler):
    """WSGI request handler that exposes the TLS peer certificate to the app."""

    def setup(self):
        """Attach SSLIO streams to the accepted connection."""
        # The request object is used directly as the connection; the streams
        # are SSLIO adapters instead of socket.makefile() objects.
        self.connection = self.request
        self.rfile = SSLIO(self.request, "rb", self.rbufsize)
        self.wfile = SSLIO(self.request, "wb", self.wbufsize)

    def get_environ(self):
        """
        Return the WSGI environ, extended with peer certificate details.

        When the peer presented a certificate the keys ``TLS_PUBLIC_KEY``
        (PEM bytes), ``TLS_ISSUER`` and ``TLS_SUBJECT`` (dicts built from the
        name components) are added; otherwise the environ is returned as is.
        """
        environ = super().get_environ()
        cert = self.connection.get_peer_certificate()
        print(self.connection.get_context())
        print("peer certificate:", cert)
        if cert is None:
            return environ
        # A certificate only carries the public half of the key pair, so it
        # must be serialised with dump_publickey.
        pub_key = crypto.dump_publickey(crypto.FILETYPE_PEM, cert.get_pubkey())
        issuer = dict(cert.get_issuer().get_components())
        subject = dict(cert.get_subject().get_components())
        environ['TLS_PUBLIC_KEY'] = pub_key
        environ['TLS_ISSUER'] = issuer
        environ['TLS_SUBJECT'] = subject
        return environ
def simple_app(environ, start_response):
    """
    WSGI application that echoes the request environ as plain text.

    :param environ: WSGI environ mapping.
    :param start_response: WSGI ``start_response`` callable.
    :returns: a list with one UTF-8 encoded ``"key: value\\n"`` line per
        environ entry, in the mapping's iteration order.
    """
    status = '200 OK'
    headers = [('Content-type', 'text/plain; charset=utf-8')]
    start_response(status, headers)
    return [
        ("%s: %s\n" % (key, value)).encode("utf-8")
        for key, value in environ.items()
    ]
def server(port,
           HandlerClass=SecureHTTPRequestHandler,
           ServerClass=SecureHTTPServer):
    """
    Serve ``simple_app`` over HTTPS on all interfaces until interrupted.

    :param port: TCP port to listen on.
    :param HandlerClass: request handler class passed to the server.
    :param ServerClass: server class to instantiate.
    """
    server_address = ('', port)  # (address, port)
    httpd = ServerClass(server_address, HandlerClass)
    try:
        httpd.set_app(simple_app)
        sa = httpd.socket.getsockname()
        print("Serving HTTPS on", sa[0], "port", sa[1], "...")
        httpd.serve_forever()
    finally:
        # Release the listening socket however the serve loop ends.
        httpd.server_close()
if __name__ == '__main__':
    from multiprocessing import Process
    import time  # only needed by the optional client run below

    # Run the HTTPS server in its own process on port 8443.
    server_process = Process(target=server, args=(8443,))
    server_process.start()
    # Optional: exercise the server with the bundled client once it is up.
    #client_process = Process(target=client, args=('127.0.0.1', 8443))
    #time.sleep(2)
    #client_process.start()
    #client_process.join()
    server_process.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment