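// Disk-backed frame cache for screenpipe: serves screen frames for a time
// range from a local cache when possible, and falls back to extracting them
// from recorded video files with ffmpeg, streaming results in timestamp order.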
use anyhow::Result;
use bincode;
use chrono::{DateTime, Duration, Utc};
use dirs::cache_dir;
use screenpipe_core::find_ffmpeg_path;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::{BTreeMap, HashMap};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::fs;
use tokio::process::Command;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error};

use crate::db::{FrameData, OCREntry};
use crate::DatabaseManager;

type FrameChannel = mpsc::Sender<TimeSeriesFrame>;
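
/// All device frames captured at a single timestamp.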
#[derive(Debug, Clone)]
pub struct TimeSeriesFrame {
    pub timestamp: DateTime<Utc>,
    pub frame_data: Vec<DeviceFrame>,
}

#[derive(Debug, Clone)]
pub struct DeviceFrame {
    pub device_id: String,
    pub image_data: Vec<u8>,
    pub metadata: FrameMetadata,
    pub audio_entries: Vec<AudioEntry>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AudioEntry {
    pub transcription: String,
    pub device_name: String,
    pub is_input: bool,
    pub audio_file_path: String,
    pub duration_secs: f64,
}

impl From<crate::db::AudioEntry> for AudioEntry {
    fn from(db_entry: crate::db::AudioEntry) -> Self {
        Self {
            transcription: db_entry.transcription,
            device_name: db_entry.device_name,
            is_input: db_entry.is_input,
            audio_file_path: db_entry.audio_file_path,
            duration_secs: db_entry.duration_secs,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FrameMetadata {
    pub file_path: String,
    pub app_name: String,
    pub window_name: String,
    pub transcription: String,
    pub ocr_text: String,
}
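
// Protocol for the cache-manager task: callers send Store/Get messages and
// receive the result back over the enclosed oneshot channel.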
#[derive(Debug)]
enum CacheMessage {
    Store {
        cache_key: String,
        frame_data: Vec<u8>,
        device_data: OCREntry,
        audio_entries: Vec<AudioEntry>,
        response: oneshot::Sender<Result<()>>,
    },
    Get {
        cache_key: String,
        response:
            oneshot::Sender<Result<Option<(Vec<u8>, FrameMetadata, (DateTime<Utc>, String))>>>,
    },
}

#[derive(Debug, Serialize, Deserialize)]
struct CachedFrame {
    #[serde(with = "chrono::serde::ts_microseconds")]
    timestamp: DateTime<Utc>,
    device_id: String,
    checksum: String,
    metadata: FrameMetadata,
    frame_size: u64,
    compression: CompressionType,
    source_video: String,
    #[serde(with = "chrono::serde::ts_microseconds")]
    cached_at: DateTime<Utc>,
    audio_entries: Vec<AudioEntry>,
}

#[derive(Debug, Serialize, Deserialize)]
enum CompressionType {
    Jpeg { quality: u8 },
}

#[derive(Debug)]
struct CacheEntry {
    frame: CachedFrame,
    path: PathBuf,
    last_accessed: SystemTime,
}

#[derive(Debug, Clone)]
struct CacheConfig {
    cache_dir: PathBuf,
    max_cache_size_gb: f64,
    frame_retention_days: u64,
    compression_quality: u8,
}

impl Default for CacheConfig {
    fn default() -> Self {
        Self {
            cache_dir: PathBuf::from("frame_cache"),
            max_cache_size_gb: 10.0,
            frame_retention_days: 7,
            compression_quality: 50,
        }
    }
}
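
// Disk-backed frame store. Entries are keyed by (timestamp, device_id) in a
// BTreeMap so iteration is naturally time-ordered; a bincode index file on
// disk lets the cache survive restarts.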
struct FrameDiskCache {
    config: CacheConfig,
    entries: BTreeMap<(DateTime<Utc>, String), CacheEntry>,
    total_size: u64,
    index_path: PathBuf,
}

impl FrameDiskCache {
    async fn new(config: CacheConfig) -> Result<Self> {
        let cache_dir = &config.cache_dir;
        let index_path = cache_dir.join("cache_index.bin");
        fs::create_dir_all(cache_dir).await?;
        let mut cache = Self {
            config,
            entries: BTreeMap::new(),
            total_size: 0,
            index_path,
        };
        if cache.index_path.exists() {
            if let Err(e) = cache.load_index().await {
                debug!("could not load existing cache index: {}", e);
                cache.entries.clear();
                cache.total_size = 0;
            }
        } else {
            cache.save_index().await?;
        }
        Ok(cache)
    }

    async fn load_index(&mut self) -> Result<()> {
        match fs::read(&self.index_path).await {
            Ok(data) if !data.is_empty() => match bincode::deserialize::<Vec<CachedFrame>>(&data) {
                Ok(frames) => {
                    for frame in frames {
                        let path = self.get_frame_path(&frame.timestamp, &frame.device_id);
                        if let Ok(metadata) = fs::metadata(&path).await {
                            self.entries.insert(
                                (frame.timestamp, frame.device_id.clone()),
                                CacheEntry {
                                    frame,
                                    path,
                                    last_accessed: metadata.accessed()?,
                                },
                            );
                            self.total_size += metadata.len();
                        }
                    }
                    debug!("loaded {} cached frames", self.entries.len());
                }
                Err(e) => error!("failed to deserialize cache index: {}", e),
            },
            Ok(_) => debug!("cache index is empty, starting fresh"),
            Err(e) => error!("failed to read cache index: {}", e),
        }
        Ok(())
    }
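
    // Write-to-temp-then-rename keeps the on-disk index atomic: a crash
    // mid-write leaves the old index intact rather than a truncated file.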
    async fn save_index(&self) -> Result<()> {
        let frames: Vec<_> = self.entries.values().map(|entry| &entry.frame).collect();
        let temp_path = self.index_path.with_extension("tmp");
        let encoded = bincode::serialize(&frames)?;
        fs::write(&temp_path, encoded).await?;
        fs::rename(&temp_path, &self.index_path).await?;
        Ok(())
    }
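
    // Cache keys have the form "{timestamp}||{device_id}"; the two parts are
    // split back out here to derive the on-disk frame path.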
    async fn store_frame(
        &mut self,
        cache_key: &str,
        frame_data: &[u8],
        device_data: OCREntry,
        audio_entries: &[AudioEntry],
    ) -> Result<()> {
        debug!("storing frame with cache key: {}", cache_key);
        let (timestamp_str, device_id) = cache_key
            .split_once("||")
            .ok_or_else(|| anyhow::anyhow!("invalid cache key format"))?;
        let timestamp = parse_timestamp(timestamp_str)?;
        let frame_path = self.get_frame_path(&timestamp, device_id);
        if let Some(parent) = frame_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        let mut hasher = Sha256::new();
        hasher.update(frame_data);
        let checksum = format!("{:x}", hasher.finalize());
        let cached_frame = CachedFrame {
            timestamp,
            device_id: device_id.to_string(),
            checksum,
            metadata: FrameMetadata {
                file_path: device_data.video_file_path.clone(),
                app_name: device_data.app_name.clone(),
                window_name: device_data.window_name.clone(),
                transcription: audio_entries
                    .iter()
                    .map(|a| a.transcription.clone())
                    .collect::<Vec<_>>()
                    .join(" "),
                ocr_text: device_data.text.clone(),
            },
            frame_size: frame_data.len() as u64,
            compression: CompressionType::Jpeg {
                quality: self.config.compression_quality,
            },
            source_video: device_data.video_file_path,
            cached_at: Utc::now(),
            audio_entries: audio_entries.to_vec(),
        };
        fs::write(&frame_path, frame_data).await?;
        self.entries.insert(
            (timestamp, device_id.to_string()),
            CacheEntry {
                frame: cached_frame,
                path: frame_path,
                last_accessed: SystemTime::now(),
            },
        );
        self.total_size += frame_data.len() as u64;
        self.save_index().await?;
        Ok(())
    }
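
    // Lookups verify the stored SHA-256 checksum before returning data; a
    // mismatch (e.g. a partially written file) is treated as a cache miss.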
    async fn get_frame_data(
        &self,
        cache_key: &str,
    ) -> Result<Option<(Vec<u8>, FrameMetadata, (DateTime<Utc>, String))>> {
        let (timestamp_str, device_id) = match cache_key.split_once("||") {
            Some(parts) => parts,
            None => return Ok(None),
        };
        debug!("cache lookup for key: {}", cache_key);
        let timestamp = match parse_timestamp(timestamp_str) {
            Ok(ts) => ts,
            Err(e) => {
                debug!("failed to parse timestamp {}: {}", timestamp_str, e);
                return Ok(None);
            }
        };
        let frame_path = self.get_frame_path(&timestamp, device_id);
        debug!("looking for cached frame at: {:?}", frame_path);
        if !frame_path.exists() {
            debug!("cache miss - frame not found at {:?}", frame_path);
            return Ok(None);
        }
        debug!("cache hit - reading frame from {:?}", frame_path);
        let frame_data = fs::read(&frame_path).await?;
        if let Some(entry) = self.entries.get(&(timestamp, device_id.to_string())) {
            let mut hasher = Sha256::new();
            hasher.update(&frame_data);
            let checksum = format!("{:x}", hasher.finalize());
            if checksum != entry.frame.checksum {
                debug!("checksum mismatch for frame at {}:{}", timestamp, device_id);
                return Ok(None);
            }
            debug!("successfully retrieved cached frame");
            Ok(Some((
                frame_data,
                entry.frame.metadata.clone(),
                (timestamp, device_id.to_string()),
            )))
        } else {
            debug!("cache miss - no entry in index for frame");
            Ok(None)
        }
    }

    fn get_frame_path(&self, timestamp: &DateTime<Utc>, device_id: &str) -> PathBuf {
        self.config.cache_dir.join(format!(
            "{}_{}.cache",
            timestamp.timestamp_micros(),
            device_id.replace(['/', '\\', ':'], "_")
        ))
    }
}
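
// Single-owner actor loop: the cache lives on one task and all access goes
// through the message channel, so no locking is needed around the BTreeMap.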
async fn run_cache_manager(mut cache: FrameDiskCache, mut rx: mpsc::Receiver<CacheMessage>) {
    while let Some(msg) = rx.recv().await {
        match msg {
            CacheMessage::Store {
                cache_key,
                frame_data,
                device_data,
                audio_entries,
                response,
            } => {
                let result = cache
                    .store_frame(&cache_key, &frame_data, device_data, &audio_entries)
                    .await;
                let _ = response.send(result);
            }
            CacheMessage::Get { cache_key, response } => {
                let result = cache.get_frame_data(&cache_key).await;
                let _ = response.send(result);
            }
        }
    }
}

#[derive(Clone)]
pub struct FrameCache {
    pub screenpipe_dir: PathBuf,
    cache_tx: mpsc::Sender<CacheMessage>,
    db: Arc<DatabaseManager>,
}

impl FrameCache {
    pub async fn new(screenpipe_dir: PathBuf, db: Arc<DatabaseManager>) -> Result<Self> {
        let cache_config = CacheConfig {
            cache_dir: cache_dir()
                .ok_or_else(|| anyhow::anyhow!("could not determine user cache directory"))?
                .join("screenpipe")
                .join("frames"),
            ..Default::default()
        };
        fs::create_dir_all(&cache_config.cache_dir).await?;
        let (cache_tx, cache_rx) = mpsc::channel(100);
        let disk_cache = FrameDiskCache::new(cache_config).await?;
        tokio::spawn(run_cache_manager(disk_cache, cache_rx));
        Ok(Self {
            screenpipe_dir,
            cache_tx,
            db,
        })
    }
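
    // Two-pass extraction: first serve every frame already in the disk cache,
    // then group the misses by source video so each file is decoded only once.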
    async fn extract_frames_batch(
        &self,
        start_time: DateTime<Utc>,
        end_time: DateTime<Utc>,
        frame_tx: FrameChannel,
    ) -> Result<()> {
        let mut extraction_queue = HashMap::new();
        let mut total_frames = 0;
        debug!(
            "extracting frames for time range: {} to {}",
            start_time, end_time
        );
        let mut chunks = self.db.find_video_chunks(start_time, end_time).await?;
        // Sort by timestamp to ensure consistent ordering
        chunks.frames.sort_by_key(|a| (a.timestamp, a.offset_index));
        debug!("found {} chunks to process", chunks.frames.len());
        // First pass: process all cache hits
        for chunk in &chunks.frames {
            let mut timeseries_frame = TimeSeriesFrame {
                timestamp: chunk.timestamp,
                frame_data: Vec::new(),
            };
            for device_data in &chunk.ocr_entries {
                let cache_key = format!("{}||{}", chunk.timestamp, device_data.device_name);
                debug!("checking cache for key: {}", cache_key);
                let (response_tx, response_rx) = oneshot::channel();
                self.cache_tx
                    .send(CacheMessage::Get {
                        cache_key: cache_key.clone(),
                        response: response_tx,
                    })
                    .await?;
                match response_rx.await? {
                    Ok(Some((frame_data, metadata, _))) => {
                        debug!("cache hit for {}", cache_key);
                        timeseries_frame.frame_data.push(DeviceFrame {
                            device_id: device_data.device_name.clone(),
                            image_data: frame_data,
                            metadata,
                            audio_entries: chunk
                                .audio_entries
                                .iter()
                                .map(|a| AudioEntry {
                                    transcription: a.transcription.clone(),
                                    device_name: a.device_name.clone(),
                                    is_input: a.is_input,
                                    audio_file_path: a.audio_file_path.clone(),
                                    duration_secs: a.duration_secs,
                                })
                                .collect(),
                        });
                    }
                    _ => {
                        debug!("cache miss for {}", cache_key);
                        extraction_queue
                            .entry(device_data.video_file_path.clone())
                            .or_insert_with(Vec::new)
                            .push((chunk.clone(), device_data.clone()));
                    }
                }
            }
            if !timeseries_frame.frame_data.is_empty() {
                total_frames += timeseries_frame.frame_data.len();
                debug!(
                    "sending cached frame batch with {} devices",
                    timeseries_frame.frame_data.len()
                );
                frame_tx.send(timeseries_frame).await?;
            }
        }
        // Second pass: handle cache misses
        if !extraction_queue.is_empty() {
            let ffmpeg = find_ffmpeg_path().ok_or_else(|| anyhow::anyhow!("ffmpeg not found"))?;
            for (file_path, tasks) in extraction_queue {
                debug!("extracting {} frames from {}", tasks.len(), file_path);
                let extracted = extract_frame(
                    ffmpeg.clone(),
                    file_path,
                    tasks,
                    frame_tx.clone(),
                    self.cache_tx.clone(),
                )
                .await?;
                total_frames += extracted;
            }
        }
        debug!("total frames processed: {}", total_frames);
        Ok(())
    }
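
    // Streams frames for a window centered on `timestamp`, reordering them
    // through an OrderedFrameStreamer. The timeout is a heuristic (10s per
    // requested minute) so a stalled extraction cannot hang the caller.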
    pub async fn get_frames(
        &self,
        timestamp: DateTime<Utc>,
        duration_minutes: i64,
        frame_tx: FrameChannel,
        descending: bool,
    ) -> Result<()> {
        let start = timestamp - Duration::minutes(duration_minutes / 2);
        let end = timestamp + Duration::minutes(duration_minutes / 2);
        let (extract_tx, mut extract_rx) = mpsc::channel(100);
        let mut streamer = OrderedFrameStreamer::new(frame_tx, Duration::minutes(5), descending);
        // Spawn extraction task
        let mut extraction_handle = {
            let cache_clone = self.clone();
            tokio::spawn(async move {
                let result = cache_clone
                    .extract_frames_batch(start, end, extract_tx)
                    .await;
                debug!("extraction task completed: {:?}", result.is_ok());
                result
            })
        };
        let timeout_duration = tokio::time::Duration::from_secs(10 * duration_minutes as u64);
        let result = tokio::time::timeout(timeout_duration, async {
            loop {
                tokio::select! {
                    maybe_frame = extract_rx.recv() => {
                        match maybe_frame {
                            Some(frame) => {
                                if let Err(e) = streamer.push(frame).await {
                                    debug!("failed to push frame: {}", e);
                                    break;
                                }
                            }
                            None => {
                                debug!("extraction channel closed");
                                break;
                            }
                        }
                    }
                    result = &mut extraction_handle => {
                        match result {
                            Ok(Ok(())) => debug!("extraction completed successfully"),
                            Ok(Err(e)) => debug!("extraction failed: {}", e),
                            Err(e) => debug!("extraction task panicked: {}", e),
                        }
                        // The task has dropped its sender, so drain whatever is
                        // still buffered in the channel instead of discarding it.
                        while let Some(frame) = extract_rx.recv().await {
                            if let Err(e) = streamer.push(frame).await {
                                debug!("failed to push frame: {}", e);
                                break;
                            }
                        }
                        break;
                    }
                }
            }
            if let Err(e) = streamer.finish().await {
                debug!("error during final flush: {}", e);
            }
        })
        .await;
        match result {
            Ok(_) => Ok(()),
            Err(_) => {
                debug!(
                    "frame extraction timed out after {} seconds",
                    timeout_duration.as_secs()
                );
                Ok(())
            }
        }
    }
}
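
// Decodes the requested video once and pairs the extracted frames with their
// tasks by index, which assumes the tasks arrive in the same order as the
// frames appear in the file (the caller sorts chunks by timestamp first).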
async fn extract_frame(
    ffmpeg: PathBuf,
    video_file_path: String,
    tasks: Vec<(FrameData, OCREntry)>,
    frame_tx: FrameChannel,
    cache_tx: mpsc::Sender<CacheMessage>,
) -> Result<usize> {
    if !is_video_file_complete(&ffmpeg, &video_file_path).await? {
        debug!("skipping incomplete video file: {}", video_file_path);
        return Ok(0);
    }
    let temp_dir = tempfile::tempdir()?;
    let output_pattern = temp_dir.path().join("frame%d.jpg");
    let mut cmd = Command::new(&ffmpeg);
    cmd.args([
        "-i",
        &video_file_path,
        "-c:v",
        "mjpeg",
        "-q:v",
        "8",
        "-qmin",
        "8",
        "-qmax",
        "12",
        "-vsync",
        "0",
        "-threads",
        "2",
        output_pattern.to_str().unwrap(),
    ]);
    debug!("running ffmpeg command: {:?}", cmd);
    let output = cmd.output().await?;
    if !output.status.success() {
        error!("ffmpeg error: {}", String::from_utf8_lossy(&output.stderr));
        return Ok(0);
    }
    let mut processed = 0;
    let mut entries = tokio::fs::read_dir(temp_dir.path()).await?;
    let mut frame_paths = Vec::new();
    while let Some(entry) = entries.next_entry().await? {
        frame_paths.push(entry.path());
    }
    // read_dir gives no ordering guarantee, so sort by the numeric index in
    // the "frame%d.jpg" names to keep frames aligned with their tasks
    frame_paths.sort_by_key(|p| {
        p.file_stem()
            .and_then(|s| s.to_str()?.strip_prefix("frame")?.parse::<u64>().ok())
            .unwrap_or(u64::MAX)
    });
    let mut all_frames = Vec::new();
    for path in &frame_paths {
        all_frames.push(tokio::fs::read(path).await?);
    }
    debug!("extracted {} frames from video", all_frames.len());
    for (task_index, (chunk, device_data)) in tasks.iter().enumerate() {
        if task_index >= all_frames.len() {
            debug!("warning: ran out of frames at index {}", task_index);
            break;
        }
        let frame_data = &all_frames[task_index];
        let cache_key = format!("{}||{}", chunk.timestamp, device_data.device_name);
        debug!("processing frame {} with key {}", task_index, cache_key);
        // Store in cache first
        let (response_tx, response_rx) = oneshot::channel();
        cache_tx
            .send(CacheMessage::Store {
                cache_key: cache_key.clone(),
                frame_data: frame_data.clone(),
                device_data: device_data.clone(),
                audio_entries: chunk
                    .audio_entries
                    .clone()
                    .into_iter()
                    .map(Into::into)
                    .collect(),
                response: response_tx,
            })
            .await?;
        response_rx.await??;
        // Then send the frame
        frame_tx
            .send(TimeSeriesFrame {
                timestamp: chunk.timestamp,
                frame_data: vec![DeviceFrame {
                    device_id: device_data.device_name.clone(),
                    image_data: frame_data.clone(),
                    metadata: FrameMetadata {
                        file_path: device_data.video_file_path.clone(),
                        app_name: device_data.app_name.clone(),
                        window_name: device_data.window_name.clone(),
                        transcription: chunk
                            .audio_entries
                            .iter()
                            .map(|a| a.transcription.clone())
                            .collect::<Vec<_>>()
                            .join(" "),
                        ocr_text: device_data.text.clone(),
                    },
                    audio_entries: chunk
                        .audio_entries
                        .iter()
                        .map(|a| AudioEntry {
                            transcription: a.transcription.clone(),
                            device_name: a.device_name.clone(),
                            is_input: a.is_input,
                            audio_file_path: a.audio_file_path.clone(),
                            duration_secs: a.duration_secs,
                        })
                        .collect(),
                }],
            })
            .await?;
        processed += 1;
    }
    debug!("processed {} frames from video file", processed);
    Ok(processed)
}
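
// Two completeness heuristics: a file modified less than 60s ago is assumed
// to still be written by the recorder, and a full ffmpeg null-decode pass
// catches truncated or corrupted files.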
async fn is_video_file_complete(ffmpeg_path: &Path, file_path: &str) -> Result<bool> {
    if let Ok(metadata) = tokio::fs::metadata(file_path).await {
        if let Ok(modified) = metadata.modified() {
            let age = SystemTime::now()
                .duration_since(modified)
                .unwrap_or_default();
            if age.as_secs() < 60 {
                return Ok(false);
            }
        }
    }
    match Command::new(ffmpeg_path)
        .args(["-v", "error", "-i", file_path, "-f", "null", "-"])
        .output()
        .await
    {
        Ok(output) => {
            let is_complete = output.status.success();
            if !is_complete {
                debug!(
                    "file {} is incomplete or corrupted: {:?}",
                    file_path,
                    String::from_utf8_lossy(&output.stderr)
                );
            }
            Ok(is_complete)
        }
        Err(e) => {
            debug!("failed to check file {}: {}", file_path, e);
            Ok(false)
        }
    }
}
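
// Accepts plain RFC 3339 timestamps as well as the "YYYY-MM-DD HH:MM:SS UTC"
// form that Display-formatted DateTime<Utc> values produce.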
fn parse_timestamp(timestamp_str: &str) -> Result<DateTime<Utc>> {
    // First try direct RFC 3339 parsing
    if let Ok(dt) = DateTime::parse_from_rfc3339(timestamp_str) {
        return Ok(dt.with_timezone(&Utc));
    }
    // Handle " UTC" suffix by converting to Z format
    let cleaned = timestamp_str.trim_end_matches(" UTC").replace(' ', "T");
    // Ensure we have a Z or +00:00 timezone marker
    let timestamp_with_tz = if !cleaned.ends_with('Z') && !cleaned.contains('+') {
        format!("{}Z", cleaned)
    } else {
        cleaned
    };
    DateTime::parse_from_rfc3339(&timestamp_with_tz)
        .map(|dt| dt.with_timezone(&Utc))
        .map_err(|e| anyhow::anyhow!("failed to parse timestamp '{}': {}", timestamp_str, e))
}
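
// Reorder buffer: extraction can yield frames out of order (cache hits arrive
// before ffmpeg misses), so frames are staged in a BTreeMap and released in
// timestamp order, ascending or descending, one window at a time.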
struct OrderedFrameStreamer {
    buffer: BTreeMap<DateTime<Utc>, Vec<TimeSeriesFrame>>,
    window_size: Duration,
    last_sent: Option<DateTime<Utc>>,
    tx: mpsc::Sender<TimeSeriesFrame>,
    descending: bool,
}

impl OrderedFrameStreamer {
    fn new(tx: mpsc::Sender<TimeSeriesFrame>, window_size: Duration, descending: bool) -> Self {
        Self {
            buffer: BTreeMap::new(),
            window_size,
            last_sent: None,
            tx,
            descending,
        }
    }

    async fn push(&mut self, frame: TimeSeriesFrame) -> Result<()> {
        let ts = frame.timestamp;
        self.buffer.entry(ts).or_default().push(frame);
        // Sort frames within each timestamp by device_id for consistency
        if let Some(frames) = self.buffer.get_mut(&ts) {
            frames.sort_by(|a, b| {
                a.frame_data
                    .first()
                    .map(|f| &f.device_id)
                    .cmp(&b.frame_data.first().map(|f| &f.device_id))
            });
        }
        self.flush_window().await
    }

    async fn flush_window(&mut self) -> Result<()> {
        if self.last_sent.is_none() && !self.buffer.is_empty() {
            self.last_sent = if self.descending {
                self.buffer.keys().next_back().copied()
            } else {
                self.buffer.keys().next().copied()
            };
        }
        let Some(last_sent) = self.last_sent else {
            return Ok(());
        };
        // Determine the window range based on direction
        let window_range = if self.descending {
            (last_sent - self.window_size)..=last_sent
        } else {
            last_sent..=(last_sent + self.window_size)
        };
        // Collect frames that are ready to be sent
        let mut ready_frames = Vec::new();
        for (_, frames) in self.buffer.range(window_range) {
            ready_frames.extend(frames.iter().cloned());
        }
        // Sort the collected frames in the requested direction
        ready_frames.sort_by(|a, b| {
            if self.descending {
                b.timestamp.cmp(&a.timestamp)
            } else {
                a.timestamp.cmp(&b.timestamp)
            }
        });
        // Send frames and update buffer
        for frame in ready_frames {
            let ts = frame.timestamp;
            self.tx.send(frame).await?;
            self.buffer.remove(&ts);
            self.last_sent = Some(ts);
        }
        Ok(())
    }

    async fn finish(self) -> Result<()> {
        let mut all_frames = Vec::new();
        // Collect all remaining frames
        for frames in self.buffer.values() {
            all_frames.extend(frames.iter().cloned());
        }
        // Sort them according to the desired order
        all_frames.sort_by(|a, b| {
            if self.descending {
                b.timestamp.cmp(&a.timestamp)
            } else {
                a.timestamp.cmp(&b.timestamp)
            }
        });
        // Send all remaining frames
        for frame in all_frames {
            self.tx.send(frame).await?;
        }
        Ok(())
    }
}