Last active
September 11, 2020 08:55
-
-
Save manuels/8852953 to your computer and use it in GitHub Desktop.
dtls for python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: latin-1 -*- | |
# | |
# Copyright (C) AB Strakt | |
# Copyright (C) Jean-Paul Calderone | |
# See LICENSE for details. | |
""" | |
Simple SSL client, using blocking I/O | |
""" | |
from OpenSSL import SSL | |
import sys, os, select, socket | |
def verify_cb(conn, cert, errnum, depth, ok): | |
# This obviously has to be updated | |
print 'Got certificate: %s' % cert.get_subject() | |
return ok | |
if len(sys.argv) < 3: | |
print 'Usage: python[2] client.py HOST PORT' | |
sys.exit(1) | |
dir = os.path.dirname(sys.argv[0]) | |
if dir == '': | |
dir = os.curdir | |
# Initialize context | |
ctx = SSL.Context(SSL.DTLSv1_METHOD) | |
ctx.set_verify(SSL.VERIFY_PEER, verify_cb) # Demand a certificate | |
ctx.use_privatekey_file (os.path.join(dir, 'client.pkey')) | |
ctx.use_certificate_file(os.path.join(dir, 'client.cert')) | |
ctx.load_verify_locations(os.path.join(dir, 'CA.cert')) | |
# Set up client | |
sock = SSL.Connection(ctx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) | |
addr = (sys.argv[1], int(sys.argv[2])) | |
sock.connect(addr) | |
while 1: | |
line = sys.stdin.readline() | |
if line == '': | |
break | |
try: | |
sock.sendto(line, addr) | |
print 'received', sock.recvfrom(1024) | |
sys.stdout.flush() | |
except SSL.Error as e: | |
print e | |
print 'Connection died unexpectedly' | |
break | |
sock.shutdown() | |
sock.close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: latin-1 -*- | |
# | |
# Copyright (C) AB Strakt | |
# Copyright (C) Jean-Paul Calderone | |
# See LICENSE for details. | |
""" | |
Simple echo server, using nonblocking I/O | |
""" | |
from OpenSSL import SSL | |
import sys, os, select, socket | |
def verify_cb(conn, cert, errnum, depth, ok): | |
# This obviously has to be updated | |
print 'Got certificate: %s' % cert.get_subject() | |
return ok | |
if len(sys.argv) < 2: | |
print 'Usage: python[2] server.py PORT' | |
sys.exit(1) | |
dir = os.path.dirname(sys.argv[0]) | |
if dir == '': | |
dir = os.curdir | |
# Initialize context | |
ctx = SSL.Context(SSL.DTLSv1_METHOD) | |
ctx.set_options(SSL.OP_NO_SSLv2) | |
ctx.set_verify(SSL.VERIFY_PEER|SSL.VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb) # Demand a certificate | |
ctx.use_privatekey_file (os.path.join(dir, 'server.pkey')) | |
ctx.use_certificate_file(os.path.join(dir, 'server.cert')) | |
ctx.load_verify_locations(os.path.join(dir, 'CA.cert')) | |
# Set up server | |
server = SSL.Connection(ctx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) | |
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
server.bind(('', int(sys.argv[1]))) | |
#server.listen(3) | |
server.setblocking(0) | |
clients = {} | |
writers = {} | |
def dropClient(cli, errors=None): | |
if errors: | |
print 'Client %s left unexpectedly:' % (clients[cli],) | |
print ' ', errors | |
else: | |
print 'Client %s left politely' % (clients[cli],) | |
del clients[cli] | |
if writers.has_key(cli): | |
del writers[cli] | |
if not errors: | |
cli.shutdown() | |
cli.close() | |
while 1: | |
try: | |
r,w,_ = select.select([server]+clients.keys(), writers.keys(), []) | |
except: | |
break | |
for cli in r: | |
print 'something happended', cli, cli==server | |
try: | |
#cli,addr = server.accept() | |
ret, addr = server.recvfrom(1024) | |
print 'Connection from %s' % (addr,) | |
clients[cli] = addr | |
except (SSL.WantReadError, SSL.WantWriteError, SSL.WantX509LookupError): | |
pass | |
except SSL.ZeroReturnError: | |
dropClient(cli) | |
except SSL.Error, errors: | |
dropClient(cli, errors) | |
except Exception as e: | |
print e | |
pass | |
else: | |
if not writers.has_key(cli): | |
writers[cli] = '' | |
print 'got', ret | |
writers[cli] = writers[cli] + ret | |
for cli, addr in clients.iteritems(): | |
try: | |
print 'sending response', cli, addr | |
ret = server.sendto(writers[cli], addr) | |
except (SSL.WantReadError, SSL.WantWriteError, SSL.WantX509LookupError) as e: | |
print e | |
pass | |
except SSL.ZeroReturnError: | |
dropClient(cli) | |
except SSL.Error, errors: | |
dropClient(cli, errors) | |
else: | |
writers[cli] = writers[cli][ret:] | |
if writers[cli] == '': | |
del writers[cli] | |
for cli in clients.keys(): | |
cli.close() | |
server.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
pyopenssl library do not support DTLSv1_METHOD. TLS only. (https://pyopenssl.org/en/stable/api/ssl.html)
Try this: rbit/pydtls#15 (comment)