Created
July 9, 2025 14:07
-
-
Save flaneur2020/bad87997499751cf67b71aaf572a9e66 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| commit 5db56dd5c2b6c01186e91b6fc0fe993e319c9f46 | |
| Author: Li Yazhou <[email protected]> | |
| Date: Wed Jul 9 22:05:01 2025 +0800 | |
| poc: use system time for retention & expire | |
| diff --git a/slatedb/src/compaction_execute_bench.rs b/slatedb/src/compaction_execute_bench.rs | |
| index 9b2eba1..4501b98 100644 | |
| --- a/slatedb/src/compaction_execute_bench.rs | |
| +++ b/slatedb/src/compaction_execute_bench.rs | |
| @@ -1,7 +1,7 @@ | |
| use std::collections::HashMap; | |
| use std::mem; | |
| use std::sync::Arc; | |
| -use std::time::Duration; | |
| +use std::time::{Duration, SystemTime}; | |
| use bytes::BufMut; | |
| use futures::stream::FuturesUnordered; | |
| @@ -237,7 +237,8 @@ impl CompactionExecuteBench { | |
| destination: 0, | |
| ssts, | |
| sorted_runs: vec![], | |
| - compaction_ts: manifest.db_state().last_l0_clock_tick, | |
| + compaction_time: SystemTime::UNIX_EPOCH | |
| + + Duration::from_millis(manifest.db_state().last_l0_clock_tick as u64), | |
| is_dest_last_run, | |
| }) | |
| } | |
| @@ -270,7 +271,8 @@ impl CompactionExecuteBench { | |
| destination: 0, | |
| ssts: vec![], | |
| sorted_runs: srs, | |
| - compaction_ts: state.last_l0_clock_tick, | |
| + compaction_time: SystemTime::UNIX_EPOCH | |
| + + Duration::from_millis(state.last_l0_clock_tick as u64), | |
| is_dest_last_run, | |
| } | |
| } | |
| diff --git a/slatedb/src/compactor.rs b/slatedb/src/compactor.rs | |
| index 004721e..66ac997 100644 | |
| --- a/slatedb/src/compactor.rs | |
| +++ b/slatedb/src/compactor.rs | |
| @@ -423,12 +423,13 @@ impl CompactorEventHandler { | |
| .compacted | |
| .last() | |
| .is_some_and(|sr| compaction.destination == sr.id); | |
| + let compaction_time = self.system_clock.now(); | |
| let job = CompactionJob { | |
| id, | |
| destination: compaction.destination, | |
| ssts, | |
| sorted_runs, | |
| - compaction_ts: db_state.last_l0_clock_tick, | |
| + compaction_time, | |
| is_dest_last_run, | |
| }; | |
| self.progress_tracker | |
| diff --git a/slatedb/src/compactor_executor.rs b/slatedb/src/compactor_executor.rs | |
| index bb1a0ce..094f230 100644 | |
| --- a/slatedb/src/compactor_executor.rs | |
| +++ b/slatedb/src/compactor_executor.rs | |
| @@ -2,7 +2,7 @@ use std::collections::{HashMap, VecDeque}; | |
| use std::mem; | |
| use std::sync::atomic::{self, AtomicBool}; | |
| use std::sync::Arc; | |
| -use std::time::Duration; | |
| +use std::time::{Duration, SystemTime}; | |
| use futures::future::join_all; | |
| use parking_lot::Mutex; | |
| @@ -32,7 +32,7 @@ pub(crate) struct CompactionJob { | |
| pub(crate) destination: u32, | |
| pub(crate) ssts: Vec<SsTableHandle>, | |
| pub(crate) sorted_runs: Vec<SortedRun>, | |
| - pub(crate) compaction_ts: i64, | |
| + pub(crate) compaction_time: SystemTime, | |
| pub(crate) is_dest_last_run: bool, | |
| } | |
| @@ -43,7 +43,7 @@ impl std::fmt::Debug for CompactionJob { | |
| .field("destination", &self.destination) | |
| .field("ssts", &self.ssts) | |
| .field("sorted_runs", &self.sorted_runs) | |
| - .field("compaction_ts", &self.compaction_ts) | |
| + .field("compaction_time", &self.compaction_time) | |
| .field("is_dest_last_run", &self.is_dest_last_run) | |
| .field("estimated_source_bytes", &self.estimated_source_bytes()) | |
| .finish() | |
| @@ -168,7 +168,7 @@ impl TokioCompactionExecutorInner { | |
| merge_iter, | |
| self.options.retention_timeout, | |
| compaction.is_dest_last_run, | |
| - compaction.compaction_ts, | |
| + compaction.compaction_time.clone(), | |
| ) | |
| .await?; | |
| Ok(retention_iter) | |
| diff --git a/slatedb/src/retention_iterator.rs b/slatedb/src/retention_iterator.rs | |
| index c49b835..d5ec0cf 100644 | |
| --- a/slatedb/src/retention_iterator.rs | |
| +++ b/slatedb/src/retention_iterator.rs | |
| @@ -1,7 +1,7 @@ | |
| use async_trait::async_trait; | |
| use std::cmp::Reverse; | |
| use std::collections::BTreeMap; | |
| -use std::time::Duration; | |
| +use std::time::{Duration, SystemTime, UNIX_EPOCH}; | |
| use crate::error::SlateDBError; | |
| use crate::iter::KeyValueIterator; | |
| @@ -24,7 +24,7 @@ pub(crate) struct RetentionIterator<T: KeyValueIterator> { | |
| /// Whether to filter out tombstones | |
| filter_tombstone: bool, | |
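| - /// The current timestamp in seconds since Unix epoch | |
| - start_timestamp: i64, | |
| + /// Reference wall-clock time ("now") against which retention windows and `expire_ts` are evaluated | |
| + start_time: SystemTime, | |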
| /// The total number of bytes processed so far | |
| total_bytes_processed: u64, | |
| } | |
| @@ -35,13 +35,13 @@ impl<T: KeyValueIterator> RetentionIterator<T> { | |
| inner: T, | |
| retention_timeout: Duration, | |
| filter_tombstone: bool, | |
| - start_timestamp: i64, | |
| + start_time: SystemTime, | |
| ) -> Result<Self, SlateDBError> { | |
| Ok(Self { | |
| inner, | |
| retention_timeout, | |
| filter_tombstone, | |
| - start_timestamp, | |
| + start_time, | |
| buffer: RetentionBuffer::new(), | |
| total_bytes_processed: 0, | |
| }) | |
| @@ -55,7 +55,7 @@ impl<T: KeyValueIterator> RetentionIterator<T> { | |
| /// - Transform expired entries into tombstones, and recycle the tombstones in the tail if filter_tombstone is true. | |
| fn apply_retention_filter( | |
| versions: BTreeMap<Reverse<u64>, RowEntry>, | |
| - current_timestamp: i64, | |
| + current_time: SystemTime, | |
| retention_timeout: Duration, | |
| filter_tombstone: bool, | |
| ) -> BTreeMap<Reverse<u64>, RowEntry> { | |
| @@ -63,10 +63,15 @@ impl<T: KeyValueIterator> RetentionIterator<T> { | |
| for (idx, (_, entry)) in versions.into_iter().enumerate() { | |
| // always keep the latest version (idx == 0), for older versions, check if they are within retention window. | |
| // if retention timeout is zero, only the latest version is kept. | |
| + let current_ts = current_time | |
| + .duration_since(UNIX_EPOCH) | |
| + .unwrap_or_default() | |
| + .as_millis() as i64; | |
| let in_retention_window = entry | |
| .create_ts | |
| .map(|create_ts| { | |
| - create_ts + (retention_timeout.as_millis() as i64) > current_timestamp | |
| + // create_ts is in milliseconds since unix epoch | |
| + create_ts + (retention_timeout.as_millis() as i64) > current_ts | |
| }) | |
| .unwrap_or(false); | |
| let should_keep = idx == 0 || in_retention_window; | |
| @@ -81,7 +86,7 @@ impl<T: KeyValueIterator> RetentionIterator<T> { | |
| // abstracting this away into generic, pluggable compaction filters | |
| // but for now we do it inline | |
| let entry = match entry.expire_ts.as_ref() { | |
| - Some(expire_ts) if *expire_ts <= current_timestamp => { | |
| + Some(expire_ts) if *expire_ts <= current_ts => { | |
| // insert a tombstone instead of just filtering out the | |
| // value in the iterator because this may otherwise "revive" | |
| // an older version of the KV pair that has a larger TTL in | |
| @@ -164,12 +169,11 @@ impl<T: KeyValueIterator> KeyValueIterator for RetentionIterator<T> { | |
| } | |
| RetentionBufferState::NeedProcess => { | |
| // Apply retention filtering to collected versions | |
| - let current_timestamp = self.start_timestamp; | |
| let retention_time = self.retention_timeout; | |
| self.buffer.process_retention(|versions| { | |
| Self::apply_retention_filter( | |
| versions, | |
| - current_timestamp, | |
| + self.start_time, | |
| retention_time, | |
| self.filter_tombstone, | |
| ) | |
| @@ -541,7 +545,7 @@ mod tests { | |
| name: &'static str, | |
| input_entries: Vec<RowEntry>, | |
| retention_time: Duration, | |
| - current_timestamp: i64, | |
| + current_time: SystemTime, | |
| expected_entries: Vec<RowEntry>, | |
| } | |
| @@ -551,7 +555,7 @@ mod tests { | |
| name: "empty_iterator", | |
| input_entries: vec![], | |
| retention_time: Duration::from_secs(3600), // 1 hour | |
| - current_timestamp: 1000, | |
| + current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000), | |
| expected_entries: vec![], | |
| })] | |
| #[case(RetentionIteratorTestCase { | |
| @@ -560,7 +564,7 @@ mod tests { | |
| RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(950), // 50 seconds ago | |
| ], | |
| retention_time: Duration::from_secs(3600), // 1 hour | |
| - current_timestamp: 1000, | |
| + current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000), | |
| expected_entries: vec![ | |
| RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(950) | |
| ], | |
| @@ -571,7 +575,7 @@ mod tests { | |
| RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(500), // 500 seconds ago | |
| ], | |
| retention_time: Duration::from_secs(3600), // 1 hour | |
| - current_timestamp: 1000, | |
| + current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000), | |
| expected_entries: vec![ | |
| RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(500), // 500 + 3600 = 4100 >= 1000, so kept | |
| ], | |
| @@ -584,7 +588,7 @@ mod tests { | |
| RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850), // Within retention | |
| ], | |
| retention_time: Duration::from_secs(3600), // 1 hour | |
| - current_timestamp: 1000, | |
| + current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000), | |
| expected_entries: vec![ | |
| RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950), | |
| RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(900), | |
| @@ -599,7 +603,7 @@ mod tests { | |
| RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850), // Within retention | |
| ], | |
| retention_time: Duration::from_secs(3600), // 1 hour | |
| - current_timestamp: 1000, | |
| + current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000), | |
| expected_entries: vec![ | |
| RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950), | |
| RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(500), // 500 + 3600 = 4100 >= 1000, so kept | |
| @@ -614,7 +618,7 @@ mod tests { | |
| RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850), // Within retention | |
| ], | |
| retention_time: Duration::from_secs(3600), // 1 hour | |
| - current_timestamp: 1000, | |
| + current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000), | |
| expected_entries: vec![ | |
| RowEntry::new_tombstone(b"key1", 3).with_create_ts(950), | |
| RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(500), // 500 + 3600 = 4100 >= 1000, so kept | |
| @@ -629,7 +633,7 @@ mod tests { | |
| RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850), // Within retention | |
| ], | |
| retention_time: Duration::from_secs(3600), // 1 hour | |
| - current_timestamp: 1000, | |
| + current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000), | |
| expected_entries: vec![ | |
| RowEntry::new_merge(b"key1", b"merge3", 3).with_create_ts(950), | |
| RowEntry::new_merge(b"key1", b"merge2", 2).with_create_ts(500), // 500 + 3600 = 4100 >= 1000, so kept | |
| @@ -644,7 +648,7 @@ mod tests { | |
| RowEntry::new_value(b"key1", b"value1", 1), // No create_ts | |
| ], | |
| retention_time: Duration::from_secs(3600), | |
| - current_timestamp: 1000, | |
| + current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000), | |
| expected_entries: vec![ | |
| RowEntry::new_value(b"key1", b"value3", 3), | |
| ], | |
| @@ -657,7 +661,7 @@ mod tests { | |
| RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(500), // Outside retention | |
| ], | |
| retention_time: Duration::from_secs(3600), | |
| - current_timestamp: 1000, | |
| + current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000), | |
| expected_entries: vec![ | |
| RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950), | |
| ], | |
| @@ -670,7 +674,7 @@ mod tests { | |
| RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(998), // 2 seconds ago | |
| ], | |
| retention_time: Duration::from_secs(0), // No retention | |
| - current_timestamp: 1000, | |
| + current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000), | |
| expected_entries: vec![ | |
| RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(1000), // Latest always kept | |
| ], | |
| @@ -683,7 +687,7 @@ mod tests { | |
| RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(10), // Very old | |
| ], | |
| retention_time: Duration::from_secs(1000), // Very long retention | |
| - current_timestamp: 1000, | |
| + current_time: SystemTime::UNIX_EPOCH + Duration::from_secs(1000), | |
| expected_entries: vec![ | |
| RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(100), | |
| RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(50), | |
| @@ -702,7 +706,7 @@ mod tests { | |
| let filtered_versions = RetentionIterator::<TestIterator>::apply_retention_filter( | |
| versions, | |
| - test_case.current_timestamp, | |
| + test_case.current_time, | |
| test_case.retention_time, | |
| true, | |
| ); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment