Last active
March 8, 2018 15:30
-
-
Save HarryR/5420bac1025250b3a218a8e327d8a3e6 to your computer and use it in GitHub Desktop.
Negotiate an encrypted channel between two long-term NaCl key pairs, ensuring forward secrecy by creating an ephemeral session key. See: https://www.logicista.com/2017/humans-vs-crypto
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Cryptochannel provides a way of securely communicating with another
party where each party has a verifiable identity.

It provides both Client and Server components, where the Server has a known
long-term key but the Server doesn't need to know the client's long-term key
prior to it connecting.

The protocol enforces forward secrecy: if either party's long-term key is
leaked, no intercepting party will be able to recover the contents of
the session. All that can be recovered if LTKs are leaked are the long-term
public keys of either side (and hence, their identity).
""" | |
__all__ = ('Handshake', 'Channel', 'CryptoProtocol') | |
from nacl.public import PublicKey, PrivateKey, Box | |
from nacl.secret import SecretBox | |
from nacl.encoding import RawEncoder | |
from random import randint | |
from nacl.utils import random as randbytes | |
from nacl.hash import sha256 | |
class Handshake(object):
    """
    Two-message key negotiation between a Client and a Server.

    The Client starts out knowing the Server's long-term public key (LTK);
    the Server learns the Client's LTK from the first message. Each side
    generates a short-term key (STK) during the exchange, and the session
    `box` is built from the two STKs.

    Message flow:
        Client: step1_send()  ->  Server: step1_recv()
        Server: step2_send()  ->  Client: step2_recv()

    State is validated with `assert`, so calling a step out of order raises
    AssertionError (unless Python runs with assertions disabled).
    """

    # _my_ltk    : our long-term PrivateKey
    # _their_ltk : peer's long-term PublicKey (None until step1_recv on the server)
    # _my_stk    : our short-term PrivateKey, created in step1_send / step2_send
    # _their_stk : peer's short-term PublicKey
    # _box       : session Box(_my_stk, _their_stk); None until negotiation ends
    __slots__ = ('_my_ltk', '_their_ltk', '_my_stk', '_their_stk', '_box')

    def __init__(self, my_LTK_secret=None, their_LTK_public=None):
        """
        :param my_LTK_secret: our long-term PrivateKey; a fresh one is
            generated when omitted.
        :param their_LTK_public: the peer's long-term PublicKey, or None when
            it is to be learned from the first handshake message.
        """
        if their_LTK_public is not None:
            assert isinstance(their_LTK_public, PublicKey)
        if my_LTK_secret is None:
            my_LTK_secret = PrivateKey.generate()
        assert isinstance(my_LTK_secret, PrivateKey)
        self._my_ltk = my_LTK_secret
        self._their_ltk = their_LTK_public
        # Short-term material only exists once the handshake is under way.
        self._my_stk = self._their_stk = self._box = None

    @property
    def my_ltk_public(self):
        """
        Public half of our long-term key, for convenience
        """
        return self._my_ltk.public_key

    @property
    def box(self):
        """
        The session Box used to encrypt/decrypt messages.
        None until the handshake has been negotiated.
        """
        return self._box

    def step1_send(self):
        """
        Handshake Round 1 - Client sends to Server

        Generates our STK, then encrypts our public-LTK inside a box formed
        from our private-STK and their public-LTK.

        The leading SecretBox.NONCE_SIZE bytes of our public-STK double as
        the box nonce, so no separate nonce is transmitted; the receiver
        rederives it from the public-STK at the front of the message.

        :return: our public-STK bytes followed by the encrypted public-LTK.
        """
        assert self._their_ltk is not None
        assert self._their_stk is None
        assert self._my_stk is None
        assert self._my_ltk is not None
        ephemeral = PrivateKey.generate()
        self._my_stk = ephemeral
        stk_public = bytes(ephemeral.public_key)
        # Secret shared between our STK and their LTK hides our identity.
        sealed_ltk = Box(ephemeral, self._their_ltk).encrypt(
            bytes(self._my_ltk.public_key),
            stk_public[:SecretBox.NONCE_SIZE],
        )
        return stk_public + sealed_ltk.ciphertext

    def step1_recv(self, data):
        """
        Handshake Round 1 - Server receives from Client

        Reads their public-STK from the first PublicKey.SIZE bytes of `data`
        and decrypts the remainder (their public-LTK) with a box formed from
        our private-LTK and their public-STK.

        Note: `_their_stk` is stored before decryption is attempted, so it
        stays set even if decryption raises.

        :param data: the message produced by the peer's step1_send().
        :return: True on success; decryption/key errors propagate.
        """
        assert self._their_ltk is None
        assert self._their_stk is None
        assert self._my_stk is None
        assert self._my_ltk is not None
        key_len = PublicKey.SIZE
        stk_bytes, sealed_ltk = data[:key_len], data[key_len:]
        self._their_stk = PublicKey(stk_bytes)
        opener = Box(self._my_ltk, self._their_stk)
        # Same nonce derivation as the sender: prefix of their public-STK.
        nonce = bytes(self._their_stk)[:SecretBox.NONCE_SIZE]
        self._their_ltk = PublicKey(opener.decrypt(sealed_ltk, nonce))
        return True

    def step2_send(self):
        """
        Handshake Round 2 - Server sends to Client

        Generates our STK and builds the session box between our STK and
        their public-STK. Our public-STK is then encrypted with a box formed
        from our private-LTK and their public-LTK, under a random nonce.

        :return: the result of Box.encrypt() unmodified; step2_recv() on the
            peer decrypts it without being given a separate nonce.
        """
        assert self._their_ltk is not None
        assert self._their_stk is not None
        assert self._my_stk is None
        self._my_stk = PrivateKey.generate()
        # Session box: our STK <-> their STK.
        self._box = Box(self._my_stk, self._their_stk)
        # Transport box for this reply: our LTK <-> their LTK.
        identity_box = Box(self._my_ltk, self._their_ltk)
        stk_public = bytes(self._my_stk.public_key)
        return identity_box.encrypt(stk_public, nonce=randbytes(Box.NONCE_SIZE))

    def step2_recv(self, data):
        """
        Handshake Round 2 - Client receives from Server

        Decrypts their public-STK using a box formed from our private-LTK
        and their public-LTK, then builds the session box between our STK
        and their STK.

        :param data: the message produced by the peer's step2_send().
        :return: True on success; decryption/key errors propagate.
        """
        assert self._their_stk is None
        assert self._their_ltk is not None
        assert self._my_stk is not None
        assert self._my_ltk is not None
        identity_box = Box(self._my_ltk, self._their_ltk)
        self._their_stk = PublicKey(identity_box.decrypt(data))
        # Session box: our STK <-> their STK.
        self._box = Box(self._my_stk, self._their_stk)
        return True
class Channel(object):
    """
    Sequentially ordered encrypted channel between two parties that share
    the same box.

    Both sides derive an identical nonce sequence from the box, and each
    side calls `step()` after a message has been successfully handled to
    advance to the next nonce. Until `step()` is called the same nonce is
    reused, so ordering and delivery guarantees must be provided outside
    the channel: the same packet may be retransmitted or split into pieces,
    but different data (e.g. a truncated copy) must not be encrypted again
    within the same step.
    """

    # _box   : the Box / SecretBox doing the actual encryption
    # _state : current hash-chain value; its prefix is the nonce
    # faults : number of failed decrypt() calls
    __slots__ = ('_box', '_state', 'faults')

    def __init__(self, box):
        """
        :param box: a Box or SecretBox shared by both ends of the channel.
        """
        assert isinstance(box, (Box, SecretBox))
        self._box = box
        # Seed the nonce chain from the box's byte representation so both
        # ends start from the same state.
        seed = bytes(self._box)
        self._state = sha256(seed, encoder=RawEncoder)
        self.faults = 0

    @property
    def nonce(self):
        """
        Nonce for the current step: the first NONCE_SIZE bytes of the state
        """
        size = self._box.NONCE_SIZE
        return self._state[:size]

    def encrypt(self, data, mtu=32000):
        """
        Encrypt `data` under the current nonce and return the ciphertext
        (including MAC).

        :param data: plaintext to encrypt.
        :param mtu: maximum allowed packet size; the plaintext length plus
            16 bytes of overhead must not exceed it.
        :raises RuntimeError: if the packet would be larger than `mtu`.
        """
        overhead = 16
        packet_len = len(data) + overhead
        if packet_len > mtu:
            raise RuntimeError('Encryption overheads exceed maximum packet length')
        sealed = self._box.encrypt(data, nonce=self.nonce)
        return sealed.ciphertext

    def step(self):
        """
        Advance the sequence by hashing the state, which cycles the nonce
        """
        previous = self._state
        self._state = sha256(previous, encoder=RawEncoder)

    def decrypt(self, data):
        """
        Decrypt `data` under the current nonce.

        Any failure increments `faults` and is re-raised. Callers are
        advised to disconnect once a fault threshold is exceeded, since
        repeated faults may indicate interception or transport ordering
        failures.
        """
        try:
            plaintext = self._box.decrypt(data, nonce=self.nonce)
        except Exception:
            self.faults += 1
            raise
        return plaintext
class CryptoProtocol(object):
    """
    Implements the cryptographic protocol by sending and receiving messages
    between two parties with a fully synchronised state. Because it is
    agnostic to the contents of the messages, the only way it can ensure
    synchronisation is if both parties act in lock-step.

    The side which starts sending first needs to poll for answers and can't
    arbitrarily be sent messages without breaking the encryption, e.g. another
    person with the same short term key wouldn't be able to insert a message
    in between the others without the sender finding out.

    The `transport` object must provide `send(data)`, `recv()` and an `mtu`
    attribute; those are the only members this class uses.
    """

    # 'mtu' is declared as a slot but never assigned by this class; send()
    # reads the limit from the transport instead.
    __slots__ = ('mtu', '_transport', '_channel', '_my_ltk', '_their_ltk')

    def __init__(self, transport, my_ltk, their_ltk=None):
        """
        :param transport: object used to exchange raw packets with the peer.
        :param my_ltk: our long-term private key, passed to Handshake.
        :param their_ltk: the peer's long-term public key. Required by
            connect(); must be None for accept(), because
            Handshake.step1_recv() asserts that it is not yet known.
        """
        self._transport = transport
        self._my_ltk = my_ltk
        self._their_ltk = their_ltk
        self._channel = None

    def connect(self):
        """
        Be the initiator of the connection, sending the first message.

        Runs the client half of the handshake (step1_send, then step2_recv),
        stores the resulting Channel for send()/recv() and returns it.
        """
        assert self._channel is None
        shake = Handshake(self._my_ltk, self._their_ltk)
        self._transport.send(shake.step1_send())
        shake.step2_recv(self._transport.recv())
        # The handshake is complete after two messages; the session box is
        # available on both sides from here on.
        self._channel = Channel(shake.box)
        return self._channel

    def accept(self):
        """
        Be the acceptor of the connection, receiving the first message.

        Runs the server half of the handshake (step1_recv, then step2_send),
        stores the resulting Channel for send()/recv() and returns it.
        """
        assert self._channel is None
        shake = Handshake(self._my_ltk, self._their_ltk)
        shake.step1_recv(self._transport.recv())
        self._transport.send(shake.step2_send())
        self._channel = Channel(shake.box)
        return self._channel

    def send(self, msg):
        """
        Encrypt `msg`, send it via the transport, then advance the channel.

        :raises RuntimeError: (from Channel.encrypt) if the packet would
            exceed the transport's mtu.
        """
        assert self._channel is not None
        packet = self._channel.encrypt(msg, self._transport.mtu)
        self._transport.send(packet)
        self._channel.step()

    def recv(self):
        """
        Receive and decrypt the next message from the transport.

        Packets that fail to decrypt are skipped until the channel's fault
        count reaches the maximum. After a successful receive the channel is
        advanced and some accumulated faults are forgiven.

        Errors raised by the transport itself are not treated as decryption
        faults and propagate to the caller.

        :return: the decrypted message (may be empty).
        :raises RuntimeError: when too many packets failed to decrypt.
        """
        assert self._channel is not None
        faults_forgive = 2
        faults_max = 5  # 2/5 or 40%
        channel = self._channel
        while channel.faults < faults_max:
            packet = self._transport.recv()
            try:
                msg = channel.decrypt(packet)
            except Exception:
                # Channel.decrypt() has already counted the fault, so the
                # loop condition bounds the number of retries.
                continue
            channel.step()
            # Forgive a couple of earlier faults, never dropping below zero.
            channel.faults = max(0, channel.faults - faults_forgive)
            return msg
        raise RuntimeError("Too many failures receiving encrypted data")
def test_handshake():
    """
    Run both halves of the Handshake in-process and check that the client
    and server end up with the same session box.
    """
    server = Handshake()
    # The client only knows the server's long-term public key up front.
    client = Handshake(None, server.my_ltk_public)

    # Round 1: client -> server
    hello = client.step1_send()
    server_accepted = server.step1_recv(hello)
    assert server_accepted is True

    # Round 2: server -> client
    reply = server.step2_send()
    client_accepted = client.step2_recv(reply)
    assert client_accepted is True

    client_secret = bytes(client.box)
    server_secret = bytes(server.box)
    assert client_secret == server_secret
def test_channel():
    """
    Verify the integrity of the channel is maintained after a step.

    For many random keys, two Channels sharing one SecretBox exchange
    messages in a random direction. After every message both sides step,
    and the test checks that:
      * the message round-trips,
      * both sides agree on the new nonce,
      * the nonce actually changed,
      * the previous packet can no longer be decrypted (replay is rejected).
    """
    for _ in range(100):
        box = SecretBox(randbytes(SecretBox.KEY_SIZE))
        client = Channel(box)
        server = Channel(box)
        prev_nonce = None
        for step in range(100):
            # The box encrypts bytes; str(step) alone is text on Python 3.
            msg = str(step).encode('ascii')
            if randint(0, 1) == 0:
                sender, receiver = client, server
            else:
                sender, receiver = server, client
            packet = sender.encrypt(msg)
            assert receiver.decrypt(packet) == msg

            client.step()
            server.step()
            assert client.nonce == server.nonce
            assert client.nonce != prev_nonce
            prev_nonce = client.nonce

            # Replaying the packet from the previous step must now fail.
            replay_rejected = False
            try:
                client.decrypt(packet)
            except Exception:
                replay_rejected = True
            assert replay_rejected
if __name__ == "__main__":
    # Run the self-tests in order when executed as a script.
    for self_test in (test_handshake, test_channel):
        self_test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment