Pingora cache-example
// compression module (imported later as `super::compression`)
use std::fmt::Debug;

use bytes::Bytes;
use pingora::{Error, Result};

/// Content types that skip cache compression by default
/// (already-compressed media and archive formats).
pub const SKIP_COMPRESSION: &[&str] = &[
    "image/avif",
    "image/webp",
    "image/png",
    "image/jpeg",
    "font/woff2",
    "font/woff",
    "video/webm",
    "video/ogg",
    "video/mpeg",
    "video/mp4",
    "application/zip",
    "application/gzip",
];

pub trait CacheCompression: Default + Debug + Clone + Send + Sync + 'static {
    fn encode(&self, buf: Bytes) -> Result<Bytes>;
    fn decode(&self, buf: &Bytes) -> Result<Bytes>;

    /// Decode only `range_start..range_end` of the uncompressed body.
    /// The default implementation decompresses the whole buffer, then slices.
    fn decode_range(&mut self, buf: &Bytes, range_start: usize, range_end: usize) -> Result<Bytes> {
        self.decode(buf)
            .map(|buf| buf.slice(range_start..range_end))
    }

    /// Whether a body with this `Content-Type` should be compressed.
    fn should_compress(&self, content_type: Option<&str>) -> bool {
        if let Some(content_type) = content_type {
            if SKIP_COMPRESSION.contains(&content_type) {
                return false;
            }
        }
        true
    }

    /// Whether this codec compresses at all.
    fn compressed(&self) -> bool {
        true
    }
}

#[derive(Debug, Clone, Copy, Default)]
pub struct NoCompression;

impl CacheCompression for NoCompression {
    fn encode(&self, buf: Bytes) -> Result<Bytes> {
        Ok(buf)
    }

    fn decode(&self, buf: &Bytes) -> Result<Bytes> {
        Ok(buf.clone())
    }

    fn should_compress(&self, _content_type: Option<&str>) -> bool {
        false
    }

    fn compressed(&self) -> bool {
        false
    }
}

#[derive(Debug, Clone, Copy, Default)]
pub struct LZ4Compression;

impl CacheCompression for LZ4Compression {
    fn encode(&self, buf: Bytes) -> Result<Bytes> {
        let compressed = lz4_flex::compress_prepend_size(&buf);
        Ok(compressed.into())
    }

    fn decode(&self, buf: &Bytes) -> Result<Bytes> {
        let decompressed = lz4_flex::decompress_size_prepended(buf).map_err(|e| {
            Error::create(
                pingora::ErrorType::InternalError,
                pingora::ErrorSource::Internal,
                None,
                Some(Box::new(e)),
            )
        })?;
        Ok(decompressed.into())
    }
}
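Other codecs can be plugged in the same way. As a sketch (not part of the gist), a zstd-backed implementation could look like the following, assuming the `zstd` crate and its `encode_all`/`decode_all` helpers:

#[derive(Debug, Clone, Copy, Default)]
pub struct ZstdCompression {
    /// zstd compression level; 0 selects the crate's default level.
    pub level: i32,
}

impl CacheCompression for ZstdCompression {
    fn encode(&self, buf: Bytes) -> Result<Bytes> {
        // `encode_all` reads from any `io::Read`; a byte slice works directly.
        let compressed = zstd::encode_all(&buf[..], self.level).map_err(|e| {
            Error::create(
                pingora::ErrorType::InternalError,
                pingora::ErrorSource::Internal,
                None,
                Some(Box::new(e)),
            )
        })?;
        Ok(compressed.into())
    }

    fn decode(&self, buf: &Bytes) -> Result<Bytes> {
        let decompressed = zstd::decode_all(&buf[..]).map_err(|e| {
            Error::create(
                pingora::ErrorType::InternalError,
                pingora::ErrorSource::Internal,
                None,
                Some(Box::new(e)),
            )
        })?;
        Ok(decompressed.into())
    }
}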
// scc_cache module (imported later as `crate::cache::scc_cache`)
use ahash::RandomState;
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use http::header;
use pingora::cache::key::{CacheHashKey, CacheKey, CompactCacheKey, HashBinary};
use pingora::cache::max_file_size::ERR_RESPONSE_TOO_LARGE;
use pingora::cache::storage::{HandleHit, HandleMiss, HitHandler, MissHandler};
use pingora::cache::trace::SpanHandle;
use pingora::cache::{CacheMeta, PurgeType, Storage};
use pingora::{Error, Result};
use scc::HashMap;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::sync::Arc;

use super::compression::{CacheCompression, NoCompression};

/// Serialized `CacheMeta` halves, as produced by `CacheMeta::serialize`.
type BinaryMeta = (Bytes, Bytes);

pub type SharedHashMap = Arc<HashMap<HashBinary, SccCacheObject, RandomState>>;

/// Cache that uses `scc::HashMap`.
/// Does not support streaming partial writes.
#[derive(Clone, Debug)]
pub struct SccMemoryCache<C: CacheCompression = NoCompression> {
    pub cache: SharedHashMap,
    /// Maximum allowed body size for caching
    pub max_file_size_bytes: Option<usize>,
    /// Reject cache admissions whose response body is empty
    pub reject_empty_body: bool,
    /// Compression that will be used
    pub compression: C,
}
impl SccMemoryCache {
    pub fn from_map(cache: SharedHashMap) -> Self {
        SccMemoryCache {
            cache,
            max_file_size_bytes: None,
            reject_empty_body: false,
            compression: NoCompression,
        }
    }

    pub fn new() -> Self {
        SccMemoryCache {
            cache: Arc::new(HashMap::with_hasher(RandomState::new())),
            max_file_size_bytes: None,
            reject_empty_body: false,
            compression: NoCompression,
        }
    }

    pub fn with_capacity(capacity: usize) -> Self {
        SccMemoryCache {
            cache: Arc::new(HashMap::with_capacity_and_hasher(
                capacity,
                RandomState::new(),
            )),
            max_file_size_bytes: None,
            reject_empty_body: false,
            compression: NoCompression,
        }
    }
}

impl<C: CacheCompression> SccMemoryCache<C> {
    pub fn with_compression<T: CacheCompression>(self, compression: T) -> SccMemoryCache<T> {
        SccMemoryCache {
            compression,
            cache: self.cache,
            max_file_size_bytes: self.max_file_size_bytes,
            reject_empty_body: self.reject_empty_body,
        }
    }

    pub fn with_max_file_size(mut self, max_bytes: Option<usize>) -> Self {
        self.max_file_size_bytes = max_bytes;
        self
    }

    pub fn with_reject_empty_body(mut self, should_error: bool) -> Self {
        self.reject_empty_body = should_error;
        self
    }
}
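// Usage sketch (not in the original gist): composing the builders above.
// `LZ4Compression` comes from the compression module; the sizes are illustrative.
//
//     let cache = SccMemoryCache::new()
//         .with_compression(LZ4Compression)
//         .with_max_file_size(Some(8 * 1024 * 1024)) // cap cached bodies at 8 MiB
//         .with_reject_empty_body(true);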
#[async_trait]
impl<C: CacheCompression> Storage for SccMemoryCache<C> {
    async fn lookup(
        &'static self,
        key: &CacheKey,
        _trace: &SpanHandle,
    ) -> Result<Option<(CacheMeta, HitHandler)>> {
        let hash = key.combined_bin();
        let Some(cache_object) = self
            .cache
            .get_async(&hash)
            .await
            .map(|entry| entry.get().clone())
        else {
            return Ok(None);
        };
        let meta = CacheMeta::deserialize(&cache_object.meta.0, &cache_object.meta.1)?;
        Ok(Some((
            meta,
            Box::new(SccHitHandler::new(cache_object, self.clone())),
        )))
    }
    async fn get_miss_handler(
        &'static self,
        key: &CacheKey,
        meta: &CacheMeta,
        _trace: &SpanHandle,
    ) -> Result<MissHandler> {
        let hash = key.combined_bin();
        let raw_meta = meta.serialize()?;
        // Don't re-compress bodies the upstream already content-encoded.
        let already_compressed = meta
            .headers()
            .get(header::CONTENT_ENCODING)
            .map(|v| !v.is_empty())
            .unwrap_or(false);
        let content_type = meta
            .headers()
            .get(header::CONTENT_TYPE)
            .and_then(|v| v.to_str().ok());
        let compress_content_type = self.compression.should_compress(content_type);
        let meta = (Bytes::from(raw_meta.0), Bytes::from(raw_meta.1));
        let miss_handler = SccMissHandler {
            body_buf: BytesMut::new(),
            meta,
            key: hash,
            inner: self.clone(),
            compress: self.compression.compressed() && !already_compressed && compress_content_type,
        };
        Ok(Box::new(miss_handler))
    }
    async fn purge(
        &'static self,
        key: &CompactCacheKey,
        _purge_type: PurgeType,
        _trace: &SpanHandle,
    ) -> Result<bool> {
        let hash = key.combined_bin();
        Ok(self.cache.remove(&hash).is_some())
    }
    async fn update_meta(
        &'static self,
        key: &CacheKey,
        meta: &CacheMeta,
        _trace: &SpanHandle,
    ) -> Result<bool> {
        let hash = key.combined_bin();
        let new_meta = meta.serialize()?;
        let new_meta = (Bytes::from(new_meta.0), Bytes::from(new_meta.1));
        let updated = self
            .cache
            .update_async(&hash, move |_, value| {
                value.meta = new_meta;
            })
            .await;
        if updated.is_some() {
            Ok(true)
        } else {
            Err(Error::create(
                pingora::ErrorType::Custom("No meta found for update_meta"),
                pingora::ErrorSource::Internal,
                Some(format!("key = {:?}", key).into()),
                None,
            ))
        }
    }

    fn support_streaming_partial_write(&self) -> bool {
        false
    }

    fn as_any(&self) -> &(dyn Any + Send + Sync) {
        self
    }
}
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub struct SccCacheObject {
    meta: BinaryMeta,
    body: Bytes,
    /// Uncompressed length of the body
    len: usize,
    /// Whether the body is compressed
    compressed: bool,
}

pub struct SccHitHandler<C: CacheCompression> {
    cache_object: SccCacheObject,
    inner: SccMemoryCache<C>,
    done: bool,
    range_start: usize,
    range_end: usize,
}

impl<C: CacheCompression> SccHitHandler<C> {
    pub(crate) fn new(cache_object: SccCacheObject, inner: SccMemoryCache<C>) -> Self {
        let len = cache_object.len;
        SccHitHandler {
            cache_object,
            inner,
            done: false,
            range_start: 0,
            range_end: len,
        }
    }
}
#[async_trait]
impl<C: CacheCompression> HandleHit for SccHitHandler<C> {
    async fn read_body(&mut self) -> Result<Option<Bytes>> {
        if self.done {
            Ok(None)
        } else {
            // The whole (range of the) body is returned in a single read.
            self.done = true;
            if self.cache_object.compressed {
                self.inner
                    .compression
                    .decode_range(&self.cache_object.body, self.range_start, self.range_end)
                    .map(Some)
            } else {
                Ok(Some(
                    self.cache_object
                        .body
                        .slice(self.range_start..self.range_end),
                ))
            }
        }
    }

    fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
        if start >= self.cache_object.len {
            return Error::e_explain(
                pingora::ErrorType::InternalError,
                format!(
                    "seek start out of range {start} >= {}",
                    self.cache_object.len
                ),
            );
        }
        self.range_start = start;
        // A `None` end keeps the current `range_end`.
        if let Some(end) = end {
            self.range_end = std::cmp::min(self.cache_object.len, end);
        }
        self.done = false;
        Ok(())
    }

    async fn finish(
        mut self: Box<Self>,
        _storage: &'static (dyn Storage + Sync),
        _key: &CacheKey,
        _trace: &SpanHandle,
    ) -> Result<()> {
        Ok(())
    }

    fn can_seek(&self) -> bool {
        true
    }

    fn as_any(&self) -> &(dyn Any + Send + Sync) {
        self
    }
}
#[derive(Debug)]
struct SccMissHandler<C: CacheCompression> {
    meta: BinaryMeta,
    key: HashBinary,
    body_buf: BytesMut,
    inner: SccMemoryCache<C>,
    compress: bool,
}

#[async_trait]
impl<C: CacheCompression> HandleMiss for SccMissHandler<C> {
    async fn write_body(&mut self, data: bytes::Bytes, _eof: bool) -> Result<()> {
        if let Some(max_file_size_bytes) = self.inner.max_file_size_bytes {
            if self.body_buf.len() + data.len() > max_file_size_bytes {
                return Error::e_explain(
                    ERR_RESPONSE_TOO_LARGE,
                    format!(
                        "writing data of size {} bytes would exceed max file size of {} bytes",
                        data.len(),
                        max_file_size_bytes
                    ),
                );
            }
        }
        self.body_buf.extend_from_slice(&data);
        Ok(())
    }

    async fn finish(self: Box<Self>) -> Result<usize> {
        let uncompressed_len = self.body_buf.len();
        if uncompressed_len == 0 && self.inner.reject_empty_body {
            let err = Error::create(
                pingora::ErrorType::Custom("cache write error: empty body"),
                pingora::ErrorSource::Internal,
                None,
                None,
            );
            return Err(err);
        }
        let body = if self.compress {
            self.inner.compression.encode(self.body_buf.freeze())?
        } else {
            self.body_buf.freeze()
        };
        let size = body.len() + self.meta.0.len() + self.meta.1.len();
        let cache_object = SccCacheObject {
            body,
            meta: self.meta,
            len: uncompressed_len,
            compressed: self.compress,
        };
        // Ignore the insert result: a concurrent writer may have already
        // populated this key, in which case the existing entry wins.
        self.inner
            .cache
            .insert_async(self.key, cache_object)
            .await
            .ok();
        Ok(size)
    }
}
#[cfg(test)]
pub(crate) mod test {
    use std::time::SystemTime;

    use super::*;
    use once_cell::sync::Lazy;
    use pingora::{cache::CacheMeta, http::ResponseHeader};
    use rustracing::span::Span;

    pub fn gen_meta() -> CacheMeta {
        let mut header = ResponseHeader::build(200, None).unwrap();
        header.append_header("foo1", "bar1").unwrap();
        header.append_header("foo2", "bar2").unwrap();
        header.append_header("foo3", "bar3").unwrap();
        header.append_header("Server", "Pingora").unwrap();
        CacheMeta::new(SystemTime::UNIX_EPOCH, SystemTime::UNIX_EPOCH, 0, 0, header)
    }

    #[tokio::test]
    async fn test_write_then_read() {
        static MEM_CACHE: Lazy<SccMemoryCache> = Lazy::new(SccMemoryCache::new);
        let span = &Span::inactive().handle();

        let key1 = CacheKey::new("", "a", "1");
        let res = MEM_CACHE.lookup(&key1, span).await.unwrap();
        assert!(res.is_none());

        let cache_meta = gen_meta();
        let mut miss_handler = MEM_CACHE
            .get_miss_handler(&key1, &cache_meta, span)
            .await
            .unwrap();
        miss_handler
            .write_body(b"test1"[..].into(), false)
            .await
            .unwrap();
        miss_handler
            .write_body(b"test2"[..].into(), false)
            .await
            .unwrap();
        miss_handler.finish().await.unwrap();

        let (_cache_meta2, mut hit_handler) = MEM_CACHE.lookup(&key1, span).await.unwrap().unwrap();
        let data = hit_handler.read_body().await.unwrap().unwrap();
        assert_eq!("test1test2", data);
        let data = hit_handler.read_body().await.unwrap();
        assert!(data.is_none());
    }

    #[tokio::test]
    async fn test_purge_partial() {
        static MEM_CACHE: Lazy<SccMemoryCache> = Lazy::new(SccMemoryCache::new);
        let cache = &MEM_CACHE;

        let key = CacheKey::new("", "a", "1").to_compact();
        let hash = key.combined_bin();
        let meta = (
            "meta_key".as_bytes().to_vec(),
            "meta_value".as_bytes().to_vec(),
        );
        let cache_object = SccCacheObject {
            body: Bytes::new(),
            meta: (Bytes::from(meta.0), Bytes::from(meta.1)),
            len: 0,
            compressed: false,
        };
        cache
            .cache
            .insert(hash, cache_object)
            .unwrap_or_else(|_| panic!());
        assert!(cache.cache.contains(&hash));

        let result = cache
            .purge(&key, PurgeType::Invalidation, &Span::inactive().handle())
            .await;
        assert!(result.is_ok());
        assert!(!cache.cache.contains(&hash));
    }

    #[tokio::test]
    async fn test_purge_complete() {
        static MEM_CACHE: Lazy<SccMemoryCache> = Lazy::new(SccMemoryCache::new);
        let cache = &MEM_CACHE;

        let key = CacheKey::new("", "a", "1").to_compact();
        let hash = key.combined_bin();
        let meta = (
            "meta_key".as_bytes().to_vec(),
            "meta_value".as_bytes().to_vec(),
        );
        let body = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
        let body_len = body.len();
        let cache_object = SccCacheObject {
            body: Bytes::from(body),
            len: body_len,
            meta: (Bytes::from(meta.0), Bytes::from(meta.1)),
            compressed: false,
        };
        cache
            .cache
            .insert(hash.clone(), cache_object)
            .unwrap_or_else(|_| panic!());
        assert!(cache.cache.contains(&hash));

        let result = cache
            .purge(&key, PurgeType::Invalidation, &Span::inactive().handle())
            .await;
        assert!(result.is_ok());
        assert!(!cache.cache.contains(&hash));
    }

    #[tokio::test]
    async fn test_update_meta() {
        static MEM_CACHE: Lazy<SccMemoryCache> = Lazy::new(SccMemoryCache::new);
        let cache = &MEM_CACHE;

        let full_key = CacheKey::new("", "a", "1");
        let key = full_key.clone().to_compact();
        let hash = key.combined_bin();
        let meta = (
            "meta_key".as_bytes().to_vec(),
            "meta_value".as_bytes().to_vec(),
        );
        let body = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
        let body_len = body.len();
        let cache_object = SccCacheObject {
            body: Bytes::from(body.clone()),
            len: body_len,
            meta: (Bytes::from(meta.0), Bytes::from(meta.1)),
            compressed: false,
        };
        cache
            .cache
            .insert(hash.clone(), cache_object)
            .unwrap_or_else(|_| panic!());
        assert!(cache.cache.contains(&hash));

        let new_meta = gen_meta();
        assert!(cache
            .update_meta(&full_key, &new_meta, &Span::inactive().handle())
            .await
            .unwrap());

        let (res_meta, mut res_handler) = MEM_CACHE
            .lookup(&full_key, &Span::inactive().handle())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(res_meta.serialize().unwrap(), new_meta.serialize().unwrap());
        assert_eq!(res_handler.read_body().await.unwrap().unwrap(), &body);
    }

    #[tokio::test]
    async fn test_read_range() {
        static MEM_CACHE: Lazy<SccMemoryCache> = Lazy::new(SccMemoryCache::new);
        let span = &Span::inactive().handle();

        let key1 = CacheKey::new("", "a", "1");
        let res = MEM_CACHE.lookup(&key1, span).await.unwrap();
        assert!(res.is_none());

        let cache_meta = gen_meta();
        let mut miss_handler = MEM_CACHE
            .get_miss_handler(&key1, &cache_meta, span)
            .await
            .unwrap();
        miss_handler
            .write_body(b"test1test2"[..].into(), false)
            .await
            .unwrap();
        miss_handler.finish().await.unwrap();

        let (_cache_meta2, mut hit_handler) = MEM_CACHE.lookup(&key1, span).await.unwrap().unwrap();
        // out of range
        assert!(hit_handler.seek(10000, None).is_err());

        assert!(hit_handler.seek(5, None).is_ok());
        let data = hit_handler.read_body().await.unwrap().unwrap();
        assert_eq!("test2", data);
        let data = hit_handler.read_body().await.unwrap();
        assert!(data.is_none());

        assert!(hit_handler.seek(4, Some(5)).is_ok());
        let data = hit_handler.read_body().await.unwrap().unwrap();
        assert_eq!("1", data);
        let data = hit_handler.read_body().await.unwrap();
        assert!(data.is_none());
    }
}
// cache persistence helpers: load/store the scc cache and its LRU eviction state
use crate::cache::scc_cache::SharedHashMap;
use crate::cache::SccMemoryCache;
use crate::utils::{FutureBackgroundService, ServiceRegistry};
use futures::{future, Future, FutureExt};
use pingora::cache::eviction::lru::Manager as LRUEvictionManager;
use pingora::cache::eviction::EvictionManager;
use pingora::services::background::{background_service, BackgroundService};
use pingora::ErrorType::InternalError;
use pingora::{Error, OrErr, Result};
use rand::{self, Rng};
use serde::ser::SerializeMap;
use serde::Serialize;
use std::fs;
use std::io::{BufReader, BufWriter, Error as StdIoError, Write};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::time::Duration;
use tokio::runtime::Runtime;

use super::compression::CacheCompression;
use super::CacheBucket;
/// Load a previously saved cache from `dir` (falling back to a fresh one) and,
/// if `save_interval` is set, register a background service that saves it back
/// periodically.
pub fn load_or_create_cache<C: CacheCompression, const SHARDS: usize>(
    dir: PathBuf,
    limit: usize,
    capacity: usize,
    compression: C,
    max_file_size: Option<usize>,
    save_interval: Option<Duration>,
    runtime: &Runtime,
) -> (CacheSerde<SHARDS>, CacheBucket) {
    let loaded_cache_serde = runtime
        .block_on(CacheSerde::<SHARDS>::load(&dir, limit))
        .ok();

    let mut cache_serde;
    let mut bucket;
    if let Some(loaded) = loaded_cache_serde {
        log::info!(
            "loaded {} cache entries from {:?}",
            loaded.storage.len(),
            &dir
        );
        let storage = loaded.storage.clone();
        bucket = CacheBucket::new(
            SccMemoryCache::from_map(loaded.storage)
                .with_max_file_size(max_file_size)
                .with_compression(compression),
        );
        if let Some(eviction) = loaded.eviction {
            bucket.eviction = Some(eviction);
        }
        cache_serde = CacheSerde {
            eviction: loaded.eviction,
            storage,
        };
    } else {
        let new_cache = SccMemoryCache::with_capacity(capacity)
            .with_max_file_size(max_file_size)
            .with_compression(compression);
        let storage = new_cache.cache.clone();
        bucket = CacheBucket::new(new_cache);
        cache_serde = CacheSerde::<SHARDS> {
            storage,
            eviction: None,
        };
    }

    // No eviction state was loaded; create a fresh LRU manager and leak it to
    // satisfy the `'static` lifetime the cache API expects.
    if bucket.eviction.is_none() {
        let new_eviction = Box::leak(Box::new(LRUEvictionManager::<SHARDS>::with_capacity(
            limit, capacity,
        )));
        cache_serde.eviction = Some(new_eviction);
        bucket.eviction = Some(new_eviction);
    }

    if let Some(interval) = save_interval {
        ServiceRegistry::global().add_service(background_service(
            &format!("cache saver {:?}", &dir),
            cache_serde.background_store_service(dir, interval),
        ));
    }

    (cache_serde, bucket)
}
#[derive(Clone)]
pub struct CacheSerde<const N: usize> {
    pub storage: SharedHashMap,
    pub eviction: Option<&'static LRUEvictionManager<N>>,
}

impl<const N: usize> CacheSerde<N> {
    pub fn background_store_service(
        &self,
        dir: PathBuf,
        period: Duration,
    ) -> impl BackgroundService {
        FutureBackgroundService::new(self.periodic_store_future(dir, period))
    }

    pub fn periodic_store_future(
        &self,
        dir: PathBuf,
        period: Duration,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        periodic_store_impl(self.clone(), dir, period)
            .then(|_| future::ready(()))
            .boxed()
    }
    pub async fn load(dir: impl AsRef<Path>, eviction_limit: usize) -> Result<Self> {
        let dir_owned = dir.as_ref().to_path_buf();
        let storage = tokio::task::spawn_blocking(move || Self::load_cache_map(&dir_owned))
            .await
            .unwrap()
            .map_err(|e| {
                Error::create(
                    pingora::ErrorType::Custom("failed to load stored cache"),
                    pingora::ErrorSource::Unset,
                    None,
                    Some(Box::new(e)),
                )
            })?;

        let mut loaded_eviction = None;
        let lru_dir = dir.as_ref().join("lru");
        let lru_dir_clone = lru_dir.clone();
        if N != 0
            && tokio::task::spawn_blocking(move || lru_dir_clone.exists())
                .await
                .unwrap()
        {
            let to_load_eviction = LRUEvictionManager::<N>::with_capacity(eviction_limit, 512);
            to_load_eviction
                .load(
                    lru_dir
                        .as_os_str()
                        .to_str()
                        .expect("invalid utf-8 in lru_dir"),
                )
                .await?;
            loaded_eviction = Some(to_load_eviction);
        }

        let mut eviction = None;
        if let Some(loaded_eviction) = loaded_eviction {
            // Leak to obtain the `'static` reference the eviction API needs.
            eviction = Some(&*Box::leak(Box::new(loaded_eviction)));
        }
        Ok(CacheSerde { storage, eviction })
    }
    pub async fn store(&self, dir: impl AsRef<Path>) -> Result<()> {
        let owned_dir = dir.as_ref().to_path_buf();
        let cache_map = self.storage.clone();
        tokio::task::spawn_blocking(move || {
            Self::store_cache_map(cache_map, &owned_dir).map_err(|e| {
                Error::create(
                    InternalError,
                    pingora::ErrorSource::Internal,
                    None,
                    Some(Box::new(e)),
                )
            })
        })
        .await
        .unwrap()?;

        if let Some(eviction) = self.eviction {
            let lru_dir = dir.as_ref().join("lru");
            if eviction.total_items() != 0 {
                let lru_dir_owned = lru_dir.clone();
                tokio::task::spawn_blocking(move || {
                    fs::create_dir_all(&lru_dir_owned).or_err_with(InternalError, || {
                        format!("failed to create dir {:?}", &lru_dir_owned)
                    })
                })
                .await
                .unwrap()?;
                eviction
                    .save(lru_dir.as_os_str().to_str().expect("non-utf-8 dir"))
                    .await?;
            }
        }
        Ok(())
    }
    fn store_cache_map(map: SharedHashMap, dir: &Path) -> Result<(), StdIoError> {
        if map.is_empty() {
            return Ok(());
        }
        fs::create_dir_all(dir)?;
        // Write to a randomly named temp file first, then rename over
        // `storage.bin` so readers never observe a partially written file.
        let random_filename: String = rand::thread_rng()
            .sample_iter(&rand::distributions::Alphanumeric)
            .take(16)
            .map(char::from)
            .collect();
        let tmp_cache_dest = dir.join(random_filename).with_extension("tmp");
        let mut tmp_file = BufWriter::new(fs::File::create_new(&tmp_cache_dest)?);
        let written = bincode::serde::encode_into_std_write(
            SharedHashMapSerde(map),
            &mut tmp_file,
            bincode::config::standard(),
        )
        .map_err(|e| StdIoError::new(std::io::ErrorKind::Other, e))?;
        log::trace!("Written {} bytes while saving cache_map", written);
        tmp_file.flush()?;
        drop(tmp_file);
        fs::rename(tmp_cache_dest, dir.join("storage.bin"))?;
        Ok(())
    }

    fn load_cache_map(dir: &Path) -> Result<SharedHashMap, StdIoError> {
        let mut reader = BufReader::new(fs::File::open(dir.join("storage.bin"))?);
        let storage: SharedHashMap =
            bincode::serde::decode_from_std_read(&mut reader, bincode::config::standard())
                .map_err(|e| StdIoError::new(std::io::ErrorKind::Other, e))?;
        Ok(storage)
    }
}
async fn periodic_store_impl<const N: usize>(
    cache_serde: CacheSerde<N>,
    dir: PathBuf,
    period: Duration,
) -> Result<()> {
    let mut interval = tokio::time::interval(period);
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    // The first tick completes immediately; consume it so the first save
    // happens one full period after startup.
    interval.tick().await;
    loop {
        interval.tick().await;
        let dest = dir.clone();
        cache_serde.store(&dest).await?;
    }
}
struct SharedHashMapSerde(pub SharedHashMap);

/// Serializes the map entry by entry in a streaming manner, holding only one
/// entry lock at a time instead of freezing the whole map.
impl Serialize for SharedHashMapSerde {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut serialize_len = self.0.len();
        let mut map = serializer.serialize_map(Some(serialize_len))?;
        if serialize_len > 0 {
            if let Some(mut head) = self.0.first_entry() {
                map.serialize_entry(head.key(), head.get())?;
                serialize_len -= 1;
                if serialize_len > 0 {
                    // Walk the remaining entries, stopping once the length
                    // recorded in the map header has been written.
                    while let Some(entry) = head.next() {
                        map.serialize_entry(entry.key(), entry.get())?;
                        head = entry;
                        serialize_len -= 1;
                        if serialize_len == 0 {
                            break;
                        }
                    }
                }
            }
        }
        map.end()
    }
}
#[cfg(test)]
mod test {
    use super::{SharedHashMap, SharedHashMapSerde};
    use ahash::HashSet;
    use once_cell::sync::Lazy;
    use pingora::cache::CacheKey;
    use pingora::cache::Storage;
    use rustracing::span::Span;

    use crate::cache::{scc_cache::test::gen_meta, SccMemoryCache};

    #[tokio::test]
    async fn test_cache_serde() {
        static MEM_CACHE: Lazy<SccMemoryCache> = Lazy::new(SccMemoryCache::new);
        let span = &Span::inactive().handle();

        let key1 = CacheKey::new("", "a", "1");
        let res = MEM_CACHE.lookup(&key1, span).await.unwrap();
        assert!(res.is_none());

        let cache_meta = gen_meta();
        let mut miss_handler = MEM_CACHE
            .get_miss_handler(&key1, &cache_meta, span)
            .await
            .unwrap();
        miss_handler
            .write_body(b"test1"[..].into(), false)
            .await
            .unwrap();
        miss_handler
            .write_body(b"test2"[..].into(), false)
            .await
            .unwrap();
        miss_handler.finish().await.unwrap();

        let ser = SharedHashMapSerde(MEM_CACHE.cache.clone());
        let serialized = bincode::serde::encode_to_vec(ser, bincode::config::standard()).unwrap();
        let (deserialized, _): (SharedHashMap, usize) =
            bincode::serde::decode_from_slice(&serialized, bincode::config::standard()).unwrap();

        let mut original_keys = HashSet::default();
        let mut original_values = HashSet::default();
        MEM_CACHE.cache.scan(|key, val| {
            original_keys.insert(key.clone());
            original_values.insert(val.clone());
        });

        let mut deserialized_keys = HashSet::default();
        let mut deserialized_values = HashSet::default();
        deserialized.scan(|key, val| {
            deserialized_keys.insert(key.clone());
            deserialized_values.insert(val.clone());
        });
        assert_eq!(original_keys, deserialized_keys);
        assert_eq!(original_values, deserialized_values);
    }
}
// usage snippet: a shared runtime and a persistent cache bucket
// Snippet: `MB` and `SHARED_DIR`, plus `CacheBucket`, `NoCompression` and
// `load_or_create_cache` from the modules above, are assumed to be in scope.
use once_cell::sync::Lazy;
use std::time::Duration;
use tokio::runtime::{self, Runtime};

pub static SHARED_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed to create tokio shared runtime")
});

const CACHE_SAVE_PERIOD: Duration = Duration::from_secs(60 * 15);

pub static BACKEND_CACHE: Lazy<CacheBucket> = Lazy::new(|| {
    const SHARDS: usize = 16;
    let dir = SHARED_DIR.join("backend");
    let (_cache_serde, bucket) = load_or_create_cache::<_, SHARDS>(
        dir.clone(),
        MB * 128,       // eviction limit
        8192,           // initial map capacity
        NoCompression,
        Some(MB * 8),   // max cached object size
        Some(CACHE_SAVE_PERIOD),
        &*SHARED_RUNTIME,
    );
    bucket
});
// utils module: ServiceRegistry (imported earlier as `crate::utils::ServiceRegistry`)
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use pingora::{server::Server, services::Service};

pub static GLOBAL_REGISTRY: Lazy<ServiceRegistry> = Lazy::new(ServiceRegistry::default);

/// Helper struct that holds [Service]s to be registered with the server, so you
/// don't have to carry a `&mut Server` around.
/// Similar to the `prometheus::default_registry()` approach.
#[derive(Default)]
pub struct ServiceRegistry {
    pub services: Mutex<Vec<Box<dyn Service + 'static>>>,
}

impl ServiceRegistry {
    /// Get the global registry
    pub fn global() -> &'static ServiceRegistry {
        &*GLOBAL_REGISTRY
    }

    /// Add a service to be registered later
    pub fn add_service<S: Service + 'static>(&self, service: S) {
        self.services.lock().push(Box::new(service))
    }

    /// Add a service to be registered later, returning `self` for chaining
    pub fn with_service<S: Service + 'static>(&self, service: S) -> &Self {
        self.add_service(service);
        self
    }

    /// Drain services from the registry and register them with the server
    pub fn register(&self, server: &mut Server) {
        let services: Vec<_> = self.services.lock().drain(..).collect();
        server.add_services(services);
    }
}
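A sketch (not part of the gist) of how the registry ties into startup: force the lazy cache static so its background saver lands in the registry, then drain the registry into the server before running it. `BACKEND_CACHE` is the static from the usage snippet above.

fn main() {
    let mut server = Server::new(None).unwrap();
    server.bootstrap();
    // Initialize the cache up front; this also queues its periodic saver
    // in the global registry.
    Lazy::force(&BACKEND_CACHE);
    // Hand every queued service to the server before it starts.
    ServiceRegistry::global().register(&mut server);
    server.run_forever();
}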
// cache module: CacheBucket (imported earlier as `super::CacheBucket`)
use pingora::cache::eviction::EvictionManager;
use pingora::cache::lock::CacheLock;
use pingora::cache::predictor::CacheablePredictor;
use pingora::cache::HttpCache;
use pingora::cache::Storage;
use pingora::proxy::Session;

/// Caching arguments for `pingora::cache::HttpCache::enable`, bundled as one bucket
#[derive(Clone, Copy)]
pub struct CacheBucket {
    pub storage: &'static (dyn Storage + Sync),
    pub eviction: Option<&'static (dyn EvictionManager + Sync)>,
    pub predictor: Option<&'static (dyn CacheablePredictor + Sync)>,
    pub cache_lock: Option<&'static CacheLock>,
}

impl CacheBucket {
    pub fn new<T>(storage: T) -> Self
    where
        T: Storage + Sync + 'static,
    {
        CacheBucket {
            // Leaked to obtain the `'static` reference pingora's cache API expects.
            storage: Box::leak(Box::new(storage)),
            eviction: None,
            predictor: None,
            cache_lock: None,
        }
    }

    pub fn with_eviction<T: EvictionManager + Sync + 'static>(mut self, eviction: T) -> Self {
        let b = Box::new(eviction);
        self.eviction = Some(Box::leak(b));
        self
    }

    pub fn without_eviction(&self) -> Self {
        let mut this = *self;
        this.eviction = None;
        this
    }

    pub fn with_predictor<T: CacheablePredictor + Sync + 'static>(mut self, predictor: T) -> Self {
        let b = Box::new(predictor);
        self.predictor = Some(Box::leak(b));
        self
    }

    pub fn without_predictor(&self) -> Self {
        let mut this = *self;
        this.predictor = None;
        this
    }

    pub fn with_cache_lock(mut self, cache_lock: CacheLock) -> Self {
        let b = Box::new(cache_lock);
        self.cache_lock = Some(Box::leak(b));
        self
    }

    pub fn without_cache_lock(&self) -> Self {
        let mut this = *self;
        this.cache_lock = None;
        this
    }

    pub fn enable_cache(&self, cache: &mut HttpCache) {
        cache.enable(self.storage, self.eviction, self.predictor, self.cache_lock)
    }

    pub fn enable(&self, session: &mut Session) {
        self.enable_cache(&mut session.cache)
    }
}
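Finally, a hedged sketch of turning the cache on per request: `request_cache_filter` is pingora-proxy's `ProxyHttp` hook for enabling caching, and `BACKEND_CACHE` is the bucket built earlier. The surrounding `impl ProxyHttp for MyProxy` block (with `MyProxy` and its other required methods) is elided and illustrative.

// Inside an `impl ProxyHttp for MyProxy` block:
fn request_cache_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
    // Every request becomes cacheable via the shared bucket; real code would
    // typically gate this on method, path, or headers.
    BACKEND_CACHE.enable(session);
    Ok(())
}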