Skip to content

Instantly share code, notes, and snippets.

@alex35mil
Created January 15, 2022 13:30
Show Gist options
  • Save alex35mil/eeb319a160a2a7b81990d0f8ceb130ac to your computer and use it in GitHub Desktop.
Save alex35mil/eeb319a160a2a7b81990d0f8ceb130ac to your computer and use it in GitHub Desktop.
Hybrid Logical Clocks implementation in Rust
// Hybrid Logical Clocks implementation in Rust.
// Based on:
// - https://cse.buffalo.edu/tech-reports/2014-04.pdf
// - https://github.com/jlongster/crdt-example-app/blob/master/shared/timestamp.js
// - https://github.com/jaredly/hybrid-logical-clocks-example/blob/master/src/hlc.js
use std::{
char, cmp, fmt, num, ops, str,
time::{Duration, SystemTime, UNIX_EPOCH},
};
// Note that the order of members is important
// since derived ordering is based on the top-to-bottom declaration order
#[derive(Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct Hlc {
timestamp: Timestamp,
counter: Counter,
node: Node,
}
#[derive(PartialEq, Debug)]
pub enum IncError {
MaxDriftExceeded {
now: Timestamp,
timestamp: Timestamp,
},
}
impl Hlc {
const DELIMITER: char = '-';
pub fn new(timestamp: Duration, node: &str) -> Self {
Self {
timestamp: Timestamp::new(timestamp),
counter: Counter::new(),
node: Node::new(node),
}
}
pub fn inc(&self) -> Result<Self, IncError> {
self._inc(Timestamp::now())
}
fn _inc(&self, now: Timestamp) -> Result<Self, IncError> {
let (timestamp, counter) = if now > self.timestamp {
(now, Counter::new())
} else {
if &self.timestamp - &now > Timestamp::max_drift() {
return Err(IncError::MaxDriftExceeded {
now,
timestamp: self.timestamp.to_owned(),
});
}
(self.timestamp.to_owned(), self.counter.inc())
};
let hlc = Self {
timestamp,
counter,
node: self.node.to_owned(),
};
Ok(hlc)
}
pub fn recv(&self, remote: Self) -> Self {
self._recv(remote, Timestamp::now())
}
fn _recv(&self, remote: Self, now: Timestamp) -> Self {
if now > self.timestamp && now > remote.timestamp {
Self {
timestamp: now,
counter: Counter::new(),
node: self.node.to_owned(),
}
} else if self.timestamp == remote.timestamp {
Self {
timestamp: self.timestamp.to_owned(),
counter: cmp::max(&self.counter, &remote.counter).inc(),
node: self.node.to_owned(),
}
} else if self.timestamp > remote.timestamp {
Self {
timestamp: self.timestamp.to_owned(),
counter: self.counter.inc(),
node: self.node.to_owned(),
}
} else {
Self {
timestamp: remote.timestamp,
counter: remote.counter.inc(),
node: self.node.to_owned(),
}
}
}
}
impl fmt::Display for Hlc {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{timestamp}{del}{counter}{del}{node}",
timestamp = self.timestamp,
counter = self.counter,
node = self.node,
del = Self::DELIMITER
)
}
}
#[derive(Debug)]
pub enum ParseError {
InvalidInputFormat {
input: String,
},
InvalidTimestamp {
value: String,
error: num::ParseIntError,
},
InvalidCounter {
value: String,
error: num::ParseIntError,
},
}
impl TryFrom<String> for Hlc {
type Error = ParseError;
fn try_from(input: String) -> Result<Self, Self::Error> {
let mut split = input.splitn(3, Self::DELIMITER);
let timestamp = match split.next() {
Some(x) => match Timestamp::try_from(x) {
Ok(timestamp) => timestamp,
Err(error) => {
return Err(ParseError::InvalidTimestamp {
value: x.to_string(),
error,
});
}
},
None => {
return Err(ParseError::InvalidInputFormat {
input: input.to_string(),
});
}
};
let counter = match split.next() {
Some(x) => match Counter::try_from(x) {
Ok(counter) => counter,
Err(error) => {
return Err(ParseError::InvalidCounter {
value: x.to_string(),
error,
});
}
},
None => {
return Err(ParseError::InvalidInputFormat {
input: input.to_string(),
});
}
};
let node = match split.next() {
Some(x) => Node(x.to_string()),
None => {
return Err(ParseError::InvalidInputFormat {
input: input.to_string(),
});
}
};
let hlc = Self {
timestamp,
counter,
node,
};
Ok(hlc)
}
}
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)]
pub struct Timestamp(Duration);
impl Timestamp {
fn new(x: Duration) -> Self {
Self(x)
}
fn now() -> Self {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Marty, we have to go back to the future!");
Self(now)
}
fn max_drift() -> Duration {
Duration::from_secs(60)
}
}
impl fmt::Display for Timestamp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:0>15}", self.0.as_millis())
}
}
impl TryFrom<&str> for Timestamp {
type Error = num::ParseIntError;
fn try_from(s: &str) -> Result<Self, Self::Error> {
match s.parse::<usize>() {
Ok(x) => Ok(Self(Duration::from_millis(x as u64))),
Err(error) => Err(error),
}
}
}
impl ops::Sub for &Timestamp {
type Output = Duration;
fn sub(self, other: Self) -> Self::Output {
self.0 - other.0
}
}
#[derive(Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct Counter(u16);
impl Counter {
const RADIX: u32 = 16;
fn new() -> Self {
Self(0)
}
fn inc(&self) -> Self {
Self(self.0 + 1)
}
}
impl fmt::Display for Counter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let x = self.0;
let mut result = ['\0'; 128];
let mut used = 0;
let mut x = x as u32;
loop {
let m = x % Self::RADIX;
x /= Self::RADIX;
result[used] = char::from_digit(m, Self::RADIX).unwrap();
used += 1;
if x == 0 {
break;
}
}
let cs = result[..used].iter().rev();
let cs_len = cs.len();
if cs_len < 5 {
write!(f, "{:0>1$}", "0", 5 - cs_len)?;
}
for c in result[..used].iter().rev() {
write!(f, "{}", c)?;
}
Ok(())
}
}
impl TryFrom<&str> for Counter {
type Error = num::ParseIntError;
fn try_from(s: &str) -> Result<Self, Self::Error> {
match u16::from_str_radix(s, Counter::RADIX) {
Ok(x) => Ok(Self(x)),
Err(error) => Err(error),
}
}
}
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)]
pub struct Node(String);
impl Node {
fn new(x: &str) -> Self {
Self(x.to_owned())
}
}
impl fmt::Display for Node {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::{Counter, Hlc, IncError, Node, Timestamp};
fn ms(x: u64) -> Duration {
Duration::from_millis(x)
}
#[test]
fn ord_timestamp_gt_works() {
let hlc1 = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(1),
node: Node("a".to_string()),
};
let hlc2 = Hlc {
timestamp: Timestamp(ms(1)),
counter: Counter(100),
node: Node("b".to_string()),
};
assert!(hlc1 > hlc2);
}
#[test]
fn ord_timestamp_lt_works() {
let hlc1 = Hlc {
timestamp: Timestamp(ms(1)),
counter: Counter(100),
node: Node("b".to_string()),
};
let hlc2 = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(1),
node: Node("a".to_string()),
};
assert!(hlc1 < hlc2);
}
#[test]
fn ord_counter_gt_works() {
let hlc1 = Hlc {
timestamp: Timestamp(ms(1)),
counter: Counter(100),
node: Node("a".to_string()),
};
let hlc2 = Hlc {
timestamp: Timestamp(ms(1)),
counter: Counter(1),
node: Node("b".to_string()),
};
assert!(hlc1 > hlc2);
}
#[test]
fn ord_counter_lt_works() {
let hlc1 = Hlc {
timestamp: Timestamp(ms(1)),
counter: Counter(1),
node: Node("b".to_string()),
};
let hlc2 = Hlc {
timestamp: Timestamp(ms(1)),
counter: Counter(100),
node: Node("a".to_string()),
};
assert!(hlc1 < hlc2);
}
#[test]
fn ord_node_gt_works() {
let hlc1 = Hlc {
timestamp: Timestamp(ms(1)),
counter: Counter(1),
node: Node("b".to_string()),
};
let hlc2 = Hlc {
timestamp: Timestamp(ms(1)),
counter: Counter(1),
node: Node("a".to_string()),
};
assert!(hlc1 > hlc2);
}
#[test]
fn ord_node_lt_works() {
let hlc1 = Hlc {
timestamp: Timestamp(ms(1)),
counter: Counter(1),
node: Node("a".to_string()),
};
let hlc2 = Hlc {
timestamp: Timestamp(ms(1)),
counter: Counter(1),
node: Node("b".to_string()),
};
assert!(hlc1 < hlc2);
}
#[test]
fn eq_works() {
let hlc1 = Hlc {
timestamp: Timestamp(ms(1)),
counter: Counter(1),
node: Node("a".to_string()),
};
let hlc2 = Hlc {
timestamp: Timestamp(ms(1)),
counter: Counter(1),
node: Node("a".to_string()),
};
assert_eq!(hlc1, hlc2);
}
#[test]
fn serde_works() {
let hlc1 = Hlc {
timestamp: Timestamp(ms(1000)),
counter: Counter(100),
node: Node("a-b-c".to_string()),
};
let hlc2: Hlc = hlc1.to_string().try_into().unwrap();
assert_eq!(hlc1, hlc2);
}
#[test]
fn inc_now_in_future_works() {
let now = Timestamp(ms(101));
let hlc = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(1),
node: Node("a".to_string()),
};
let actual = hlc._inc(now).unwrap();
let expected = Hlc {
timestamp: Timestamp(ms(101)),
counter: Counter(0),
node: Node("a".to_string()),
};
assert_eq!(actual, expected);
}
#[test]
fn inc_now_in_past_works() {
let now = Timestamp(ms(99));
let hlc = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(1),
node: Node("a".to_string()),
};
let actual = hlc._inc(now).unwrap();
let expected = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(2),
node: Node("a".to_string()),
};
assert_eq!(actual, expected);
}
#[test]
fn inc_now_in_far_past_errors() {
let now = Timestamp(ms(100000) - Timestamp::max_drift() - ms(1));
let hlc = Hlc {
timestamp: Timestamp(ms(100000)),
counter: Counter(1),
node: Node("a".to_string()),
};
let actual = hlc._inc(now.clone()).unwrap_err();
let expected = IncError::MaxDriftExceeded {
now,
timestamp: hlc.timestamp,
};
assert_eq!(actual, expected);
}
#[test]
fn recv_now_in_future_works() {
let now = Timestamp(ms(101));
let local = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(1),
node: Node("a".to_string()),
};
let remote = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(1),
node: Node("b".to_string()),
};
let actual = local._recv(remote, now);
let expected = Hlc {
timestamp: Timestamp(ms(101)),
counter: Counter(0),
node: Node("a".to_string()),
};
assert_eq!(actual, expected);
}
#[test]
fn recv_now_in_past_timestamp_local_eq_remote_counter_local_gt_remote_works() {
let now = Timestamp(ms(99));
let local = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(2),
node: Node("a".to_string()),
};
let remote = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(1),
node: Node("b".to_string()),
};
let actual = local._recv(remote, now);
let expected = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(3),
node: Node("a".to_string()),
};
assert_eq!(actual, expected);
}
#[test]
fn recv_now_in_past_timestamp_local_eq_remote_counter_local_lt_remote_works() {
let now = Timestamp(ms(99));
let local = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(1),
node: Node("a".to_string()),
};
let remote = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(2),
node: Node("b".to_string()),
};
let actual = local._recv(remote, now);
let expected = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(3),
node: Node("a".to_string()),
};
assert_eq!(actual, expected);
}
#[test]
fn recv_now_in_past_timestamp_local_gt_remote_works() {
let now = Timestamp(ms(99));
let local = Hlc {
timestamp: Timestamp(ms(101)),
counter: Counter(1),
node: Node("a".to_string()),
};
let remote = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(2),
node: Node("b".to_string()),
};
let actual = local._recv(remote, now);
let expected = Hlc {
timestamp: Timestamp(ms(101)),
counter: Counter(2),
node: Node("a".to_string()),
};
assert_eq!(actual, expected);
}
#[test]
fn recv_now_in_past_timestamp_local_lt_remote_works() {
let now = Timestamp(ms(99));
let local = Hlc {
timestamp: Timestamp(ms(100)),
counter: Counter(1),
node: Node("a".to_string()),
};
let remote = Hlc {
timestamp: Timestamp(ms(101)),
counter: Counter(2),
node: Node("b".to_string()),
};
let actual = local._recv(remote, now);
let expected = Hlc {
timestamp: Timestamp(ms(101)),
counter: Counter(3),
node: Node("a".to_string()),
};
assert_eq!(actual, expected);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment