Skip to content

Instantly share code, notes, and snippets.

@oleganza
Created January 2, 2020 15:52
Show Gist options
  • Save oleganza/b269a6062dd4d674762ac82dcec8731e to your computer and use it in GitHub Desktop.
Save oleganza/b269a6062dd4d674762ac82dcec8731e to your computer and use it in GitHub Desktop.
//! ```ascii
//! ┌──────────────────────────────────────────────────────────────────────────────────────┐
//! │ _______ __ __ ______ _______ ______ _______ _ _ _______ _ _ _______ │
//! │ | \_/ |_____] |______ |_____/ |______ |_____| |_____| |____/ |______ │
//! │ |_____ | |_____] |______ | \_ ______| | | | | | \_ |______ │
//! │ │
//! └──────────────────────────────────────────────────────────────────────────────────────┘
//! ```
//!
//! # C Y B E R S H A K E
//!
//! Yet Another Handshake Protocol for p2p sessions.
//!
//! You start with a local private key, remote public key (optional),
//! and a pair of `AsyncRead` and `AsyncWrite` interfaces.
//!
//! The protocol performs mutual authentication and, if it succeeded,
//! returns a pair of wrappers around these interfaces,
//! that keep track of the encryption keys.
//!
//! ## Features
//!
//! * **Symmetric and low-latency.** Handshake is performed by both ends simultaneously.
//! * **Mutual-authentication.** If you provide remote public key, you have a guarantee that you talk to them.
//! * **Key blinding.** Long-term identity keys are never transmitted in the clear.
//! * **Foward secrecy.** Keys are rotated on sent message.
//! * **Robust encryption.** AES-SIV-PMAC provides high-speed cipher with resistance to nonce-misuse.
//!
//! ## TODO
//!
//! * Streaming API to send larger portions of data wrapped in async streams.
//!
use byteorder::{ByteOrder, LittleEndian};
use core::marker::Unpin;
use miscreant::{generic_array::GenericArray, Aes128PmacSiv};
use rand_core::{CryptoRng, RngCore};
use curve25519_dalek::constants::RISTRETTO_BASEPOINT_POINT;
use curve25519_dalek::ristretto::{CompressedRistretto, RistrettoPoint};
use curve25519_dalek::scalar::Scalar;
use curve25519_dalek::traits::VartimeMultiscalarMul;
use merlin::Transcript; // TODO: change for raw Strobe.
use tokio::io;
use tokio::prelude::*;
/// The current version of the protocol is 0.
/// In the future we may add more versions, version bits or whatever.
const ONLY_SUPPORTED_VERSION: u64 = 0;
/// Private key for encrypting and authenticating connection.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub struct PrivateKey {
secret: Scalar,
pubkey: PublicKey,
}
/// Public key for authenticating connection.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub struct PublicKey {
point: CompressedRistretto,
}
/// An endpoint for sending messages to remote party.
/// All messages are ordered and encryption key is ratcheted after each sent message.
pub struct Outgoing<W: io::AsyncWrite + Unpin> {
writer: W,
seq: u64,
kdf: Transcript,
}
/// An endpoint for receiving messages from a remote party.
/// All messages are ordered and encryption key is ratcheted after each received message.
/// Recipient's incoming.seq corresponds to the sender's outgoing.seq.
pub struct Incoming<R: io::AsyncRead + Unpin> {
reader: R,
seq: u64,
kdf: Transcript,
message_maxlen: usize,
}
/// Kinds of failures that may happen during the handshake.
#[derive(Debug)]
pub enum Error {
/// I/O error (connection closed, not enough data, etc).
IoError(io::Error),
/// Point failed to decode correctly.
ProtocolError,
/// Received message is declared too large - not reading.
MessageTooLong(usize),
/// Version used by remote peer is not supported.
UnsupportedVersion,
}
/// Performs the key exchange with a remote end using byte-oriented read- and write- interfaces
/// (e.g. TcpSocket halves).
/// Returns the identity key of the remote peer, along with read- and write- interfaces
/// that perform encryption and authentication behind the scenes.
/// If you need to verify the identity per local policy or certificates, use the returned public key.
pub async fn cybershake<R, W, RNG>(
local_identity: &PrivateKey,
mut reader: R,
mut writer: W,
message_maxlen: usize,
rng: &mut RNG,
) -> Result<(PublicKey, Outgoing<W>, Incoming<R>), Error>
where
R: io::AsyncRead + Unpin,
W: io::AsyncWrite + Unpin,
RNG: RngCore + CryptoRng,
{
// We are going to need an additional ephemeral D-H key,
// and a salt for blinding the reusable identity key.
let mut keygen_rng = Transcript::new(b"Cybershake.randomness")
.build_rng()
.rekey_with_witness_bytes(b"local_privkey", local_identity.as_secret_bytes())
.finalize(rng);
let local_ephemeral = PrivateKey::from(Scalar::random(&mut keygen_rng));
const SALT_LEN: usize = 16;
let mut local_salt = [0u8; SALT_LEN];
keygen_rng.fill_bytes(&mut local_salt[..]);
let local_blinded_identity = local_identity.blind(&local_salt);
// Now we send our first, unencrypted, message:
//
// [version] [blinded local identity pubkey] [local ephemeral pubkey]
// u64-le 32 bytes 32 bytes
writer
.write(&encode_u64le(ONLY_SUPPORTED_VERSION)[..])
.await?;
writer
.write(local_blinded_identity.pubkey.as_bytes())
.await?;
writer.write(local_ephemeral.pubkey.as_bytes()).await?;
writer.flush().await?;
// Receive the similar message from the other end (that was sent simultaneously).
let mut remote_version_buf = [0u8; 8];
reader.read_exact(&mut remote_version_buf[..]).await?;
let remote_version = LittleEndian::read_u64(&remote_version_buf);
if remote_version != ONLY_SUPPORTED_VERSION {
return Err(Error::UnsupportedVersion);
}
let remote_blinded_identity = PublicKey::read_from(&mut reader).await?;
let remote_ephemeral = PublicKey::read_from(&mut reader).await?;
// Now, perform a triple Diffie-Hellman shared key generation.
let t = cybershake_x3dh(
&local_blinded_identity,
&local_ephemeral,
&remote_blinded_identity,
&remote_ephemeral,
)?;
// We will have two independent derivations of the shared key:
// one for the outgoing messages, and another one for incoming messages.
let mut kdf_outgoing = t.clone();
let mut kdf_incoming = t;
kdf_outgoing.append_message(b"src", local_blinded_identity.pubkey.as_bytes());
kdf_incoming.append_message(b"src", remote_blinded_identity.as_bytes());
// Now we prepare endpoints for reading and writing messages,
// but don't give them to the user until we authenticate the connection.
let mut outgoing = Outgoing {
writer,
seq: 0,
kdf: kdf_outgoing,
};
let mut incoming = Incoming {
reader,
seq: 0,
kdf: kdf_incoming,
message_maxlen,
};
// In order to authenticate the session, we send our first encrypted message
// in which we show the salt and the root key.
// If the transmission was successful (authenticated decryption succeeded),
// we check the blinded key and then let user continue using the session.
// Prepare and send the message: salt and local identity pubkey.
let msg_len = SALT_LEN + 32;
let mut local_salt_and_id = Vec::<u8>::with_capacity(msg_len);
local_salt_and_id.extend_from_slice(&local_salt[..]);
local_salt_and_id.extend_from_slice(local_identity.pubkey.as_bytes());
outgoing.send_message(&local_salt_and_id).await?;
// Receive the message from another end: their salt and their identity pubkey.
let remote_salt_and_id = incoming.receive_message().await?;
if remote_salt_and_id.len() != msg_len {
return Err(Error::ProtocolError);
}
let mut remote_salt = [0u8; SALT_LEN];
remote_salt[..].copy_from_slice(&remote_salt_and_id[0..SALT_LEN]);
let received_remote_identity =
PublicKey::read_from(&mut &remote_salt_and_id[SALT_LEN..]).await?;
// Blinded key is also a secure commitment to the underlying key.
// Here we check that the remote party has sent us the correct identity key
// matching the blinded key they used for X3DH.
let received_remote_id_blinded = received_remote_identity
.blind(&remote_salt)
.ok_or(Error::ProtocolError)?;
if received_remote_id_blinded != remote_blinded_identity {
return Err(Error::ProtocolError);
}
Ok((received_remote_identity, outgoing, incoming))
}
// TODO: implement AsyncWrite for this, buffering the data and encrypting on flush or on each N-byte chunk.
impl<W: AsyncWrite + Unpin> Outgoing<W> {
pub async fn send_message(&mut self, msg: &[u8]) -> Result<(), Error> {
self.kdf.append_u64(b"seq", self.seq);
let mut key = [0u8; 32];
self.kdf.challenge_bytes(b"key", &mut key);
let ad = encode_u64le(self.seq);
let ciphertext = Aes128PmacSiv::new(GenericArray::clone_from_slice(&key))
.encrypt(&[&ad], msg)
.map_err(|_| Error::ProtocolError)?;
self.seq += 1;
// Write the length prefix and the ciphertext.
self.writer
.write(&encode_u64le(ciphertext.len() as u64)[..])
.await?;
self.writer.write(&ciphertext[..]).await?;
self.writer.flush().await?;
Ok(())
}
}
impl<W: AsyncRead + Unpin> Incoming<W> {
pub async fn receive_message(&mut self) -> Result<Vec<u8>, Error> {
let mut lenbuf = [0u8; 8];
let seq = self.seq;
self.seq += 1;
self.reader.read_exact(&mut lenbuf[..]).await?;
let len = LittleEndian::read_u64(&lenbuf) as usize;
// length must include IV prefix (16 bytes)
if len < 16 {
return Err(Error::ProtocolError);
}
// Check the message length and fail before changing any of the remaining state.
if (len - 16) > self.message_maxlen {
return Err(Error::MessageTooLong(len - 16));
}
let mut ciphertext = Vec::with_capacity(len);
ciphertext.resize(len, 0u8);
self.reader.read_exact(&mut ciphertext[..]).await?;
self.kdf.append_u64(b"seq", seq);
let mut key = [0u8; 32];
self.kdf.challenge_bytes(b"key", &mut key);
let ad = encode_u64le(seq);
let plaintext = Aes128PmacSiv::new(GenericArray::clone_from_slice(&key))
.decrypt(&[&ad], &ciphertext)
.map_err(|_| Error::ProtocolError)?;
Ok(plaintext)
}
/// Converts to the Stream
pub fn into_stream(self) -> impl futures::stream::Stream<Item = Result<Vec<u8>, Error>> {
futures::stream::unfold(self, |mut src| {
async move {
let res = src.receive_message().await;
Some((res, src))
}
})
}
}
/// This is a YOLO variant of Signal's X3DH that's aimed at improved performance:
/// instead of doing independent computation of three DH instances,
/// compressing them, and feeding independently into a hash,
/// we add them all together, separated by a Fiat-Shamir challenges (x, y):
///
/// X3DH = Hash(DH(eph1, eph2) + x * DH(id1, eph2) + y * DH(id2, eph1))
///
/// This is allows reusing doublings across all three instances,
/// and do a single point compression in the end instead of three.
///
/// To get consistent results on both ends, we reorder keys so the "first" party
/// is the one with the lower compressed identity public key.
fn cybershake_x3dh(
id1: &PrivateKey,
eph1: &PrivateKey,
id2: &PublicKey,
eph2: &PublicKey,
) -> Result<Transcript, Error> {
let mut t = Transcript::new(b"Cybershake.X3DH");
let keep_order = id1.pubkey.as_bytes() < id2.as_bytes();
{
let (id1, eph1, id2, eph2) = if keep_order {
(&id1.pubkey, &eph1.pubkey, id2, eph2)
} else {
(id2, eph2, &id1.pubkey, &eph1.pubkey)
};
t.append_message(b"id1", id1.as_bytes());
t.append_message(b"id2", id2.as_bytes());
t.append_message(b"eph1", eph1.as_bytes());
t.append_message(b"eph2", eph2.as_bytes());
}
let x = challenge_scalar(b"x", &mut t);
let y = challenge_scalar(b"y", &mut t);
let (x, y) = if keep_order { (x, y) } else { (y, x) };
use core::iter;
let shared_secret = RistrettoPoint::optional_multiscalar_mul(
iter::once(&(eph1.as_scalar() + (x * id1.as_scalar())))
.chain(iter::once(&(eph1.as_scalar() * y))),
iter::once(eph2.as_point().decompress()).chain(iter::once(id2.as_point().decompress())),
)
.ok_or(Error::ProtocolError)?;
t.append_message(b"x3dh", shared_secret.compress().as_bytes());
Ok(t)
}
impl From<io::Error> for Error {
fn from(error: io::Error) -> Self {
Error::IoError(error)
}
}
impl From<Scalar> for PrivateKey {
fn from(secret: Scalar) -> Self {
PrivateKey {
secret,
pubkey: PublicKey::from(secret * RISTRETTO_BASEPOINT_POINT),
}
}
}
impl From<CompressedRistretto> for PublicKey {
fn from(point: CompressedRistretto) -> Self {
PublicKey { point }
}
}
impl From<RistrettoPoint> for PublicKey {
fn from(point: RistrettoPoint) -> Self {
PublicKey::from(point.compress())
}
}
impl PrivateKey {
/// Converts the private key to an underlying Ristretto scalar.
pub fn as_scalar(&self) -> &Scalar {
&self.secret
}
/// Converts the private key to its binary encoding.
pub fn as_secret_bytes(&self) -> &[u8] {
&self.secret.as_bytes()[..]
}
/// Converts the private key to its public counterpart.
pub fn to_public_key(&self) -> PublicKey {
self.pubkey
}
/// Blinds the private key.
fn blind(&self, salt: &[u8; 16]) -> Self {
PrivateKey::from(self.secret + keyblinding_factor(&self.pubkey.point, salt))
}
}
impl PublicKey {
/// Converts the public key to an underlying compressed Ristretto point.
pub fn as_point(&self) -> &CompressedRistretto {
&self.point
}
/// Converts the public key to its binary encoding.
pub fn as_bytes(&self) -> &[u8] {
&self.point.as_bytes()[..]
}
/// Blinds the public key.
fn blind(&self, salt: &[u8; 16]) -> Option<Self> {
self.point.decompress().map(|p| {
PublicKey::from(p + keyblinding_factor(&self.point, salt) * RISTRETTO_BASEPOINT_POINT)
})
}
/// Reads pubkey from a reader.
async fn read_from<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self, Error> {
let mut buf = [0u8; 32];
reader.read_exact(&mut buf[..]).await?;
Ok(Self::from(CompressedRistretto(buf)))
}
}
fn keyblinding_factor(pubkey: &CompressedRistretto, salt: &[u8; 16]) -> Scalar {
let mut t = Transcript::new(b"Cybershake.keyblinding");
t.append_message(b"key", pubkey.as_bytes());
t.append_message(b"salt", &salt[..]);
challenge_scalar(b"factor", &mut t)
}
fn challenge_scalar(label: &'static [u8], transcript: &mut Transcript) -> Scalar {
let mut buf = [0u8; 64];
transcript.challenge_bytes(label, &mut buf);
Scalar::from_bytes_mod_order_wide(&buf)
}
fn encode_u64le(i: u64) -> [u8; 8] {
let mut buf = [0u8; 8];
LittleEndian::write_u64(&mut buf, i);
buf
}
//! Collection of utilities for communicating with peers.
//! - Use PeerLink::spawn() to establish a fully authenticated connection over a given socket stream.
//! - Use PeerID to identify the peer.
use tokio::io;
use tokio::prelude::*;
use tokio::sync;
use tokio::task;
use std::hash::{Hash,Hasher};
use futures::stream::StreamExt;
use rand::thread_rng;
use crate::cybershake;
/// Various kinds of messages that peers can send and receive.
pub enum PeerMessage {
Data(String)
}
/// Identifier of the peer.
#[derive(Clone,Copy,Debug,PartialEq)]
pub struct PeerID(cybershake::PublicKey);
/// Interface for communication with the peer.
pub struct PeerLink {
peer_id: PeerID,
channel: sync::mpsc::Sender<PeerMessage>
}
/// Notification message that the peer sends.
pub enum PeerNotification {
/// Received a message from a peer
Received(PeerID, PeerMessage),
/// Peer got disconnected. This message is not sent if the peer was stopped by the host.
Disconnected(PeerID),
}
impl PeerID {
/// Returns a string representation of the PeerID
pub fn to_string(&self) -> String {
hex::encode(self.0.as_bytes())
}
}
impl Hash for PeerID {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.as_bytes().hash(state);
}
}
impl PeerLink {
/// Returns the ID of the peer.
pub fn id(&self) -> &PeerID {
&self.peer_id
}
/// Sends a message to the peer.
pub async fn send(&mut self, msg: PeerMessage) -> Result<(), String> {
self.channel.send(msg).await.map_err(|_err| "Peer disconnected".into())
}
/// Spawns a peer task that will send notifications to a provided channel.
/// Returns a PeerLink through which commands can be sent.
///
pub async fn spawn<S,N>(
host_identity: &cybershake::PrivateKey,
mut notifications_channel: sync::mpsc::Sender<N>,
socket: S,
) -> Result<Self, cybershake::Error>
where
S: AsyncRead + AsyncWrite + Unpin + 'static,
N: From<PeerNotification> + 'static
{
let (r, w) = io::split(socket);
let r = io::BufReader::new(r);
let w = io::BufWriter::new(w);
let (id_pubkey, mut outgoing, incoming) =
cybershake::cybershake(host_identity, r, w, 1000_000, &mut thread_rng()).await?;
let id = PeerID(id_pubkey);
let retid = id.clone();
let (cmd_sender, cmd_receiver) = sync::mpsc::channel::<PeerMessage>(100);
enum PeerError {
Failed(cybershake::Error),
Stopped,
}
enum PeerEvent {
Send(PeerMessage),
Receive(String),
}
// Returns Result<PeerEvent, Option<cybershake::Error>>
// This configures a merged stream of commands from the host and messages from the peer.
let mut stream = futures::stream::select(
cmd_receiver
.map(|msg| Ok(PeerEvent::Send(msg)))
// if owner drops the PeerLink, we'll get PeerError::Stopped here.
.chain(futures::stream::once(async { Err(PeerError::Stopped) })),
incoming.into_stream().map(|maybe_msg| {
maybe_msg
.map(|m| {
PeerEvent::Receive(
String::from_utf8(m).unwrap_or("[Invalid UTF-8 string]".into()),
)
})
.map_err(PeerError::Failed)
}),
)
.boxed_local();
task::spawn_local(async move {
while let Some(result) = stream.next().await {
// First, handle successful events (think of this as Result::async_map)
let result = (async {
match result? {
PeerEvent::Send(PeerMessage::Data(msg)) => outgoing
.send_message(msg.as_bytes())
.await
.map_err(PeerError::Failed),
PeerEvent::Receive(msg) => {
notifications_channel
.send(PeerNotification::Received(id.clone(), PeerMessage::Data(msg)).into())
.await
.map_err(|_| PeerError::Stopped) // stop the actor if the recipient no longer interested in notifications.
}
}
}).await;
// Second, handle the errors that occured before or after event processing.
match result {
Ok(_) => continue,
Err(PeerError::Failed(_err)) => {
let _ = notifications_channel
.send(PeerNotification::Disconnected(id.clone()).into())
.await; // ignore failure since we are on the way out anyway
break;
}
Err(PeerError::Stopped) => {
// If the peer is stopped, exit silently.
break;
}
}
}
});
Ok(Self {
peer_id: retid,
channel: cmd_sender
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment