Created
January 15, 2022 13:30
-
-
Save alex35mil/eeb319a160a2a7b81990d0f8ceb130ac to your computer and use it in GitHub Desktop.
Hybrid Logical Clocks implementation in Rust
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Hybrid Logical Clocks implementation in Rust. | |
// Based on: | |
// - https://cse.buffalo.edu/tech-reports/2014-04.pdf | |
// - https://github.com/jlongster/crdt-example-app/blob/master/shared/timestamp.js | |
// - https://github.com/jaredly/hybrid-logical-clocks-example/blob/master/src/hlc.js | |
use std::{ | |
char, cmp, fmt, num, ops, str, | |
time::{Duration, SystemTime, UNIX_EPOCH}, | |
}; | |
// Note that the order of members is important | |
// since derived ordering is based on the top-to-bottom declaration order | |
#[derive(Eq, PartialEq, Ord, PartialOrd, Debug)] | |
pub struct Hlc { | |
timestamp: Timestamp, | |
counter: Counter, | |
node: Node, | |
} | |
#[derive(PartialEq, Debug)] | |
pub enum IncError { | |
MaxDriftExceeded { | |
now: Timestamp, | |
timestamp: Timestamp, | |
}, | |
} | |
impl Hlc { | |
const DELIMITER: char = '-'; | |
pub fn new(timestamp: Duration, node: &str) -> Self { | |
Self { | |
timestamp: Timestamp::new(timestamp), | |
counter: Counter::new(), | |
node: Node::new(node), | |
} | |
} | |
pub fn inc(&self) -> Result<Self, IncError> { | |
self._inc(Timestamp::now()) | |
} | |
fn _inc(&self, now: Timestamp) -> Result<Self, IncError> { | |
let (timestamp, counter) = if now > self.timestamp { | |
(now, Counter::new()) | |
} else { | |
if &self.timestamp - &now > Timestamp::max_drift() { | |
return Err(IncError::MaxDriftExceeded { | |
now, | |
timestamp: self.timestamp.to_owned(), | |
}); | |
} | |
(self.timestamp.to_owned(), self.counter.inc()) | |
}; | |
let hlc = Self { | |
timestamp, | |
counter, | |
node: self.node.to_owned(), | |
}; | |
Ok(hlc) | |
} | |
pub fn recv(&self, remote: Self) -> Self { | |
self._recv(remote, Timestamp::now()) | |
} | |
fn _recv(&self, remote: Self, now: Timestamp) -> Self { | |
if now > self.timestamp && now > remote.timestamp { | |
Self { | |
timestamp: now, | |
counter: Counter::new(), | |
node: self.node.to_owned(), | |
} | |
} else if self.timestamp == remote.timestamp { | |
Self { | |
timestamp: self.timestamp.to_owned(), | |
counter: cmp::max(&self.counter, &remote.counter).inc(), | |
node: self.node.to_owned(), | |
} | |
} else if self.timestamp > remote.timestamp { | |
Self { | |
timestamp: self.timestamp.to_owned(), | |
counter: self.counter.inc(), | |
node: self.node.to_owned(), | |
} | |
} else { | |
Self { | |
timestamp: remote.timestamp, | |
counter: remote.counter.inc(), | |
node: self.node.to_owned(), | |
} | |
} | |
} | |
} | |
impl fmt::Display for Hlc { | |
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | |
write!( | |
f, | |
"{timestamp}{del}{counter}{del}{node}", | |
timestamp = self.timestamp, | |
counter = self.counter, | |
node = self.node, | |
del = Self::DELIMITER | |
) | |
} | |
} | |
#[derive(Debug)] | |
pub enum ParseError { | |
InvalidInputFormat { | |
input: String, | |
}, | |
InvalidTimestamp { | |
value: String, | |
error: num::ParseIntError, | |
}, | |
InvalidCounter { | |
value: String, | |
error: num::ParseIntError, | |
}, | |
} | |
impl TryFrom<String> for Hlc { | |
type Error = ParseError; | |
fn try_from(input: String) -> Result<Self, Self::Error> { | |
let mut split = input.splitn(3, Self::DELIMITER); | |
let timestamp = match split.next() { | |
Some(x) => match Timestamp::try_from(x) { | |
Ok(timestamp) => timestamp, | |
Err(error) => { | |
return Err(ParseError::InvalidTimestamp { | |
value: x.to_string(), | |
error, | |
}); | |
} | |
}, | |
None => { | |
return Err(ParseError::InvalidInputFormat { | |
input: input.to_string(), | |
}); | |
} | |
}; | |
let counter = match split.next() { | |
Some(x) => match Counter::try_from(x) { | |
Ok(counter) => counter, | |
Err(error) => { | |
return Err(ParseError::InvalidCounter { | |
value: x.to_string(), | |
error, | |
}); | |
} | |
}, | |
None => { | |
return Err(ParseError::InvalidInputFormat { | |
input: input.to_string(), | |
}); | |
} | |
}; | |
let node = match split.next() { | |
Some(x) => Node(x.to_string()), | |
None => { | |
return Err(ParseError::InvalidInputFormat { | |
input: input.to_string(), | |
}); | |
} | |
}; | |
let hlc = Self { | |
timestamp, | |
counter, | |
node, | |
}; | |
Ok(hlc) | |
} | |
} | |
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)] | |
pub struct Timestamp(Duration); | |
impl Timestamp { | |
fn new(x: Duration) -> Self { | |
Self(x) | |
} | |
fn now() -> Self { | |
let now = SystemTime::now() | |
.duration_since(UNIX_EPOCH) | |
.expect("Marty, we have to go back to the future!"); | |
Self(now) | |
} | |
fn max_drift() -> Duration { | |
Duration::from_secs(60) | |
} | |
} | |
impl fmt::Display for Timestamp { | |
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | |
write!(f, "{:0>15}", self.0.as_millis()) | |
} | |
} | |
impl TryFrom<&str> for Timestamp { | |
type Error = num::ParseIntError; | |
fn try_from(s: &str) -> Result<Self, Self::Error> { | |
match s.parse::<usize>() { | |
Ok(x) => Ok(Self(Duration::from_millis(x as u64))), | |
Err(error) => Err(error), | |
} | |
} | |
} | |
impl ops::Sub for &Timestamp { | |
type Output = Duration; | |
fn sub(self, other: Self) -> Self::Output { | |
self.0 - other.0 | |
} | |
} | |
#[derive(Eq, PartialEq, Ord, PartialOrd, Debug)] | |
pub struct Counter(u16); | |
impl Counter { | |
const RADIX: u32 = 16; | |
fn new() -> Self { | |
Self(0) | |
} | |
fn inc(&self) -> Self { | |
Self(self.0 + 1) | |
} | |
} | |
impl fmt::Display for Counter { | |
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | |
let x = self.0; | |
let mut result = ['\0'; 128]; | |
let mut used = 0; | |
let mut x = x as u32; | |
loop { | |
let m = x % Self::RADIX; | |
x /= Self::RADIX; | |
result[used] = char::from_digit(m, Self::RADIX).unwrap(); | |
used += 1; | |
if x == 0 { | |
break; | |
} | |
} | |
let cs = result[..used].iter().rev(); | |
let cs_len = cs.len(); | |
if cs_len < 5 { | |
write!(f, "{:0>1$}", "0", 5 - cs_len)?; | |
} | |
for c in result[..used].iter().rev() { | |
write!(f, "{}", c)?; | |
} | |
Ok(()) | |
} | |
} | |
impl TryFrom<&str> for Counter { | |
type Error = num::ParseIntError; | |
fn try_from(s: &str) -> Result<Self, Self::Error> { | |
match u16::from_str_radix(s, Counter::RADIX) { | |
Ok(x) => Ok(Self(x)), | |
Err(error) => Err(error), | |
} | |
} | |
} | |
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)] | |
pub struct Node(String); | |
impl Node { | |
fn new(x: &str) -> Self { | |
Self(x.to_owned()) | |
} | |
} | |
impl fmt::Display for Node { | |
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | |
write!(f, "{}", self.0) | |
} | |
} | |
#[cfg(test)] | |
mod tests { | |
use std::time::Duration; | |
use crate::{Counter, Hlc, IncError, Node, Timestamp}; | |
fn ms(x: u64) -> Duration { | |
Duration::from_millis(x) | |
} | |
#[test] | |
fn ord_timestamp_gt_works() { | |
let hlc1 = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
let hlc2 = Hlc { | |
timestamp: Timestamp(ms(1)), | |
counter: Counter(100), | |
node: Node("b".to_string()), | |
}; | |
assert!(hlc1 > hlc2); | |
} | |
#[test] | |
fn ord_timestamp_lt_works() { | |
let hlc1 = Hlc { | |
timestamp: Timestamp(ms(1)), | |
counter: Counter(100), | |
node: Node("b".to_string()), | |
}; | |
let hlc2 = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
assert!(hlc1 < hlc2); | |
} | |
#[test] | |
fn ord_counter_gt_works() { | |
let hlc1 = Hlc { | |
timestamp: Timestamp(ms(1)), | |
counter: Counter(100), | |
node: Node("a".to_string()), | |
}; | |
let hlc2 = Hlc { | |
timestamp: Timestamp(ms(1)), | |
counter: Counter(1), | |
node: Node("b".to_string()), | |
}; | |
assert!(hlc1 > hlc2); | |
} | |
#[test] | |
fn ord_counter_lt_works() { | |
let hlc1 = Hlc { | |
timestamp: Timestamp(ms(1)), | |
counter: Counter(1), | |
node: Node("b".to_string()), | |
}; | |
let hlc2 = Hlc { | |
timestamp: Timestamp(ms(1)), | |
counter: Counter(100), | |
node: Node("a".to_string()), | |
}; | |
assert!(hlc1 < hlc2); | |
} | |
#[test] | |
fn ord_node_gt_works() { | |
let hlc1 = Hlc { | |
timestamp: Timestamp(ms(1)), | |
counter: Counter(1), | |
node: Node("b".to_string()), | |
}; | |
let hlc2 = Hlc { | |
timestamp: Timestamp(ms(1)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
assert!(hlc1 > hlc2); | |
} | |
#[test] | |
fn ord_node_lt_works() { | |
let hlc1 = Hlc { | |
timestamp: Timestamp(ms(1)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
let hlc2 = Hlc { | |
timestamp: Timestamp(ms(1)), | |
counter: Counter(1), | |
node: Node("b".to_string()), | |
}; | |
assert!(hlc1 < hlc2); | |
} | |
#[test] | |
fn eq_works() { | |
let hlc1 = Hlc { | |
timestamp: Timestamp(ms(1)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
let hlc2 = Hlc { | |
timestamp: Timestamp(ms(1)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
assert_eq!(hlc1, hlc2); | |
} | |
#[test] | |
fn serde_works() { | |
let hlc1 = Hlc { | |
timestamp: Timestamp(ms(1000)), | |
counter: Counter(100), | |
node: Node("a-b-c".to_string()), | |
}; | |
let hlc2: Hlc = hlc1.to_string().try_into().unwrap(); | |
assert_eq!(hlc1, hlc2); | |
} | |
#[test] | |
fn inc_now_in_future_works() { | |
let now = Timestamp(ms(101)); | |
let hlc = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
let actual = hlc._inc(now).unwrap(); | |
let expected = Hlc { | |
timestamp: Timestamp(ms(101)), | |
counter: Counter(0), | |
node: Node("a".to_string()), | |
}; | |
assert_eq!(actual, expected); | |
} | |
#[test] | |
fn inc_now_in_past_works() { | |
let now = Timestamp(ms(99)); | |
let hlc = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
let actual = hlc._inc(now).unwrap(); | |
let expected = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(2), | |
node: Node("a".to_string()), | |
}; | |
assert_eq!(actual, expected); | |
} | |
#[test] | |
fn inc_now_in_far_past_errors() { | |
let now = Timestamp(ms(100000) - Timestamp::max_drift() - ms(1)); | |
let hlc = Hlc { | |
timestamp: Timestamp(ms(100000)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
let actual = hlc._inc(now.clone()).unwrap_err(); | |
let expected = IncError::MaxDriftExceeded { | |
now, | |
timestamp: hlc.timestamp, | |
}; | |
assert_eq!(actual, expected); | |
} | |
#[test] | |
fn recv_now_in_future_works() { | |
let now = Timestamp(ms(101)); | |
let local = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
let remote = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(1), | |
node: Node("b".to_string()), | |
}; | |
let actual = local._recv(remote, now); | |
let expected = Hlc { | |
timestamp: Timestamp(ms(101)), | |
counter: Counter(0), | |
node: Node("a".to_string()), | |
}; | |
assert_eq!(actual, expected); | |
} | |
#[test] | |
fn recv_now_in_past_timestamp_local_eq_remote_counter_local_gt_remote_works() { | |
let now = Timestamp(ms(99)); | |
let local = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(2), | |
node: Node("a".to_string()), | |
}; | |
let remote = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(1), | |
node: Node("b".to_string()), | |
}; | |
let actual = local._recv(remote, now); | |
let expected = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(3), | |
node: Node("a".to_string()), | |
}; | |
assert_eq!(actual, expected); | |
} | |
#[test] | |
fn recv_now_in_past_timestamp_local_eq_remote_counter_local_lt_remote_works() { | |
let now = Timestamp(ms(99)); | |
let local = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
let remote = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(2), | |
node: Node("b".to_string()), | |
}; | |
let actual = local._recv(remote, now); | |
let expected = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(3), | |
node: Node("a".to_string()), | |
}; | |
assert_eq!(actual, expected); | |
} | |
#[test] | |
fn recv_now_in_past_timestamp_local_gt_remote_works() { | |
let now = Timestamp(ms(99)); | |
let local = Hlc { | |
timestamp: Timestamp(ms(101)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
let remote = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(2), | |
node: Node("b".to_string()), | |
}; | |
let actual = local._recv(remote, now); | |
let expected = Hlc { | |
timestamp: Timestamp(ms(101)), | |
counter: Counter(2), | |
node: Node("a".to_string()), | |
}; | |
assert_eq!(actual, expected); | |
} | |
#[test] | |
fn recv_now_in_past_timestamp_local_lt_remote_works() { | |
let now = Timestamp(ms(99)); | |
let local = Hlc { | |
timestamp: Timestamp(ms(100)), | |
counter: Counter(1), | |
node: Node("a".to_string()), | |
}; | |
let remote = Hlc { | |
timestamp: Timestamp(ms(101)), | |
counter: Counter(2), | |
node: Node("b".to_string()), | |
}; | |
let actual = local._recv(remote, now); | |
let expected = Hlc { | |
timestamp: Timestamp(ms(101)), | |
counter: Counter(3), | |
node: Node("a".to_string()), | |
}; | |
assert_eq!(actual, expected); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment