Skip to content

Instantly share code, notes, and snippets.

@HarryR
Last active March 8, 2018 15:30
Show Gist options
  • Save HarryR/5420bac1025250b3a218a8e327d8a3e6 to your computer and use it in GitHub Desktop.
Save HarryR/5420bac1025250b3a218a8e327d8a3e6 to your computer and use it in GitHub Desktop.
Negotiate an encrypted channel between two long-term NaCl key pairs, ensuring forward secrecy by creating an ephemeral session key. See: https://www.logicista.com/2017/humans-vs-crypto
"""
Cryptochannel provides a way of securely communicating with another
party where each party has a verifiable identity.
It provides both Client and Server components, where the Server has a known
long-term key but the Server doesn't need to know the client's long-term key
prior to it connecting.
The protocol enforces forward secrecy: if either party's long-term key is
leaked, an intercepting party will still be unable to recover the contents of
the session. All that can be recovered if LTKs are leaked are the long-term
public keys of either side (and hence, their identity).
"""
__all__ = ('Handshake', 'Channel', 'CryptoProtocol')
from nacl.public import PublicKey, PrivateKey, Box
from nacl.secret import SecretBox
from nacl.encoding import RawEncoder
from random import randint
from nacl.utils import random as randbytes
from nacl.hash import sha256
class Handshake(object):
    """
    Two-message key agreement between a Client and a Server.

    The Client must already know the Server's long-term public key (LTK);
    the Server learns the Client's LTK during round 1. Each side generates a
    short-term key (STK) for the session, and once both rounds are done
    `box` holds a Box built from the two short-term keys.

    Message flow:
        client.step1_send() -> server.step1_recv()
        server.step2_send() -> client.step2_recv()
    """
    __slots__ = ('_my_ltk', '_their_ltk', '_my_stk', '_their_stk', '_box')

    def __init__(self, my_LTK_secret=None, their_LTK_public=None):
        """
        :param my_LTK_secret: our long-term PrivateKey; a fresh one is
            generated when omitted.
        :param their_LTK_public: the peer's long-term PublicKey, or None when
            it will be learned in `step1_recv` (the accepting side).
        """
        assert their_LTK_public is None or isinstance(their_LTK_public, PublicKey)
        secret = PrivateKey.generate() if my_LTK_secret is None else my_LTK_secret
        assert isinstance(secret, PrivateKey)
        self._my_ltk, self._their_ltk = secret, their_LTK_public
        # Short-term keys and the session box only exist once the handshake runs
        self._my_stk = self._their_stk = self._box = None

    @property
    def my_ltk_public(self):
        """
        Public half of our long-term key
        """
        return self._my_ltk.public_key

    @property
    def box(self):
        """
        Session Box used to encrypt/decrypt messages.
        Stays None until step2_send (server) or step2_recv (client) has run.
        """
        return self._box

    def step1_send(self):
        """
        Handshake Round 1 - Client sends to Server

        Generates our short-term key, then encrypts our public-LTK with a Box
        between our private-STK and their public-LTK.

        The nonce is the leading SecretBox.NONCE_SIZE bytes of our public-STK,
        so it does not need to be transmitted separately.

        :return: our public-STK immediately followed by the ciphertext of our
            public-LTK.
        """
        assert self._my_ltk is not None
        assert self._their_ltk is not None
        assert self._my_stk is None
        assert self._their_stk is None
        ephemeral = PrivateKey.generate()
        self._my_stk = ephemeral
        stk_public = bytes(ephemeral.public_key)
        # Only the holder of their private-LTK can open this and learn our LTK
        sealed_ltk = Box(ephemeral, self._their_ltk).encrypt(
            bytes(self._my_ltk.public_key),
            stk_public[:SecretBox.NONCE_SIZE],
        )
        return stk_public + sealed_ltk.ciphertext

    def step1_recv(self, data):
        """
        Handshake Round 1 - Server receives from Client

        Splits `data` into the peer's public-STK (first PublicKey.SIZE bytes)
        and their encrypted public-LTK (the remainder), then decrypts the
        latter with a Box between our private-LTK and their public-STK.

        :param data: the bytes produced by the peer's `step1_send`.
        :return: True on success; key construction / decryption errors
            propagate to the caller.
        """
        assert self._my_ltk is not None
        assert self._my_stk is None
        assert self._their_ltk is None
        assert self._their_stk is None
        key_len = PublicKey.SIZE
        stk_raw, sealed_ltk = data[:key_len], data[key_len:]
        self._their_stk = PublicKey(stk_raw)
        opener = Box(self._my_ltk, self._their_stk)
        # The nonce is derived from their public-STK, mirroring step1_send
        derived_nonce = bytes(self._their_stk)[:SecretBox.NONCE_SIZE]
        self._their_ltk = PublicKey(opener.decrypt(sealed_ltk, derived_nonce))
        return True

    def step2_send(self):
        """
        Handshake Round 2 - Server sends to Client

        Generates our short-term key and builds the session box between our
        private-STK and their public-STK. The reply carries our public-STK
        encrypted with a Box between our private-LTK and their public-LTK,
        using a random nonce.

        :return: the result of `Box.encrypt` for our public-STK; the peer
            hands it unchanged to `step2_recv`.
        """
        assert self._their_stk is not None
        assert self._their_ltk is not None
        assert self._my_stk is None
        session_key = PrivateKey.generate()
        self._my_stk = session_key
        # Session box: our STK <-> their STK
        self._box = Box(session_key, self._their_stk)
        # Reply box: our LTK <-> their LTK
        reply_box = Box(self._my_ltk, self._their_ltk)
        return reply_box.encrypt(
            bytes(session_key.public_key),
            nonce=randbytes(Box.NONCE_SIZE),
        )

    def step2_recv(self, data):
        """
        Handshake Round 2 - Client receives from Server

        Decrypts the peer's public-STK with a Box between our private-LTK and
        their public-LTK, then builds the session box between our private-STK
        and their public-STK.

        :param data: the bytes produced by the peer's `step2_send`.
        :return: True on success; decryption errors propagate to the caller.
        """
        assert self._my_ltk is not None
        assert self._my_stk is not None
        assert self._their_ltk is not None
        assert self._their_stk is None
        # Reply box: our LTK <-> their LTK
        reply_box = Box(self._my_ltk, self._their_ltk)
        self._their_stk = PublicKey(reply_box.decrypt(data))
        # Session box: our STK <-> their STK
        self._box = Box(self._my_stk, self._their_stk)
        return True
class Channel(object):
    """
    Sequentially ordered encrypted channel built on a single shared box.

    The nonce is not transmitted: both ends derive it from an internal state
    that starts as the SHA-256 of the box's bytes and is re-hashed on every
    `step()`. The same nonce is therefore used for every encrypt/decrypt
    until `step()` is called, which callers do after a message has been
    received successfully.

    Ordering and delivery guarantees must come from outside the channel. The
    same packet may be retransmitted, or split and sent in pieces, but
    different data must never be encrypted under the same nonce in the same
    step.
    """
    __slots__ = ('_box', '_state', 'faults')

    def __init__(self, box):
        """
        :param box: a Box or SecretBox shared with the peer.
        """
        assert isinstance(box, (Box, SecretBox))
        self._box = box
        # Seed the nonce sequence from the box's byte representation
        self._state = sha256(bytes(box), encoder=RawEncoder)
        # Number of failed decrypt() calls; callers may inspect or reset it
        self.faults = 0

    @property
    def nonce(self):
        """
        Nonce for the current step: the leading NONCE_SIZE bytes of the state
        """
        nonce_len = self._box.NONCE_SIZE
        return self._state[:nonce_len]

    def encrypt(self, data, mtu=32000):
        """
        Encrypt `data` under the current nonce.

        :param data: plaintext to encrypt.
        :param mtu: largest packet allowed, counting 16 bytes of overhead on
            top of `len(data)` (presumably the authenticator length).
        :return: the ciphertext only; the nonce is not included.
        :raises RuntimeError: when the plaintext plus overhead exceeds `mtu`.
        """
        overhead = 16
        if mtu < len(data) + overhead:
            raise RuntimeError('Encryption overheads exceed maximum packet length')
        sealed = self._box.encrypt(data, nonce=self.nonce)
        return sealed.ciphertext

    def step(self):
        """
        Advance the sequence by hashing the state, which changes the nonce
        """
        self._state = sha256(self._state, encoder=RawEncoder)

    def decrypt(self, data):
        """
        Decrypt `data` under the current nonce.

        Any failure increments `faults` and is re-raised. Callers are advised
        to disconnect after too many faults, since that may indicate
        interception or an out-of-order transport.
        """
        try:
            plaintext = self._box.decrypt(data, nonce=self.nonce)
        except Exception:
            self.faults += 1
            raise
        return plaintext
class CryptoProtocol(object):
    """
    Runs the handshake and then exchanges encrypted messages over a transport.

    Both parties must act in lock-step: the protocol is agnostic to message
    contents, so the only way the two channel states stay synchronised is if
    every message sent by one side is received by the other before the roles
    swap. The side which starts sending first needs to poll for answers and
    can't arbitrarily be sent messages without breaking the encryption.

    The transport is any object providing `send(bytes)`, `recv()` returning
    one whole packet, and an `mtu` attribute (read by `send`).

    Note: 'mtu' is listed in __slots__ but is never assigned by this class.
    """
    __slots__ = ('mtu', '_transport', '_channel', '_my_ltk', '_their_ltk')

    def __init__(self, transport, my_ltk, their_ltk=None):
        """
        :param transport: object with `send`, `recv` and `mtu`.
        :param my_ltk: our long-term private key, passed on to Handshake.
        :param their_ltk: the peer's long-term public key; required by
            `connect()`, left as None for `accept()`.
        """
        self._transport = transport
        self._my_ltk = my_ltk
        self._their_ltk = their_ltk
        self._channel = None

    def connect(self):
        """
        Be the initiator of the connection, sending the first message.

        Performs the client half of the two-message handshake and installs
        the resulting channel so `send()`/`recv()` can be used.

        :return: the established Channel.
        """
        assert self._channel is None
        shake = Handshake(self._my_ltk, self._their_ltk)
        self._transport.send(shake.step1_send())
        shake.step2_recv(self._transport.recv())
        # The handshake is complete after round 2; there is no third message
        self._channel = Channel(shake.box)
        return self._channel

    def accept(self):
        """
        Be the acceptor of the connection, receiving the first message.

        Performs the server half of the two-message handshake and installs
        the resulting channel so `send()`/`recv()` can be used.

        :return: the established Channel.
        """
        assert self._channel is None
        shake = Handshake(self._my_ltk, self._their_ltk)
        shake.step1_recv(self._transport.recv())
        self._transport.send(shake.step2_send())
        # The handshake is complete after round 2; there is no third message
        self._channel = Channel(shake.box)
        return self._channel

    def send(self, msg):
        """
        Encrypt `msg`, send it via the transport, then advance the channel.

        :raises RuntimeError: (from Channel.encrypt) when the message does not
            fit in the transport's mtu.
        """
        assert self._channel is not None
        packet = self._channel.encrypt(msg, self._transport.mtu)
        self._transport.send(packet)
        self._channel.step()

    def recv(self):
        """
        Receive and decrypt one message via the transport.

        Packets that fail to decrypt are skipped; each failure is counted by
        the channel. Once the channel's fault count reaches 5 the call gives
        up. Every successfully received message forgives up to 2 earlier
        faults (never dropping below zero) and advances the channel.

        Errors raised by the transport itself are not decryption faults and
        propagate to the caller.

        :return: the decrypted message (may be empty).
        :raises RuntimeError: when too many packets failed to decrypt.
        """
        assert self._channel is not None
        faults_forgive = 2
        faults_max = 5  # 2/5 or 40%
        channel = self._channel
        while channel.faults < faults_max:
            packet = self._transport.recv()
            try:
                msg = channel.decrypt(packet)
            except Exception:
                # decrypt() has already counted this fault; try the next packet
                continue
            channel.step()
            if channel.faults:
                channel.faults = max(0, channel.faults - faults_forgive)
            return msg
        raise RuntimeError("Too many failures receiving encrypted data")
def test_handshake():
    """
    Run both handshake rounds in-process and check that the two ends finish
    with boxes whose byte representations are equal.
    """
    srv = Handshake()
    cli = Handshake(None, srv.my_ltk_public)

    # Round 1: client -> server
    round1_ok = srv.step1_recv(cli.step1_send())
    assert round1_ok is True

    # Round 2: server -> client
    round2_ok = cli.step2_recv(srv.step2_send())
    assert round2_ok is True

    assert bytes(cli.box) == bytes(srv.box)
def test_channel():
    """
    Verify the integrity of the channel is maintained after a step.

    For 100 random keys, two Channels sharing one SecretBox exchange 100
    messages in a random direction. After every message both sides step and
    the test checks that:
      * both ends agree on the new nonce,
      * the nonce actually changed,
      * the packet from the previous step no longer decrypts.
    """
    for _ in range(100):
        box = SecretBox(randbytes(SecretBox.KEY_SIZE))
        client = Channel(box)
        server = Channel(box)
        prev_nonce = None
        for step in range(100):
            # The box only accepts bytes, so encode the counter explicitly
            msg = str(step).encode('ascii')
            if randint(0, 1) == 0:
                sender, receiver = client, server
            else:
                sender, receiver = server, client
            packet = sender.encrypt(msg)
            assert receiver.decrypt(packet) == msg
            client.step()
            server.step()
            assert client.nonce == server.nonce
            assert client.nonce != prev_nonce
            prev_nonce = client.nonce
            try:
                client.decrypt(packet)
            except Exception:
                # Expected: the old packet was sealed under the previous nonce
                pass
            else:
                raise AssertionError('stale packet decrypted after step()')
if __name__ == "__main__":
    # Run the module's self-tests, in order, when executed as a script
    for self_test in (test_handshake, test_channel):
        self_test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment