Skip to content

Instantly share code, notes, and snippets.

Show Gist options
Save flaneur2020/bad87997499751cf67b71aaf572a9e66 to your computer and use it in GitHub Desktop.
commit 5db56dd5c2b6c01186e91b6fc0fe993e319c9f46
Author: Li Yazhou <[email protected]>
Date: Wed Jul 9 22:05:01 2025 +0800
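poc: use system time instead of last_l0_clock_tick for retention and TTL expiry

Compaction jobs built by the compactor now carry `compaction_time: SystemTime`, read from its system clock, in place of `compaction_ts: i64` (previously `last_l0_clock_tick`). The retention iterator converts this time to milliseconds since the Unix epoch before comparing it against each entry's `create_ts` (retention window) and `expire_ts` (TTL expiry).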
diff --git a/slatedb/src/compaction_execute_bench.rs b/slatedb/src/compaction_execute_bench.rs
index 9b2eba1..4501b98 100644
--- a/slatedb/src/compaction_execute_bench.rs
+++ b/slatedb/src/compaction_execute_bench.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::mem;
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
use bytes::BufMut;
use futures::stream::FuturesUnordered;
@@ -237,7 +237,8 @@ impl CompactionExecuteBench {
destination: 0,
ssts,
sorted_runs: vec![],
- compaction_ts: manifest.db_state().last_l0_clock_tick,
+ compaction_time: SystemTime::UNIX_EPOCH
+ + Duration::from_millis(manifest.db_state().last_l0_clock_tick as u64),
is_dest_last_run,
})
}
@@ -270,7 +271,8 @@ impl CompactionExecuteBench {
destination: 0,
ssts: vec![],
sorted_runs: srs,
- compaction_ts: state.last_l0_clock_tick,
+ compaction_time: SystemTime::UNIX_EPOCH
+ + Duration::from_millis(state.last_l0_clock_tick as u64),
is_dest_last_run,
}
}
diff --git a/slatedb/src/compactor.rs b/slatedb/src/compactor.rs
index 004721e..66ac997 100644
--- a/slatedb/src/compactor.rs
+++ b/slatedb/src/compactor.rs
@@ -423,12 +423,13 @@ impl CompactorEventHandler {
.compacted
.last()
.is_some_and(|sr| compaction.destination == sr.id);
+ let compaction_time = self.system_clock.now();
let job = CompactionJob {
id,
destination: compaction.destination,
ssts,
sorted_runs,
- compaction_ts: db_state.last_l0_clock_tick,
+ compaction_time,
is_dest_last_run,
};
self.progress_tracker
diff --git a/slatedb/src/compactor_executor.rs b/slatedb/src/compactor_executor.rs
index bb1a0ce..094f230 100644
--- a/slatedb/src/compactor_executor.rs
+++ b/slatedb/src/compactor_executor.rs
@@ -2,7 +2,7 @@ use std::collections::{HashMap, VecDeque};
use std::mem;
use std::sync::atomic::{self, AtomicBool};
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
use futures::future::join_all;
use parking_lot::Mutex;
@@ -32,7 +32,7 @@ pub(crate) struct CompactionJob {
pub(crate) destination: u32,
pub(crate) ssts: Vec<SsTableHandle>,
pub(crate) sorted_runs: Vec<SortedRun>,
- pub(crate) compaction_ts: i64,
+ pub(crate) compaction_time: SystemTime,
pub(crate) is_dest_last_run: bool,
}
@@ -43,7 +43,7 @@ impl std::fmt::Debug for CompactionJob {
.field("destination", &self.destination)
.field("ssts", &self.ssts)
.field("sorted_runs", &self.sorted_runs)
- .field("compaction_ts", &self.compaction_ts)
+ .field("compaction_time", &self.compaction_time)
.field("is_dest_last_run", &self.is_dest_last_run)
.field("estimated_source_bytes", &self.estimated_source_bytes())
.finish()
@@ -168,7 +168,7 @@ impl TokioCompactionExecutorInner {
merge_iter,
self.options.retention_timeout,
compaction.is_dest_last_run,
- compaction.compaction_ts,
+ compaction.compaction_time.clone(),
)
.await?;
Ok(retention_iter)
diff --git a/slatedb/src/retention_iterator.rs b/slatedb/src/retention_iterator.rs
index c49b835..d5ec0cf 100644
--- a/slatedb/src/retention_iterator.rs
+++ b/slatedb/src/retention_iterator.rs
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use std::cmp::Reverse;
use std::collections::BTreeMap;
-use std::time::Duration;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::error::SlateDBError;
use crate::iter::KeyValueIterator;
@@ -24,7 +24,7 @@ pub(crate) struct RetentionIterator<T: KeyValueIterator> {
/// Whether to filter out tombstones
filter_tombstone: bool,
/// The current timestamp in seconds since Unix epoch
- start_timestamp: i64,
+ start_time: SystemTime,
/// The total number of bytes processed so far
total_bytes_processed: u64,
}
@@ -35,13 +35,13 @@ impl<T: KeyValueIterator> RetentionIterator<T> {
inner: T,
retention_timeout: Duration,
filter_tombstone: bool,
- start_timestamp: i64,
+ start_time: SystemTime,
) -> Result<Self, SlateDBError> {
Ok(Self {
inner,
retention_timeout,
filter_tombstone,
- start_timestamp,
+ start_time,
buffer: RetentionBuffer::new(),
total_bytes_processed: 0,
})
@@ -55,7 +55,7 @@ impl<T: KeyValueIterator> RetentionIterator<T> {
/// - Transform expired entries into tombstones, and recycle the tombstones in the tail if filter_tombstone is true.
fn apply_retention_filter(
versions: BTreeMap<Reverse<u64>, RowEntry>,
- current_timestamp: i64,
+ current_time: SystemTime,
retention_timeout: Duration,
filter_tombstone: bool,
) -> BTreeMap<Reverse<u64>, RowEntry> {
@@ -63,10 +63,15 @@ impl<T: KeyValueIterator> RetentionIterator<T> {
for (idx, (_, entry)) in versions.into_iter().enumerate() {
// always keep the latest version (idx == 0), for older versions, check if they are within retention window.
// if retention timeout is zero, only the latest version is kept.
+ let current_ts = current_time
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_millis() as i64;
let in_retention_window = entry
.create_ts
.map(|create_ts| {
- create_ts + (retention_timeout.as_millis() as i64) > current_timestamp
+ // create_ts is in milliseconds since unix epoch
+ create_ts + (retention_timeout.as_millis() as i64) > current_ts
})
.unwrap_or(false);
let should_keep = idx == 0 || in_retention_window;
@@ -81,7 +86,7 @@ impl<T: KeyValueIterator> RetentionIterator<T> {
// abstracting this away into generic, pluggable compaction filters
// but for now we do it inline
let entry = match entry.expire_ts.as_ref() {
- Some(expire_ts) if *expire_ts <= current_timestamp => {
+ Some(expire_ts) if *expire_ts <= current_ts => {
// insert a tombstone instead of just filtering out the
// value in the iterator because this may otherwise "revive"
// an older version of the KV pair that has a larger TTL in
@@ -164,12 +169,11 @@ impl<T: KeyValueIterator> KeyValueIterator for RetentionIterator<T> {
}
RetentionBufferState::NeedProcess => {
// Apply retention filtering to collected versions
- let current_timestamp = self.start_timestamp;
let retention_time = self.retention_timeout;
self.buffer.process_retention(|versions| {
Self::apply_retention_filter(
versions,
- current_timestamp,
+ self.start_time,
retention_time,
self.filter_tombstone,
)
@@ -541,7 +545,7 @@ mod tests {
name: &'static str,
input_entries: Vec<RowEntry>,
retention_time: Duration,
- current_timestamp: i64,
+ current_time: SystemTime,
expected_entries: Vec<RowEntry>,
}
@@ -551,7 +555,7 @@ mod tests {
name: "empty_iterator",
input_entries: vec![],
retention_time: Duration::from_secs(3600), // 1 hour
- current_timestamp: 1000,
+ current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000),
expected_entries: vec![],
})]
#[case(RetentionIteratorTestCase {
@@ -560,7 +564,7 @@ mod tests {
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(950), // 50 seconds ago
],
retention_time: Duration::from_secs(3600), // 1 hour
- current_timestamp: 1000,
+ current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000),
expected_entries: vec![
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(950)
],
@@ -571,7 +575,7 @@ mod tests {
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(500), // 500 seconds ago
],
retention_time: Duration::from_secs(3600), // 1 hour
- current_timestamp: 1000,
+ current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000),
expected_entries: vec![
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(500), // 500 + 3600 = 4100 >= 1000, so kept
],
@@ -584,7 +588,7 @@ mod tests {
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850), // Within retention
],
retention_time: Duration::from_secs(3600), // 1 hour
- current_timestamp: 1000,
+ current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000),
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(900),
@@ -599,7 +603,7 @@ mod tests {
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850), // Within retention
],
retention_time: Duration::from_secs(3600), // 1 hour
- current_timestamp: 1000,
+ current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000),
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(500), // 500 + 3600 = 4100 >= 1000, so kept
@@ -614,7 +618,7 @@ mod tests {
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850), // Within retention
],
retention_time: Duration::from_secs(3600), // 1 hour
- current_timestamp: 1000,
+ current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000),
expected_entries: vec![
RowEntry::new_tombstone(b"key1", 3).with_create_ts(950),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(500), // 500 + 3600 = 4100 >= 1000, so kept
@@ -629,7 +633,7 @@ mod tests {
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850), // Within retention
],
retention_time: Duration::from_secs(3600), // 1 hour
- current_timestamp: 1000,
+ current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000),
expected_entries: vec![
RowEntry::new_merge(b"key1", b"merge3", 3).with_create_ts(950),
RowEntry::new_merge(b"key1", b"merge2", 2).with_create_ts(500), // 500 + 3600 = 4100 >= 1000, so kept
@@ -644,7 +648,7 @@ mod tests {
RowEntry::new_value(b"key1", b"value1", 1), // No create_ts
],
retention_time: Duration::from_secs(3600),
- current_timestamp: 1000,
+ current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000),
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3),
],
@@ -657,7 +661,7 @@ mod tests {
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(500), // Outside retention
],
retention_time: Duration::from_secs(3600),
- current_timestamp: 1000,
+ current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000),
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950),
],
@@ -670,7 +674,7 @@ mod tests {
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(998), // 2 seconds ago
],
retention_time: Duration::from_secs(0), // No retention
- current_timestamp: 1000,
+ current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000),
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(1000), // Latest always kept
],
@@ -683,7 +687,7 @@ mod tests {
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(10), // Very old
],
retention_time: Duration::from_secs(1000), // Very long retention
- current_timestamp: 1000,
+ current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000),
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(100),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(50),
@@ -702,7 +706,7 @@ mod tests {
let filtered_versions = RetentionIterator::<TestIterator>::apply_retention_filter(
versions,
- test_case.current_timestamp,
+ test_case.current_time,
test_case.retention_time,
true,
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment