Skip to content

Instantly share code, notes, and snippets.

@tadghh
Created November 26, 2024 20:56
blog cache manager
```rust
use core::{fmt, future::Future};
use std::{io, sync::Arc};
use tokio::{
sync::{OnceCell, RwLock},
task,
};
use crate::server_functions::{blog::ConvertToBlogContentVec, database::get_blogs};
use super::{
blog::{BlogContent, BlogPreview},
database::get_project_previews_db,
project::ProjectPreview,
utility::{get_blog_view_log, insert_all_blog_views, BlogView, BlogViewContainer},
};
/// Spawns a detached Tokio task that awaits `$task` once per interval tick, forever.
///
/// * `$interval` - period in seconds; replaced by `120` when the crate is
///   compiled with `--cfg blog_test`.
/// * `$task` - an expression producing a future. It is re-evaluated and
///   awaited on every iteration, and the surrounding `async move` block takes
///   ownership of whatever the expression captures.
///
/// The macro expands to the `task::spawn(..)` call itself.
macro_rules! spawn_interval_task {
    ($interval:expr, $task:expr) => {
        task::spawn(async move {
            let period_secs = if cfg!(blog_test) { 120 } else { $interval };
            println!("Initializing cache update loop: {}s", period_secs);
            let period = tokio::time::Duration::from_secs(period_secs);
            let mut ticker = tokio::time::interval(period);
            loop {
                // Wait for the next tick, then run one refresh pass.
                ticker.tick().await;
                $task.await;
            }
        })
    };
}
/// In-memory cache for project previews, blog previews, blog contents and blog views.
///
/// Every collection lives behind its own `Arc<RwLock<..>>`: `BlogCache::new`
/// hands `Arc` clones to the background refresh tasks, and cloning a
/// `BlogCache` clones the `Arc`s, so all handles observe the same data.
pub struct BlogCache {
/// Cached project previews.
project_previews: Arc<RwLock<Vec<ProjectPreview>>>,
/// Cached blog previews; kept alongside `blog_contents`.
blog_previews: Arc<RwLock<Vec<BlogPreview>>>,
/// Cached full blog contents.
blog_contents: Arc<RwLock<Vec<BlogContent>>>,
/// Cached view records (blog post id plus the IP addresses that viewed it).
blog_views: Arc<RwLock<Vec<BlogViewContainer>>>,
}
impl BlogCache {
pub fn new() -> Self {
let cache = BlogCache {
project_previews: Arc::new(RwLock::new(Vec::new())),
blog_previews: Arc::new(RwLock::new(Vec::new())),
blog_contents: Arc::new(RwLock::new(Vec::new())),
blog_views: Arc::new(RwLock::new(Vec::new())),
};
// #[allow(deprecated, unexpected_cfgs, unused_mut, unused_assignments)]
// task::spawn(async move {
// let mut interval_int = 60 * 60;
// #[cfg(blog_test)]
// {
// interval_int = 120;
// println!("Test configuration detected, using shortened interval");
// }
// println!(
// "Initializing cache update loop {:?}s",
// tokio::time::Duration::from_secs(interval_int)
// );
// let mut interval =
// tokio::time::interval(tokio::time::Duration::from_secs(interval_int));
// loop {
// interval.tick().await;
// // Self::update_projects_cache(&cache_clone).await;
// Self::update_blog_content_cache(&cache_clone).await;
// // TODO deal with blog missing from db, but somehow has view
// Self::update_blog_views_cache(&cache_clone).await;
// }
// });
// Spawn project preview updater
let project_previews = Arc::clone(&cache.project_previews);
spawn_interval_task!(60 * 60, BlogCache::update_projects_cache(&project_previews));
// Spawn blog content updater
let blog_contents = Arc::clone(&cache.blog_contents);
let blog_previews = Arc::clone(&cache.blog_previews);
spawn_interval_task!(
60 * 60,
BlogCache::update_blog_content_cache(&blog_contents, &blog_previews)
);
// Spawn view updater
let blog_views = Arc::clone(&cache.blog_views);
spawn_interval_task!(60 * 60, BlogCache::update_blog_views_cache(&blog_views));
cache
}
/// Refreshes the cached project previews from the database.
///
/// Previews already cached (matched by `id`) are replaced when their checksum
/// differs from the database copy; previews not cached yet are appended.
/// Entries that no longer appear in the database result are left in place.
/// A failed database call is replaced by the result type's default value
/// (`unwrap_or_default`), so no error is reported from here.
async fn update_projects_cache(previews: &Arc<RwLock<Vec<ProjectPreview>>>) {
    // Query first so the write lock is not held while waiting on the database.
    let fresh_previews = get_project_previews_db().await.unwrap_or_default();

    let mut guard = previews.write().await;
    let cached = &mut *guard;

    for fresh in &fresh_previews {
        match cached.iter_mut().find(|entry| entry.id == fresh.id) {
            Some(entry) => {
                // Only overwrite when the checksum shows the content changed.
                if entry.get_checksum() != fresh.get_checksum() {
                    *entry = fresh.clone();
                    println!("Updated cached content for project ID: {}", fresh.id);
                }
            }
            None => {
                cached.push(fresh.clone());
                println!("Added new project content to cache: {}", fresh.id);
            }
        }
    }
}
/// Reconciles the in-memory blog view cache with the view log stored in the database.
///
/// The database is read first, then the cache write lock is taken and one of
/// the following happens:
///
/// * database empty, cache populated: every cached entry is written to the database;
/// * both populated: for each blog, IPs present only in the cache are queued
///   for insertion, IPs present only in the database are merged into the
///   cache, and blogs known only to the database are copied into the cache.
///   Blogs known only to the cache are queued for insertion as well, so views
///   recorded for a blog without any database rows are not lost;
/// * cache empty, database populated: the cache is initialised from the database.
///
/// Database writes are spawned as separate Tokio tasks and are not awaited.
///
/// A failure to read the view log is logged and the pass is skipped. It must
/// not panic: this function runs inside the periodic refresh task, and a
/// panic there would stop all future refreshes.
async fn update_blog_views_cache(cache_clone: &Arc<RwLock<Vec<BlogViewContainer>>>) {
    // Builds the future that writes `items` to the database. Insert errors are
    // logged and swallowed, so the future always resolves to `Ok(())`.
    fn insert_views(
        items: Vec<BlogViewContainer>,
    ) -> impl Future<Output = sqlx::Result<()>> + Send {
        println!("pre");
        async move {
            println!("in move");
            match insert_all_blog_views(items).await {
                Ok(_) => println!("insert passed"),
                Err(e) => println!("Error sql {:?}", e),
            };
            Ok(())
        }
    }

    // Get the current database state; skip this pass if it cannot be read.
    let view_blog_db = match get_blog_view_log().await {
        Ok(views) => views,
        Err(e) => {
            println!(
                "Failed to load blog views from database, skipping cache update: {:?}",
                e
            );
            return;
        }
    };
    let mut cached_views_write = cache_clone.write().await;
    println!("Starting blog views cache update");

    // Case 1: DB empty but cache has data - sync cache to DB.
    if view_blog_db.is_empty() && !cached_views_write.is_empty() {
        println!(
            "Database empty, syncing {} cached views",
            cached_views_write.len()
        );
        tokio::spawn(insert_views(cached_views_write.to_vec()));
    }

    let mut new_view_items = Vec::new();

    // Case 2: both DB and cache have data - reconcile differences.
    if !cached_views_write.is_empty() && !view_blog_db.is_empty() {
        println!("Reconciling {} DB entries with cache", view_blog_db.len());

        // Blogs that only exist in the cache have no DB entry to be matched
        // against in the loop below, so queue them for insertion here.
        for cached_view in cached_views_write.iter() {
            let blog_id = cached_view.get_blog_post_id();
            let known_to_db = view_blog_db
                .iter()
                .any(|db_view| db_view.get_blog_post_id() == blog_id);
            if !known_to_db {
                println!("Found cached views for blog {} missing from DB", blog_id);
                new_view_items.push(cached_view.clone());
            }
        }

        for db_view in &view_blog_db {
            let blog_id = db_view.get_blog_post_id();
            if let Some(cached_view) = cached_views_write
                .iter_mut()
                .find(|p| p.get_blog_post_id() == blog_id)
            {
                // IPs in the cache but not in the DB: these must be persisted.
                let new_db_ips: Vec<String> = cached_view
                    .get_ip_addresses()
                    .iter()
                    .filter(|ip| !db_view.get_ip_addresses().contains(*ip))
                    .cloned()
                    .collect();
                // IPs in the DB but not in the cache: these must be cached.
                let new_cache_ips: Vec<String> = db_view
                    .get_ip_addresses()
                    .iter()
                    .filter(|ip| !cached_view.get_ip_addresses().contains(*ip))
                    .cloned()
                    .collect();

                // Decide before `new_db_ips` is moved into the insert queue.
                let rebuild_cache_entry = !new_db_ips.is_empty() || !new_cache_ips.is_empty();

                if !new_db_ips.is_empty() {
                    println!("Found {} new IPs for blog {}", new_db_ips.len(), blog_id);
                    new_view_items.push(BlogViewContainer::new(*blog_id, new_db_ips));
                }

                if rebuild_cache_entry {
                    // `new_db_ips` is a subset of the cached IPs, so the union of
                    // the cached IPs and the DB-only IPs is the complete set.
                    let all_unique_ips: Vec<String> = cached_view
                        .get_ip_addresses()
                        .iter()
                        .cloned()
                        .chain(new_cache_ips)
                        .collect::<std::collections::HashSet<_>>()
                        .into_iter()
                        .collect();
                    *cached_view = BlogViewContainer::new(*blog_id, all_unique_ips);
                    println!("Updated cache entry for blog {}", blog_id);
                }
            } else {
                // Blog entry found only in the DB: add it to the cache.
                println!("Adding new blog view {} to cache", blog_id);
                cached_views_write.push(db_view.clone());
            }
        }
    } else if !view_blog_db.is_empty() {
        println!("Cache is empty, initializing from database");
        cached_views_write.extend(view_blog_db);
    } else {
        println!("database empty there are no views :(");
    }

    // Insert any new views collected during reconciliation.
    if !new_view_items.is_empty() {
        println!("added new view items {:?}", new_view_items);
        tokio::spawn(insert_views(new_view_items));
    }
    println!("Blog views cache update completed");
}
/// Refreshes the cached blog contents, and the previews derived from them, from the database.
///
/// * `cache_clone` - the cached blog contents.
/// * `preview_cache` - the cached blog previews; each preview is produced from
///   the matching content via `Into<BlogPreview>`.
///
/// Contents are matched by `id`. A cached content whose checksum differs from
/// the database copy is replaced, and an unknown content is appended. In both
/// cases the preview is upserted rather than blindly pushed: `clear_contents`
/// empties only the contents, so a preview for the same id may already exist
/// and must not be duplicated, and a missing preview is re-created.
///
/// A failure to load the blogs is logged and the pass is skipped. It must not
/// panic: this function runs inside the periodic refresh task, and a panic
/// there would stop all future refreshes.
async fn update_blog_content_cache(
    cache_clone: &Arc<RwLock<Vec<BlogContent>>>,
    preview_cache: &Arc<RwLock<Vec<BlogPreview>>>,
) {
    // Stores the preview derived from `content`, replacing the cached preview
    // with the same id if there is one. Returns `true` when an existing
    // preview was replaced and `false` when a new one was appended.
    fn upsert_preview(previews: &mut Vec<BlogPreview>, content: &BlogContent) -> bool {
        let preview: BlogPreview = content.clone().into();
        match previews.iter_mut().find(|p| p.id == content.id) {
            Some(slot) => {
                *slot = preview;
                true
            }
            None => {
                previews.push(preview);
                false
            }
        }
    }

    // Query before locking so readers are not blocked on the database call.
    let all_blog_contents = match get_blogs().await {
        Ok(blogs) => blogs.convert_to_blog_content_vec(),
        Err(e) => {
            println!(
                "Failed to load blogs from database, skipping cache update: {:?}",
                e
            );
            return;
        }
    };

    // Lock order (contents, then previews) matches `add_content`.
    let mut cache_lock = cache_clone.write().await;
    let mut preview_lock = preview_cache.write().await;
    let cached_blogs = &mut *cache_lock;
    let cached_previews = &mut *preview_lock;

    for blog_content in &all_blog_contents {
        println!("checking content for blog ID: {}", blog_content.id);
        match cached_blogs.iter_mut().find(|p| p.id == blog_content.id) {
            Some(cached_blog) => {
                // The checksum decides whether the cached copy is outdated.
                if cached_blog.get_checksum() != blog_content.get_checksum() {
                    *cached_blog = blog_content.clone();
                    if upsert_preview(cached_previews, blog_content) {
                        println!(
                            "Updated cached content for preview blog ID: {}",
                            blog_content.id
                        );
                    }
                    println!("Updated cached content for blog ID: {}", blog_content.id);
                }
            }
            None => {
                println!("Added new blog content to cache: {}", blog_content.id);
                cached_blogs.push(blog_content.clone());
                upsert_preview(cached_previews, blog_content);
            }
        }
    }
    println!("Done checking blogs for cache updates");
}
/// Returns a copy of every cached blog view container.
///
/// Takes the read lock on the view cache, logs the number of entries and
/// clones the whole list.
pub async fn get_blog_views(&self) -> Vec<BlogViewContainer> {
    let views = self.blog_views.read().await;
    println!("Retrieved blog views from cache {} ", views.len());
    views.clone()
}
/// Returns the number of IP addresses recorded for `blog_id`.
///
/// Yields `None` when the view cache has no entry for that blog post.
pub async fn get_blogs_views(&self, blog_id: &i32) -> Option<usize> {
    let views = self.blog_views.read().await;
    println!("got blog views");
    let view_count = views
        .iter()
        .find(|entry| entry.get_blog_post_id() == blog_id)
        .map(|entry| entry.get_ip_addresses().len());
    view_count
}
/// Appends `preview` to the cached blog previews.
///
/// No check is made for an existing preview with the same id.
pub async fn add_preview(&self, preview: BlogPreview) {
    let mut previews = self.blog_previews.write().await;
    println!("Adding new blog preview to cache {}", preview.id);
    previews.push(preview);
}
/// Records one view in the view cache.
///
/// `preview` supplies the blog post id and the viewer's IP. If the blog
/// already has a cache entry, the IP is added only when it is not listed yet;
/// otherwise a new `BlogViewContainer` holding just this IP is created.
pub async fn add_view(&self, preview: BlogView) {
    let mut views = self.blog_views.write().await;
    match views
        .iter_mut()
        .find(|entry| entry.get_blog_post_id() == preview.get_blog_post_id())
    {
        Some(entry) => {
            // Each IP is stored at most once per blog post.
            let already_recorded = entry
                .get_ip_addresses()
                .contains(&preview.get_view_ip());
            if !already_recorded {
                println!(
                    "IP {} Viewed Blog {}",
                    &preview.get_view_ip(),
                    &preview.get_blog_post_id()
                );
                entry.add_ip_address(preview.get_view_ip());
            }
        }
        None => {
            // First view recorded for this blog post: start a new container.
            println!(
                "New IP {} Viewed Blog {}",
                &preview.get_view_ip(),
                &preview.get_blog_post_id()
            );
            let first_view = BlogViewContainer::new(
                preview.get_blog_post_id().to_owned(),
                vec![preview.get_view_ip()],
            );
            views.push(first_view);
        }
    }
}
/// Appends `preview` to the cached project previews.
///
/// No check is made for an existing preview with the same id.
pub async fn add_project(&self, preview: ProjectPreview) {
    let project_id = preview.id;
    let mut guard = self.project_previews.write().await;
    // The message used to say "blog preview", copied from `add_preview`;
    // it now names what is actually being cached.
    println!("Adding new project preview to cache {}", project_id);
    guard.push(preview);
}
/// Returns a copy of every cached project preview.
///
/// Takes the read lock on the project cache, logs the number of entries and
/// clones the whole list.
pub async fn get_projects(&self) -> Vec<ProjectPreview> {
    let projects = self.project_previews.read().await;
    println!("Retrieved project previews from cache {} ", projects.len());
    projects.clone()
}
// Add a BlogContent to the cache
pub async fn add_content(&self, content: BlogContent) {
let content_id = content.id;
println!("Adding new blog content to cache {}", content_id);
// Add to blog contents
let mut blog_content_guard = self.blog_contents.write().await;
let mut blog_preview_guard = self.blog_previews.write().await;
// println!("Adding new blog preview to cache {}", project_id);
blog_preview_guard.push(content.clone().into());
blog_content_guard.push(content);
// Add to previews
println!("Successfully added blog content and preview to cache");
}
/// Returns a copy of every cached blog content (a clone, not a reference into the cache).
pub async fn get_contents(&self) -> Vec<BlogContent> {
    let contents = self.blog_contents.read().await;
    println!("Retrieved blog contents from cache");
    contents.clone()
}
/// Returns a copy of every cached blog preview.
pub async fn get_blog_previews(&self) -> Vec<BlogPreview> {
    let previews = self.blog_previews.read().await;
    println!("Retrieved blog previews from cache");
    previews.clone()
}
/// Looks up a cached blog content whose title equals `id`.
///
/// Despite the parameter name, the comparison is made against
/// `get_title()`. Returns a clone of the first match, or `None`; the outcome
/// is logged either way.
pub async fn get_blog_by_id(&self, id: &str) -> Option<BlogContent> {
    let contents = self.blog_contents.read().await;
    let found = contents
        .iter()
        .find(|content| content.get_title() == id)
        .cloned();
    if found.is_some() {
        println!("Found blog content by ID {}", id);
    } else {
        println!("Blog content not found {}", id);
    }
    found
}
/// Looks up a cached blog content whose `file_id` equals `id`.
///
/// Returns a clone of the first match, or `None`; the outcome is logged
/// either way.
pub async fn get_blog_by_file_id(&self, id: String) -> Option<BlogContent> {
    let contents = self.blog_contents.read().await;
    let found = contents.iter().find(|content| content.file_id == id).cloned();
    if found.is_some() {
        println!("Found blog content by ID {}", id);
    } else {
        println!("Blog content not found {}", id);
    }
    found
}
/// Removes every cached blog content.
///
/// Only the contents list is emptied; the preview, project and view caches
/// are not touched.
pub async fn clear_contents(&self) {
    self.blog_contents.write().await.clear();
    println!("Cleared all blog contents from cache");
}
}
impl Clone for BlogCache {
fn clone(&self) -> Self {
BlogCache {
project_previews: Arc::clone(&self.project_previews),
blog_previews: Arc::clone(&self.blog_previews),
blog_contents: Arc::clone(&self.blog_contents),
blog_views: Arc::clone(&self.blog_views),
}
}
}
// Process-wide cache slot. It starts out empty (`const_new`), is filled by
// `initialize_cache`, and is read through `get_cache`.
static CONTENT_CACHE: OnceCell<BlogCache> = OnceCell::const_new();
/// Errors reported by the cache module.
///
/// Both variants carry a human-readable message.
#[derive(Debug)]
pub enum CacheError {
    /// A cache-specific failure; displayed as `Cache error: <message>`.
    CacheError(String),
    /// Any other failure; displayed as `Error: <message>`.
    Other(String),
}

/// User-facing rendering of the error: a short prefix followed by the message.
impl fmt::Display for CacheError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CacheError::CacheError(msg) => write!(f, "Cache error: {}", msg),
            CacheError::Other(msg) => write!(f, "Error: {}", msg),
        }
    }
}

/// Marks `CacheError` as a standard error type so it can be boxed as
/// `Box<dyn std::error::Error>`, propagated with `?` into such a box, or used
/// as the source of another error. There is no underlying source, so the
/// default methods are sufficient.
impl std::error::Error for CacheError {}
/// Converts a `CacheError` into an `io::Error` of kind `Other` whose payload
/// is the error's `Display` text.
impl From<CacheError> for io::Error {
    fn from(err: CacheError) -> Self {
        let message = err.to_string();
        Self::new(io::ErrorKind::Other, message)
    }
}
/// Creates the global cache and stores it in `CONTENT_CACHE`.
///
/// # Errors
///
/// Returns `CacheError::Other` when the cache has already been initialized,
/// or when storing it fails because another initialization is racing this one.
///
/// The cell is checked *before* `BlogCache::new()` is called. `new` spawns
/// the background refresh loops, so building a second cache only to have
/// `set` reject it would leave three extra loops running forever for a cache
/// nobody can reach. Two callers racing past the check can still both build a
/// cache; the `set` result below reports that case.
pub async fn initialize_cache() -> Result<(), CacheError> {
    println!("Starting cache initialization");

    if CONTENT_CACHE.initialized() {
        let reason = String::from("cache already initialized");
        println!("Failed to initialize cache {}", reason);
        return Err(CacheError::Other(reason));
    }

    match CONTENT_CACHE.set(BlogCache::new()) {
        Ok(_) => Ok(()),
        Err(e) => {
            println!("Failed to initialize cache {}", e);
            Err(CacheError::Other(e.to_string()))
        }
    }
}
/// Returns a reference to the global cache.
///
/// # Panics
///
/// Panics with "Cache not initialized" if `CONTENT_CACHE` has not been set yet.
pub fn get_cache<'a>() -> &'a BlogCache {
    CONTENT_CACHE.get().expect("Cache not initialized")
}
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment