blog cache manager (created November 26, 2024 20:56)
```rust
use core::{fmt, future::Future};
use std::{io, sync::Arc};
use tokio::{
    sync::{OnceCell, RwLock},
    task,
};
use crate::server_functions::{blog::ConvertToBlogContentVec, database::get_blogs};
use super::{
    blog::{BlogContent, BlogPreview},
    database::get_project_previews_db,
    project::ProjectPreview,
    utility::{get_blog_view_log, insert_all_blog_views, BlogView, BlogViewContainer},
};
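// Helper macro: spawns a detached Tokio task that awaits the given future once
// per tick of a fixed interval (shortened to 120 s when the `blog_test` cfg is set)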
macro_rules! spawn_interval_task {
    ($interval:expr, $task:expr) => {
        task::spawn(async move {
            let interval_int = if cfg!(blog_test) { 120 } else { $interval };
            println!("Initializing cache update loop: {}s", interval_int);
            let mut interval =
                tokio::time::interval(tokio::time::Duration::from_secs(interval_int));
            loop {
                interval.tick().await;
                $task.await;
            }
        })
    };
}
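// Shared cache of blog and project data; each field sits behind an
// Arc<RwLock<..>> so the background refresh tasks and request handlers
// can read and write it concurrently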
pub struct BlogCache {
    project_previews: Arc<RwLock<Vec<ProjectPreview>>>,
    blog_previews: Arc<RwLock<Vec<BlogPreview>>>,
    blog_contents: Arc<RwLock<Vec<BlogContent>>>,
    blog_views: Arc<RwLock<Vec<BlogViewContainer>>>,
}
impl BlogCache {
    pub fn new() -> Self {
        let cache = BlogCache {
            project_previews: Arc::new(RwLock::new(Vec::new())),
            blog_previews: Arc::new(RwLock::new(Vec::new())),
            blog_contents: Arc::new(RwLock::new(Vec::new())),
            blog_views: Arc::new(RwLock::new(Vec::new())),
        };
        // Spawn project preview updater
        let project_previews = Arc::clone(&cache.project_previews);
        spawn_interval_task!(60 * 60, BlogCache::update_projects_cache(&project_previews));
        // Spawn blog content updater
        let blog_contents = Arc::clone(&cache.blog_contents);
        let blog_previews = Arc::clone(&cache.blog_previews);
        spawn_interval_task!(
            60 * 60,
            BlogCache::update_blog_content_cache(&blog_contents, &blog_previews)
        );
        // Spawn view updater
        // TODO deal with a blog missing from the db that somehow has views
        let blog_views = Arc::clone(&cache.blog_views);
        spawn_interval_task!(60 * 60, BlogCache::update_blog_views_cache(&blog_views));
        cache
    }
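    // Refresh the cached project previews from the database, replacing any
    // entry whose checksum has changed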
    async fn update_projects_cache(previews: &Arc<RwLock<Vec<ProjectPreview>>>) {
        let db_previews = get_project_previews_db().await.unwrap_or_default();
        // Lock the cache and get the cached previews
        let mut cache_lock = previews.write().await;
        let cached_previews = &mut *cache_lock;
        // Update or insert missing/modified projects in the cache
        for db_preview in &db_previews {
            // Find if the project exists in the cache by ID
            if let Some(cached_preview) = cached_previews.iter_mut().find(|p| p.id == db_preview.id)
            {
                // Check if the cached item is outdated by comparing checksums
                if cached_preview.get_checksum() != db_preview.get_checksum() {
                    *cached_preview = db_preview.clone();
                    println!("Updated cached content for project ID: {}", db_preview.id);
                }
            } else {
                // If project does not exist in cache, add it
                cached_previews.push(db_preview.clone());
                println!("Added new project content to cache: {}", db_preview.id);
            }
        }
    }
    // Reconcile the in-memory view cache with the persisted blog view log
    async fn update_blog_views_cache(cache_clone: &Arc<RwLock<Vec<BlogViewContainer>>>) {
        // Helper returning a future that persists the given views to the database
        fn insert_views(
            items: Vec<BlogViewContainer>,
        ) -> impl Future<Output = sqlx::Result<()>> + Send {
            async move {
                match insert_all_blog_views(items).await {
                    Ok(_) => println!("Inserted blog views into the database"),
                    Err(e) => println!("Error inserting blog views: {:?}", e),
                };
                Ok(())
            }
        }
        // Get current state
        let view_blog_db = match get_blog_view_log().await {
            Ok(views) => views,
            Err(e) => {
                println!("Failed to read blog view log, skipping update: {:?}", e);
                return;
            }
        };
        let mut cached_views_write = cache_clone.write().await;
        println!("Starting blog views cache update");
        // Case 1: DB empty but cache has data - sync cache to DB
        if view_blog_db.is_empty() && !cached_views_write.is_empty() {
            println!(
                "Database empty, syncing {} cached views",
                cached_views_write.len()
            );
            let future = insert_views(cached_views_write.to_vec()); // Get the future
            tokio::spawn(future);
        }
        let mut new_view_items = Vec::new();
        // Case 2: Both DB and cache have data - reconcile differences
        if !cached_views_write.is_empty() && !view_blog_db.is_empty() {
            println!("Reconciling {} DB entries with cache", view_blog_db.len());
            for db_view in &view_blog_db {
                let blog_id = db_view.get_blog_post_id();
                if let Some(cached_view) = cached_views_write
                    .iter_mut()
                    .find(|p| p.get_blog_post_id() == blog_id)
                {
                    // Find IPs in cache but not in DB
                    let new_db_ips: Vec<String> = cached_view
                        .get_ip_addresses()
                        .iter()
                        .filter(|ip| !db_view.get_ip_addresses().contains(*ip))
                        .cloned()
                        .collect();
                    // Find IPs in DB but not in cache
                    let new_cache_ips: Vec<String> = db_view
                        .get_ip_addresses()
                        .iter()
                        .filter(|ip| !cached_view.get_ip_addresses().contains(*ip))
                        .cloned()
                        .collect();
                    // If we found new IPs that need to be stored in DB
                    if !new_db_ips.is_empty() {
                        println!("Found {} new IPs for blog {}", new_db_ips.len(), blog_id);
                        new_view_items.push(BlogViewContainer::new(*blog_id, new_db_ips.clone()));
                    }
                    // Merge all unique IPs
                    if !new_db_ips.is_empty() || !new_cache_ips.is_empty() {
                        let all_unique_ips: Vec<String> = new_db_ips
                            .into_iter()
                            .chain(new_cache_ips.into_iter())
                            .chain(cached_view.get_ip_addresses().iter().cloned())
                            .collect::<std::collections::HashSet<_>>()
                            .into_iter()
                            .collect();
                        *cached_view = BlogViewContainer::new(*blog_id, all_unique_ips);
                        println!("Updated cache entry for blog {}", blog_id);
                    }
                } else {
                    // New blog entry found in DB, add to cache
                    println!("Adding new blog view {} to cache", blog_id);
                    cached_views_write.push(db_view.clone());
                }
            }
        } else if !view_blog_db.is_empty() {
            println!("Cache is empty, initializing from database");
            cached_views_write.extend(view_blog_db);
        } else {
            println!("Database and cache are both empty, no views to sync");
        }
        // Insert any new views collected during reconciliation
        if !new_view_items.is_empty() {
            println!("Persisting new view items {:?}", new_view_items);
            let future = insert_views(new_view_items.to_vec()); // Get the future
            tokio::spawn(future);
        }
        println!("Blog views cache update completed");
    }
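    // Refresh cached blog contents (and their derived previews) from the
    // database, replacing any entry whose checksum no longer matches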
    async fn update_blog_content_cache(
        cache_clone: &Arc<RwLock<Vec<BlogContent>>>,
        preview_cache: &Arc<RwLock<Vec<BlogPreview>>>,
    ) {
        let all_blog_contents = match get_blogs().await {
            Ok(blogs) => blogs.convert_to_blog_content_vec(),
            Err(e) => {
                println!("Failed to fetch blogs, skipping update: {:?}", e);
                return;
            }
        };
        // Lock the caches and get the cached contents and previews
        let mut cache_lock = cache_clone.write().await;
        let mut preview_lock = preview_cache.write().await;
        let cached_blogs = &mut *cache_lock;
        let cached_previews = &mut *preview_lock;
        // Update or insert missing/modified blogs in the cache
        for blog_content in &all_blog_contents {
            // Find if the blog exists in the cache by ID
            println!("checking content for blog ID: {}", blog_content.id);
            if let Some(cached_blog) = cached_blogs.iter_mut().find(|p| p.id == blog_content.id) {
                // Check if the cached item is outdated by comparing checksums;
                // the checksum covers the description, so preview changes are caught too
                if cached_blog.get_checksum() != blog_content.get_checksum() {
                    *cached_blog = blog_content.clone();
                    if let Some(cached_preview) =
                        cached_previews.iter_mut().find(|p| p.id == blog_content.id)
                    {
                        // Keep the preview in sync with the refreshed content
                        *cached_preview = blog_content.clone().into();
                        println!(
                            "Updated cached content for preview blog ID: {}",
                            blog_content.id
                        );
                    }
                    println!("Updated cached content for blog ID: {}", blog_content.id);
                }
            } else {
                // If the blog does not exist in cache, add it
                println!("Added new blog content to cache: {}", blog_content.id);
                cached_blogs.push(blog_content.clone());
                cached_previews.push(blog_content.clone().into());
            }
        }
        println!("Done checking blogs for cache updates");
    }
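    // Read-side accessors below each take a read lock and clone the data out,
    // so callers never hold the lock across await points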
    pub async fn get_blog_views(&self) -> Vec<BlogViewContainer> {
        // Using .await with tokio's RwLock
        let guard = self.blog_views.read().await;
        println!("Retrieved {} blog views from cache", guard.len());
        guard.clone()
    }
    pub async fn get_blogs_views(&self, blog_id: &i32) -> Option<usize> {
        let guard = self.blog_views.read().await; // Lock for safe access
        guard
            .iter()
            .find(|blog| blog.get_blog_post_id() == blog_id)
            .map(|blog| blog.get_ip_addresses().len())
    }
    // Add a BlogPreview to the cache
    pub async fn add_preview(&self, preview: BlogPreview) {
        let preview_id = preview.id;
        let mut guard = self.blog_previews.write().await;
        println!("Adding new blog preview to cache {}", preview_id);
        guard.push(preview);
    }
    pub async fn add_view(&self, preview: BlogView) {
        let mut contents = self.blog_views.write().await; // Lock for safe access
        if let Some(existing_entry) = contents
            .iter_mut()
            .find(|entry| entry.get_blog_post_id() == preview.get_blog_post_id())
        {
            // Add IP address to the existing entry if it has not been seen before
            if !existing_entry
                .get_ip_addresses()
                .contains(&preview.get_view_ip())
            {
                println!(
                    "IP {} Viewed Blog {}",
                    &preview.get_view_ip(),
                    &preview.get_blog_post_id()
                );
                existing_entry.add_ip_address(preview.get_view_ip());
            }
        } else {
            // If not found, create a new BlogViewContainer entry and add it to contents
            println!(
                "New IP {} Viewed Blog {}",
                &preview.get_view_ip(),
                &preview.get_blog_post_id()
            );
            contents.push(BlogViewContainer::new(
                preview.get_blog_post_id().to_owned(),
                vec![preview.get_view_ip()],
            ));
        }
    }
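    // Add a ProjectPreview to the cache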
    pub async fn add_project(&self, preview: ProjectPreview) {
        let project_id = preview.id;
        let mut guard = self.project_previews.write().await;
        println!("Adding new project preview to cache {}", project_id);
        guard.push(preview);
    }
    pub async fn get_projects(&self) -> Vec<ProjectPreview> {
        let guard = self.project_previews.read().await;
        println!("Retrieved {} project previews from cache", guard.len());
        guard.clone()
    }
    // Add a BlogContent (and its derived preview) to the cache
    pub async fn add_content(&self, content: BlogContent) {
        let content_id = content.id;
        println!("Adding new blog content to cache {}", content_id);
        // Add to both the content and preview caches
        let mut blog_content_guard = self.blog_contents.write().await;
        let mut blog_preview_guard = self.blog_previews.write().await;
        blog_preview_guard.push(content.clone().into());
        blog_content_guard.push(content);
        println!("Successfully added blog content and preview to cache");
    }
    // Get a copy of all BlogContents
    pub async fn get_contents(&self) -> Vec<BlogContent> {
        let guard = self.blog_contents.read().await;
        println!("Retrieved blog contents from cache");
        guard.clone()
    }
    pub async fn get_blog_previews(&self) -> Vec<BlogPreview> {
        let guard = self.blog_previews.read().await;
        println!("Retrieved blog previews from cache");
        guard.clone()
    }
    // Retrieve a BlogContent by id (title)
    pub async fn get_blog_by_id(&self, id: &str) -> Option<BlogContent> {
        let guard = self.blog_contents.read().await;
        let result = guard
            .iter()
            .find(|content| content.get_title() == id)
            .cloned();
        match &result {
            Some(_) => println!("Found blog content by ID {}", id),
            None => println!("Blog content not found {}", id),
        }
        result
    }
    pub async fn get_blog_by_file_id(&self, id: String) -> Option<BlogContent> {
        let guard = self.blog_contents.read().await;
        let result = guard.iter().find(|content| content.file_id == id).cloned();
        match &result {
            Some(_) => println!("Found blog content by file ID {}", id),
            None => println!("Blog content not found {}", id),
        }
        result
    }
    // Clear all BlogContents
    pub async fn clear_contents(&self) {
        let mut blog_content_guard = self.blog_contents.write().await;
        blog_content_guard.clear();
        println!("Cleared all blog contents from cache");
    }
}
impl Clone for BlogCache {
    fn clone(&self) -> Self {
        BlogCache {
            project_previews: Arc::clone(&self.project_previews),
            blog_previews: Arc::clone(&self.blog_previews),
            blog_contents: Arc::clone(&self.blog_contents),
            blog_views: Arc::clone(&self.blog_views),
        }
    }
}
// Initialize a static cache instance
static CONTENT_CACHE: OnceCell<BlogCache> = OnceCell::const_new();
#[derive(Debug)]
pub enum CacheError {
    CacheError(String),
    Other(String),
}
// Implement Display for CacheError
impl fmt::Display for CacheError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CacheError::CacheError(msg) => write!(f, "Cache error: {}", msg),
            CacheError::Other(msg) => write!(f, "Error: {}", msg),
        }
    }
}
impl From<CacheError> for io::Error {
    fn from(err: CacheError) -> io::Error {
        io::Error::new(io::ErrorKind::Other, err.to_string())
    }
}
// Initialize the cache if it hasn't been initialized already
pub async fn initialize_cache() -> Result<(), CacheError> {
    println!("Starting cache initialization");
    match CONTENT_CACHE.set(BlogCache::new()) {
        Ok(_) => Ok(()),
        Err(e) => {
            println!("Failed to initialize cache {}", e);
            Err(CacheError::Other(e.to_string()))
        }
    }
}
// Get a reference to the initialized cache; panics if initialize_cache
// has not been called yet
pub fn get_cache() -> &'static BlogCache {
    CONTENT_CACHE.get().expect("Cache not initialized")
}
```
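For context, here is a minimal sketch of how this cache might be wired into a server's startup path. The `main` function is hypothetical (not part of the gist) and assumes tokio's `macros` and runtime features are enabled and that the items above are in scope; the blog ID is only an example value.

```rust
// Hypothetical entry point showing the intended call order:
// initialize_cache() once at startup, then get_cache() anywhere afterwards.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Populates the static OnceCell and spawns the hourly refresh tasks;
    // CacheError converts into io::Error via the From impl above.
    initialize_cache().await?;

    // Any later call site can grab the shared handle to the same cache.
    let cache = get_cache();

    // Example read: unique viewer count for blog post 1, if it has any views.
    if let Some(views) = cache.get_blogs_views(&1).await {
        println!("blog 1 has {} unique viewers", views);
    }
    Ok(())
}
```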