Skip to content

Instantly share code, notes, and snippets.

@Object905
Last active September 22, 2024 15:21
Show Gist options
  • Save Object905/cf10ffd97595887bb7b3868c89a793d7 to your computer and use it in GitHub Desktop.
Save Object905/cf10ffd97595887bb7b3868c89a793d7 to your computer and use it in GitHub Desktop.
Pingora cache-example
use std::fmt::Debug;
use bytes::Bytes;
use lz4_flex;
use pingora::{Error, Result};
/// MIME types whose bodies are already compressed by their format, so the cache does not try
/// to compress them again.
pub const SKIP_COMPRESSION: &[&str] = &[
    // images
    "image/avif",
    "image/webp",
    "image/png",
    "image/jpeg",
    // fonts
    "font/woff2",
    "font/woff",
    // video
    "video/webm",
    "video/ogg",
    "video/mpeg",
    "video/mp4",
    // archives
    "application/zip",
    "application/gzip",
];
/// Codec used by the cache storage to (de)compress stored response bodies.
pub trait CacheCompression: Default + Debug + Clone + Send + Sync + 'static {
    /// Compresses a complete response body.
    ///
    /// # Errors
    /// Implementation-defined; errors are propagated to the caller.
    fn encode(&self, buf: Bytes) -> Result<Bytes>;

    /// Restores the original body from a buffer produced by [`CacheCompression::encode`].
    ///
    /// # Errors
    /// Implementation-defined, e.g. when `buf` is not valid encoded data.
    fn decode(&self, buf: &Bytes) -> Result<Bytes>;

    /// Decodes `buf` and returns the `range_start..range_end` slice of the decoded body.
    ///
    /// The default implementation decodes the whole buffer and then slices it.
    ///
    /// # Panics
    /// If the range is out of bounds of the decoded body (see `Bytes::slice`).
    fn decode_range(&mut self, buf: &Bytes, range_start: usize, range_end: usize) -> Result<Bytes> {
        self.decode(buf)
            .map(|buf| buf.slice(range_start..range_end))
    }

    /// Whether a body with the given `Content-Type` header value should be compressed.
    ///
    /// Returns `false` for media types listed in [`SKIP_COMPRESSION`] and `true` otherwise,
    /// including when no content type is known.
    fn should_compress(&self, content_type: Option<&str>) -> bool {
        let Some(content_type) = content_type else {
            return true;
        };
        // Header values may carry parameters (`video/mp4; codecs=...`) and media type names
        // are case-insensitive, so compare only the bare `type/subtype`, ignoring ASCII case.
        let essence = content_type.split(';').next().unwrap_or("").trim();
        !SKIP_COMPRESSION
            .iter()
            .any(|skipped| skipped.eq_ignore_ascii_case(essence))
    }

    /// Whether this codec actually transforms data.
    ///
    /// Returning `false` disables compression regardless of [`CacheCompression::should_compress`].
    fn compressed(&self) -> bool {
        true
    }
}
/// Pass-through codec: bodies are stored exactly as received and are never marked as compressed.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoCompression;
impl CacheCompression for NoCompression {
fn encode(&self, buf: Bytes) -> Result<Bytes> {
Ok(buf)
}
fn decode(&self, buf: &Bytes) -> Result<Bytes> {
Ok(buf.clone())
}
fn should_compress(&self, _content_type: Option<&str>) -> bool {
false
}
fn compressed(&self) -> bool {
false
}
}
/// Codec backed by `lz4_flex` block compression, with the uncompressed size prepended to the output.
#[derive(Debug, Clone, Copy, Default)]
pub struct LZ4Compression;
impl CacheCompression for LZ4Compression {
fn encode(&self, buf: Bytes) -> Result<Bytes> {
let compressed = lz4_flex::compress_prepend_size(&buf);
Ok(compressed.into())
}
fn decode(&self, buf: &Bytes) -> Result<Bytes> {
let decompressed = lz4_flex::decompress_size_prepended(buf).map_err(|e| {
Error::create(
pingora::ErrorType::InternalError,
pingora::ErrorSource::Internal,
None,
Some(Box::new(e)),
)
})?;
return Ok(decompressed.into());
}
}
use ahash::RandomState;
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use http::header;
use pingora::cache::key::{CacheHashKey, CacheKey, CompactCacheKey, HashBinary};
use pingora::cache::max_file_size::ERR_RESPONSE_TOO_LARGE;
use pingora::cache::trace::SpanHandle;
use pingora::cache::{CacheMeta, PurgeType, Storage};
use pingora::{Error, Result};
use scc::HashMap;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::sync::Arc;
use pingora::cache::storage::{HandleHit, HandleMiss, HitHandler, MissHandler};
use super::compression::{CacheCompression, NoCompression};
/// The two byte buffers produced by `CacheMeta::serialize`, stored as `Bytes`.
type BinaryMeta = (Bytes, Bytes);
/// Concurrent map shared between the storage and the snapshot code, keyed by the
/// `combined_bin()` hash of a cache key.
pub type SharedHashMap = Arc<HashMap<HashBinary, SccCacheObject, RandomState>>;
/// In-memory pingora cache storage backed by a concurrent `scc::HashMap`.
///
/// Does not support streaming partial writes: bodies are buffered by the miss handler
/// and inserted into the map only when the write finishes.
#[derive(Clone, Debug)]
pub struct SccMemoryCache<C: CacheCompression = NoCompression> {
    /// Stored objects keyed by the hash of the cache key (shared; cloning the cache clones the `Arc`).
    pub cache: SharedHashMap,
    /// Maximum allowed body size for caching.
    /// Checked against the uncompressed body while it is being written.
    pub max_file_size_bytes: Option<usize>,
    /// When true, cache admissions whose response body is empty are rejected with an error.
    pub reject_empty_body: bool,
    /// Codec used for newly stored bodies.
    pub compression: C,
}
impl SccMemoryCache {
    /// Wraps an existing shared map (e.g. one restored from disk) without compression,
    /// without a size limit and accepting empty bodies.
    pub fn from_map(cache: SharedHashMap) -> Self {
        Self {
            cache,
            max_file_size_bytes: None,
            reject_empty_body: false,
            compression: NoCompression,
        }
    }

    /// Creates an empty cache with default settings (see [`SccMemoryCache::from_map`]).
    pub fn new() -> Self {
        let map = HashMap::with_hasher(RandomState::new());
        Self::from_map(Arc::new(map))
    }

    /// Like [`SccMemoryCache::new`], but pre-sizes the map for `capacity` entries.
    pub fn with_capacity(capacity: usize) -> Self {
        let map = HashMap::with_capacity_and_hasher(capacity, RandomState::new());
        Self::from_map(Arc::new(map))
    }
}
impl<C: CacheCompression> SccMemoryCache<C> {
    /// Rebuilds the cache around a different codec, keeping the map and all other settings.
    pub fn with_compression<T: CacheCompression>(self, compression: T) -> SccMemoryCache<T> {
        let SccMemoryCache {
            cache,
            max_file_size_bytes,
            reject_empty_body,
            ..
        } = self;
        SccMemoryCache {
            cache,
            max_file_size_bytes,
            reject_empty_body,
            compression,
        }
    }

    /// Sets (or clears with `None`) the maximum cacheable body size in bytes.
    pub fn with_max_file_size(self, max_bytes: Option<usize>) -> Self {
        Self {
            max_file_size_bytes: max_bytes,
            ..self
        }
    }

    /// Controls whether admissions with an empty body are rejected.
    pub fn with_reject_empty_body(self, should_error: bool) -> Self {
        Self {
            reject_empty_body: should_error,
            ..self
        }
    }
}
#[async_trait]
impl<C: CacheCompression> Storage for SccMemoryCache<C> {
async fn lookup(
&'static self,
key: &CacheKey,
_trace: &SpanHandle,
) -> Result<Option<(CacheMeta, HitHandler)>> {
let hash = key.combined_bin();
let cache_object;
if let Some(obj) = self.cache.get_async(&hash).await {
cache_object = obj.get().clone();
} else {
return Ok(None);
}
let meta = CacheMeta::deserialize(&cache_object.meta.0, &cache_object.meta.1)?;
Ok(Some((
meta,
Box::new(SccHitHandler::new(cache_object, self.clone())),
)))
}
async fn get_miss_handler(
&'static self,
key: &CacheKey,
meta: &CacheMeta,
_trace: &SpanHandle,
) -> Result<MissHandler> {
let hash = key.combined_bin();
let raw_meta = meta.serialize()?;
let already_compressed = meta
.headers()
.get(header::CONTENT_ENCODING)
.map(|v| !v.is_empty())
.unwrap_or(false);
let content_type = meta
.headers()
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok());
let compress_content_type = self.compression.should_compress(content_type);
let meta = (Bytes::from(raw_meta.0), Bytes::from(raw_meta.1));
let miss_handler = SccMissHandler {
body_buf: BytesMut::new(),
meta: meta,
key: hash,
inner: self.clone(),
compress: self.compression.compressed() && !already_compressed && compress_content_type,
};
Ok(Box::new(miss_handler))
}
async fn purge(
&'static self,
key: &CompactCacheKey,
_purge_type: PurgeType,
_trace: &SpanHandle,
) -> Result<bool> {
let hash = key.combined_bin();
Ok(self.cache.remove(&hash).is_some())
}
async fn update_meta(
&'static self,
key: &CacheKey,
meta: &CacheMeta,
_trace: &SpanHandle,
) -> Result<bool> {
let hash = key.combined_bin();
let new_meta = meta.serialize()?;
let new_meta = (Bytes::from(new_meta.0), Bytes::from(new_meta.1));
let updated = self
.cache
.update_async(&hash, move |_, value| {
value.meta = new_meta;
})
.await;
if let Some(()) = updated {
Ok(true)
} else {
Err(Error::create(
pingora::ErrorType::Custom("No meta found for update_meta"),
pingora::ErrorSource::Internal,
Some(format!("key = {:?}", key).into()),
None,
))
}
}
fn support_streaming_partial_write(&self) -> bool {
false
}
fn as_any(&self) -> &(dyn Any + Send + Sync) {
self
}
}
/// A cached response: serialized metadata plus the (possibly compressed) body.
///
/// This type is written to the on-disk snapshot through serde/bincode, so changing its fields
/// or their order changes the snapshot format and breaks loading existing snapshots.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub struct SccCacheObject {
meta: BinaryMeta,
body: Bytes,
/// Uncompressed len of body (equal to `body.len()` when `compressed` is false)
len: usize,
/// Is body compressed
compressed: bool,
}
/// Serves the body of one cached object, optionally restricted to a byte range.
pub struct SccHitHandler<C: CacheCompression> {
/// Owned copy of the cached object taken at lookup time.
cache_object: SccCacheObject,
/// Storage handle; its codec is used to decompress compressed bodies.
inner: SccMemoryCache<C>,
/// Set once the (range of the) body has been returned by `read_body`.
done: bool,
/// Start of the byte range to serve, in uncompressed body offsets.
range_start: usize,
/// Exclusive end of the byte range to serve, in uncompressed body offsets.
range_end: usize,
}
impl<C: CacheCompression> SccHitHandler<C> {
    /// Creates a handler that will serve the whole uncompressed body of `cache_object`.
    pub(crate) fn new(cache_object: SccCacheObject, inner: SccMemoryCache<C>) -> Self {
        Self {
            // Read the length before `cache_object` is moved into its field below.
            range_end: cache_object.len,
            range_start: 0,
            done: false,
            cache_object,
            inner,
        }
    }
}
#[async_trait]
impl<C: CacheCompression> HandleHit for SccHitHandler<C> {
    /// Returns the selected range of the body in one chunk, then `None`.
    ///
    /// # Errors
    /// If the body is marked compressed but the configured codec does not decompress
    /// (e.g. a snapshot written with compression loaded with `NoCompression`), or if decoding fails.
    async fn read_body(&mut self) -> Result<Option<Bytes>> {
        if self.done {
            return Ok(None);
        }
        if self.cache_object.compressed && !self.inner.compression.compressed() {
            // Slicing the still-compressed bytes by uncompressed offsets would return garbage
            // or panic on an out-of-bounds range, so fail the read instead.
            return Error::e_explain(
                pingora::ErrorType::InternalError,
                "cached body is compressed but the configured codec cannot decompress it",
            );
        }
        self.done = true;
        if self.cache_object.compressed {
            self.inner
                .compression
                .decode_range(&self.cache_object.body, self.range_start, self.range_end)
                .map(Some)
        } else {
            Ok(Some(
                self.cache_object
                    .body
                    .slice(self.range_start..self.range_end),
            ))
        }
    }

    /// Restricts the next `read_body` to `start..end` (clamped to the body length) and
    /// re-arms the handler so one handler can serve several ranges.
    ///
    /// `end: None` means "until the end of the body", regardless of previous seeks.
    ///
    /// # Errors
    /// If `start` is not inside the body, or `end` lies before `start`.
    fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
        let len = self.cache_object.len;
        if start >= len {
            return Error::e_explain(
                pingora::ErrorType::InternalError,
                format!("seek start out of range {start} >= {len}"),
            );
        }
        // Ends past the last byte are allowed and simply clamped.
        let end = end.map_or(len, |end| std::cmp::min(len, end));
        if end < start {
            // `Bytes::slice` would panic on an inverted range.
            return Error::e_explain(
                pingora::ErrorType::InternalError,
                format!("seek end out of range {end} < {start}"),
            );
        }
        self.range_start = start;
        self.range_end = end;
        self.done = false;
        Ok(())
    }

    /// Nothing to release: the handler owns a cheap copy of the object.
    async fn finish(
        self: Box<Self>,
        _storage: &'static (dyn Storage + Sync),
        _key: &CacheKey,
        _trace: &SpanHandle,
    ) -> Result<()> {
        Ok(())
    }

    fn can_seek(&self) -> bool {
        true
    }

    fn as_any(&self) -> &(dyn Any + Send + Sync) {
        self
    }
}
/// Buffers a response body for a cache miss and stores it in the map on `finish`.
#[derive(Debug)]
struct SccMissHandler<C: CacheCompression> {
/// Serialized metadata captured when the handler was created.
meta: BinaryMeta,
/// Map key (hash of the cache key).
key: HashBinary,
/// Uncompressed body accumulated by `write_body`.
body_buf: BytesMut,
/// Storage the object will be inserted into; also supplies limits and codec.
inner: SccMemoryCache<C>,
/// Whether `finish` should encode the body with `inner.compression`.
compress: bool,
}
#[async_trait]
impl<C: CacheCompression> HandleMiss for SccMissHandler<C> {
async fn write_body(&mut self, data: bytes::Bytes, _eof: bool) -> Result<()> {
if let Some(max_file_size_bytes) = self.inner.max_file_size_bytes {
if self.body_buf.len() + data.len() > max_file_size_bytes {
return Error::e_explain(
ERR_RESPONSE_TOO_LARGE,
format!(
"writing data of size {} bytes would exceed max file size of {} bytes",
data.len(),
max_file_size_bytes
),
);
}
}
self.body_buf.extend_from_slice(&data);
Ok(())
}
async fn finish(self: Box<Self>) -> Result<usize> {
let uncompressed_len = self.body_buf.len();
if uncompressed_len == 0 && self.inner.reject_empty_body {
let err = Error::create(
pingora::ErrorType::Custom("cache write error: empty body"),
pingora::ErrorSource::Internal,
None,
None,
);
return Err(err);
}
let body = if self.compress {
self.inner.compression.encode(self.body_buf.freeze())?
} else {
self.body_buf.freeze()
};
let size = body.len() + self.meta.0.len() + self.meta.1.len();
let cache_object = SccCacheObject {
body,
meta: self.meta,
len: uncompressed_len,
compressed: self.compress,
};
self.inner
.cache
.insert_async(self.key, cache_object)
.await
.ok();
Ok(size)
}
}
#[cfg(test)]
// Unit tests for `SccMemoryCache`. Each test uses its own `static` cache because the
// `Storage` methods take `&'static self`.
pub(crate) mod test {
use std::time::SystemTime;
use super::*;
use once_cell::sync::Lazy;
use pingora::{cache::CacheMeta, http::ResponseHeader};
use rustracing::span::Span;
/// Builds a 200 `CacheMeta` with epoch timestamps and a few dummy headers (also used by the serde tests).
pub fn gen_meta() -> CacheMeta {
let mut header = ResponseHeader::build(200, None).unwrap();
header.append_header("foo1", "bar1").unwrap();
header.append_header("foo2", "bar2").unwrap();
header.append_header("foo3", "bar3").unwrap();
header.append_header("Server", "Pingora").unwrap();
CacheMeta::new(SystemTime::UNIX_EPOCH, SystemTime::UNIX_EPOCH, 0, 0, header)
}
// Chunks written through the miss handler are read back as one body, followed by `None`.
#[tokio::test]
async fn test_write_then_read() {
static MEM_CACHE: Lazy<SccMemoryCache> = Lazy::new(SccMemoryCache::new);
let span = &Span::inactive().handle();
let key1 = CacheKey::new("", "a", "1");
let res = MEM_CACHE.lookup(&key1, span).await.unwrap();
assert!(res.is_none());
let cache_meta = gen_meta();
let mut miss_handler = MEM_CACHE
.get_miss_handler(&key1, &cache_meta, span)
.await
.unwrap();
miss_handler
.write_body(b"test1"[..].into(), false)
.await
.unwrap();
miss_handler
.write_body(b"test2"[..].into(), false)
.await
.unwrap();
miss_handler.finish().await.unwrap();
let (_cache_meta2, mut hit_handler) = MEM_CACHE.lookup(&key1, span).await.unwrap().unwrap();
let data = hit_handler.read_body().await.unwrap().unwrap();
assert_eq!("test1test2", data);
let data = hit_handler.read_body().await.unwrap();
assert!(data.is_none());
}
// Purging an entry with an empty body removes it from the map.
#[tokio::test]
async fn test_purge_partial() {
static MEM_CACHE: Lazy<SccMemoryCache> = Lazy::new(SccMemoryCache::new);
let cache = &MEM_CACHE;
let key = CacheKey::new("", "a", "1").to_compact();
let hash = key.combined_bin();
let meta = (
"meta_key".as_bytes().to_vec(),
"meta_value".as_bytes().to_vec(),
);
let cache_object = SccCacheObject {
body: Bytes::new(),
meta: (Bytes::from(meta.0), Bytes::from(meta.1)),
len: 0,
compressed: false,
};
cache
.cache
.insert(hash, cache_object)
.unwrap_or_else(|_| panic!());
assert!(cache.cache.contains(&hash));
let result = cache
.purge(&key, PurgeType::Invalidation, &Span::inactive().handle())
.await;
assert!(result.is_ok());
assert!(!cache.cache.contains(&hash));
}
// Purging an entry with a non-empty body removes it from the map.
#[tokio::test]
async fn test_purge_complete() {
static MEM_CACHE: Lazy<SccMemoryCache> = Lazy::new(SccMemoryCache::new);
let cache = &MEM_CACHE;
let key = CacheKey::new("", "a", "1").to_compact();
let hash = key.combined_bin();
let meta = (
"meta_key".as_bytes().to_vec(),
"meta_value".as_bytes().to_vec(),
);
let body = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
let body_len = body.len();
let cache_object = SccCacheObject {
body: Bytes::from(body),
len: body_len,
meta: (Bytes::from(meta.0), Bytes::from(meta.1)),
compressed: false,
};
cache
.cache
.insert(hash.clone(), cache_object)
.unwrap_or_else(|_| panic!());
assert!(cache.cache.contains(&hash));
let result = cache
.purge(&key, PurgeType::Invalidation, &Span::inactive().handle())
.await;
assert!(result.is_ok());
assert!(!cache.cache.contains(&hash));
}
// `update_meta` replaces the metadata of an existing entry and leaves the body unchanged.
#[tokio::test]
async fn test_update_meta() {
static MEM_CACHE: Lazy<SccMemoryCache> = Lazy::new(SccMemoryCache::new);
let cache = &MEM_CACHE;
let full_key = CacheKey::new("", "a", "1");
let key = full_key.clone().to_compact();
let hash = key.combined_bin();
let meta = (
"meta_key".as_bytes().to_vec(),
"meta_value".as_bytes().to_vec(),
);
let body = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
let body_len = body.len();
let cache_object = SccCacheObject {
body: Bytes::from(body.clone()),
len: body_len,
meta: (Bytes::from(meta.0), Bytes::from(meta.1)),
compressed: false,
};
cache
.cache
.insert(hash.clone(), cache_object)
.unwrap_or_else(|_| panic!());
assert!(cache.cache.contains(&hash));
let new_meta = gen_meta();
assert!(cache
.update_meta(&full_key, &new_meta, &Span::inactive().handle())
.await
.unwrap());
let (res_meta, mut res_handler) = MEM_CACHE
.lookup(&full_key, &Span::inactive().handle())
.await
.unwrap()
.unwrap();
assert_eq!(res_meta.serialize().unwrap(), new_meta.serialize().unwrap());
assert_eq!(res_handler.read_body().await.unwrap().unwrap(), &body);
}
// `seek` rejects out-of-range starts, and one hit handler can serve several ranges.
#[tokio::test]
async fn test_read_range() {
static MEM_CACHE: Lazy<SccMemoryCache> = Lazy::new(SccMemoryCache::new);
let span = &Span::inactive().handle();
let key1 = CacheKey::new("", "a", "1");
let res = MEM_CACHE.lookup(&key1, span).await.unwrap();
assert!(res.is_none());
let cache_meta = gen_meta();
let mut miss_handler = MEM_CACHE
.get_miss_handler(&key1, &cache_meta, span)
.await
.unwrap();
miss_handler
.write_body(b"test1test2"[..].into(), false)
.await
.unwrap();
miss_handler.finish().await.unwrap();
let (_cache_meta2, mut hit_handler) = MEM_CACHE.lookup(&key1, span).await.unwrap().unwrap();
// out of range
assert!(hit_handler.seek(10000, None).is_err());
assert!(hit_handler.seek(5, None).is_ok());
let data = hit_handler.read_body().await.unwrap().unwrap();
assert_eq!("test2", data);
let data = hit_handler.read_body().await.unwrap();
assert!(data.is_none());
// A second seek re-arms the handler for a new range.
assert!(hit_handler.seek(4, Some(5)).is_ok());
let data = hit_handler.read_body().await.unwrap().unwrap();
assert_eq!("1", data);
let data = hit_handler.read_body().await.unwrap();
assert!(data.is_none());
}
}
use crate::cache::scc_cache::SharedHashMap;
use crate::cache::SccMemoryCache;
use crate::utils::{FutureBackgroundService, ServiceRegistry};
use futures::{future, Future, FutureExt};
use pingora::cache::eviction::lru::Manager as LRUEvictionManager;
use pingora::cache::eviction::EvictionManager;
use pingora::services::background::{background_service, BackgroundService};
use pingora::ErrorType::InternalError;
use pingora::{Error, OrErr, Result};
use rand::{self, Rng};
use serde::ser::SerializeMap;
use serde::Serialize;
use std::fs;
use std::io::{BufReader, BufWriter, Error as StdIoError, Write};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::time::Duration;
use tokio::runtime::Runtime;
use super::compression::CacheCompression;
use super::CacheBucket;
pub fn load_or_create_cache<C: CacheCompression, const SHARDS: usize>(
dir: PathBuf,
limit: usize,
capacity: usize,
compression: C,
max_file_size: Option<usize>,
save_interval: Option<Duration>,
runtime: &Runtime,
) -> (CacheSerde<SHARDS>, CacheBucket) {
let loaded_cache_serde = runtime
.block_on(CacheSerde::<SHARDS>::load(&dir, limit))
.ok();
let mut cache_serde;
let mut bucket;
if let Some(loaded) = loaded_cache_serde {
log::info!(
"loaded {:?} cache entries from {:?}",
loaded.storage.len(),
&dir
);
let storage = loaded.storage.clone();
bucket = CacheBucket::new(
SccMemoryCache::from_map(loaded.storage)
.with_max_file_size(max_file_size)
.with_compression(compression),
);
if let Some(eviction) = loaded.eviction {
bucket.eviction = Some(eviction);
}
cache_serde = CacheSerde {
eviction: loaded.eviction,
storage,
};
} else {
let new_cache = SccMemoryCache::with_capacity(capacity)
.with_max_file_size(max_file_size)
.with_compression(compression);
let storage = new_cache.cache.clone();
bucket = CacheBucket::new(new_cache);
cache_serde = CacheSerde::<SHARDS> {
storage,
eviction: None,
};
}
if bucket.eviction.is_none() {
let new_eviction = Box::leak(Box::new(LRUEvictionManager::<SHARDS>::with_capacity(
limit, capacity,
)));
cache_serde.eviction = Some(new_eviction);
bucket.eviction = Some(new_eviction);
}
if let Some(interval) = save_interval {
ServiceRegistry::global().add_service(background_service(
&format!("cache saver {:?}", &dir),
cache_serde.background_store_service(dir, interval),
));
}
(cache_serde, bucket)
}
/// Handles for persisting a cache to disk: the shared storage map and, optionally, the LRU
/// eviction manager (`N` is its shard count).
#[derive(Clone)]
pub struct CacheSerde<const N: usize> {
pub storage: SharedHashMap,
pub eviction: Option<&'static LRUEvictionManager<N>>,
}
impl<const N: usize> CacheSerde<N> {
    /// Wraps [`Self::periodic_store_future`] in a pingora background service.
    pub fn background_store_service(
        &self,
        dir: PathBuf,
        period: Duration,
    ) -> impl BackgroundService {
        FutureBackgroundService::new(self.periodic_store_future(dir, period))
    }

    /// Returns a boxed future that snapshots the cache into `dir` every `period`.
    ///
    /// The `Result` produced by the underlying loop is discarded.
    pub fn periodic_store_future(
        &self,
        dir: PathBuf,
        period: Duration,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        periodic_store_impl(self.clone(), dir, period)
            .then(|_| future::ready(()))
            .boxed()
    }

    /// Loads `dir/storage.bin` and, when `N != 0` and `dir/lru` exists, the LRU eviction state.
    ///
    /// A loaded eviction manager is leaked to obtain a `'static` reference.
    ///
    /// # Errors
    /// If the storage snapshot cannot be read or decoded, or the LRU state cannot be loaded.
    ///
    /// # Panics
    /// If a blocking helper task panics, or the `lru` path is not valid UTF-8.
    pub async fn load(dir: impl AsRef<Path>, eviction_limit: usize) -> Result<Self> {
        let dir_owned = dir.as_ref().to_path_buf();
        let storage = tokio::task::spawn_blocking(move || Self::load_cache_map(&dir_owned))
            .await
            .unwrap()
            .map_err(|e| {
                Error::create(
                    pingora::ErrorType::Custom("failed to load stored cache"),
                    pingora::ErrorSource::Unset,
                    None,
                    Some(Box::new(e)),
                )
            })?;
        let mut loaded_eviction = None;
        let lru_dir = dir.as_ref().join("lru");
        let lru_dir_clone = lru_dir.clone();
        if N != 0
            && tokio::task::spawn_blocking(move || lru_dir_clone.exists())
                .await
                .unwrap()
        {
            let to_load_eviction = LRUEvictionManager::<N>::with_capacity(eviction_limit, 512);
            to_load_eviction
                .load(
                    lru_dir
                        .as_os_str()
                        .to_str()
                        .expect("invalid utf-8 in lru_dir"),
                )
                .await?;
            loaded_eviction = Some(to_load_eviction);
        }
        let mut eviction = None;
        if let Some(loaded_eviction) = loaded_eviction {
            eviction = Some(&*Box::leak(Box::new(loaded_eviction)));
        }
        Ok(CacheSerde { storage, eviction })
    }

    /// Writes the storage snapshot to `dir/storage.bin` and, if there are tracked items,
    /// the LRU state to `dir/lru`.
    ///
    /// # Errors
    /// On any I/O or encoding failure.
    ///
    /// # Panics
    /// If a blocking helper task panics, or the `lru` path is not valid UTF-8.
    pub async fn store(&self, dir: impl AsRef<Path>) -> Result<()> {
        let owned_dir = dir.as_ref().to_path_buf();
        let cache_map = self.storage.clone();
        tokio::task::spawn_blocking(move || {
            Self::store_cache_map(cache_map, &owned_dir).map_err(|e| {
                Error::create(
                    InternalError,
                    pingora::ErrorSource::Internal,
                    None,
                    Some(Box::new(e)),
                )
            })
        })
        .await
        .unwrap()?;
        if let Some(eviction) = self.eviction {
            let lru_dir = dir.as_ref().join("lru");
            if eviction.total_items() != 0 {
                let lru_dir_owned = lru_dir.clone();
                tokio::task::spawn_blocking(move || {
                    fs::create_dir_all(&lru_dir_owned).or_err_with(InternalError, || {
                        format!("failed to create dir {:?}", &lru_dir_owned)
                    })
                })
                .await
                .unwrap()?;
                eviction
                    .save(lru_dir.as_os_str().to_str().expect("non-utf-8 dir"))
                    .await?;
            }
        }
        Ok(())
    }

    /// Atomically replaces `dir/storage.bin` with a snapshot of `map`.
    ///
    /// The snapshot is written to a randomly named `.tmp` file, synced to disk and then renamed
    /// over the destination; the temp file is removed again if any step fails. An empty map
    /// removes an existing snapshot, so purged entries are not resurrected on the next load.
    fn store_cache_map(map: SharedHashMap, dir: &Path) -> Result<(), StdIoError> {
        let dest = dir.join("storage.bin");
        if map.is_empty() {
            return match fs::remove_file(&dest) {
                Err(e) if e.kind() != std::io::ErrorKind::NotFound => Err(e),
                _ => Ok(()),
            };
        }
        fs::create_dir_all(dir)?;
        let random_filename: String = rand::thread_rng()
            .sample_iter(&rand::distributions::Alphanumeric)
            .take(16)
            .map(char::from)
            .collect();
        let tmp_cache_dest = dir.join(random_filename).with_extension("tmp");
        let tmp_file = fs::File::create_new(&tmp_cache_dest)?;
        let outcome = Self::write_cache_map(map, tmp_file)
            .and_then(|()| fs::rename(&tmp_cache_dest, &dest));
        if outcome.is_err() {
            // Best effort: don't leave partial snapshots behind; the original error is what matters.
            let _ = fs::remove_file(&tmp_cache_dest);
        }
        outcome
    }

    /// Encodes `map` with bincode into `file`, then flushes and syncs it to disk.
    fn write_cache_map(map: SharedHashMap, file: fs::File) -> Result<(), StdIoError> {
        let mut writer = BufWriter::new(file);
        let written = bincode::serde::encode_into_std_write(
            SharedHashMapSerde(map),
            &mut writer,
            bincode::config::standard(),
        )
        .map_err(|e| StdIoError::new(std::io::ErrorKind::Other, e))?;
        log::trace!("Written {:?} bytes while saving cache_map", written);
        // `into_inner` flushes the buffer and reports flush errors (dropping would swallow them);
        // `sync_all` makes sure the data is on disk before the rename publishes it.
        writer.into_inner()?.sync_all()
    }

    /// Reads and decodes `path/storage.bin`.
    fn load_cache_map(path: &Path) -> Result<SharedHashMap, StdIoError> {
        let mut reader = BufReader::new(fs::File::open(path.join("storage.bin"))?);
        let storage: SharedHashMap =
            bincode::serde::decode_from_std_read(&mut reader, bincode::config::standard())
                .map_err(|e| StdIoError::new(std::io::ErrorKind::Other, e))?;
        Ok(storage)
    }
}
async fn periodic_store_impl<const N: usize>(
cache_serde: CacheSerde<N>,
dir: PathBuf,
period: Duration,
) -> Result<()> {
let mut interval = tokio::time::interval(period);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
interval.tick().await;
loop {
interval.tick().await;
let dest = dir.clone();
cache_serde.store(&dest).await?;
}
}
/// Newtype that lets a [`SharedHashMap`] be written as a serde map (used for the on-disk snapshot).
struct SharedHashMapSerde(pub SharedHashMap);
/// Serializes the shared map as a serde map of `HashBinary -> SccCacheObject`.
///
/// The entries are snapshotted with `scan` first. Cloning an object clones `Bytes` handles
/// rather than copying payloads. Taking a snapshot guarantees that the length announced in
/// the map header equals the number of entries that follow, even while other tasks insert or
/// purge entries. A mismatch would corrupt the length-prefixed bincode stream. It also means
/// no map entry is held while the serializer writes to its output.
impl Serialize for SharedHashMapSerde {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut entries = Vec::with_capacity(self.0.len());
        self.0.scan(|key, value| entries.push((*key, value.clone())));
        // A concurrent resize can make `scan` visit an entry twice; keep one copy per key.
        entries.sort_unstable_by_key(|entry| entry.0);
        entries.dedup_by_key(|entry| entry.0);

        let mut map = serializer.serialize_map(Some(entries.len()))?;
        for (key, value) in &entries {
            map.serialize_entry(key, value)?;
        }
        map.end()
    }
}
#[cfg(test)]
// Round-trip tests for the snapshot format.
mod test {
use super::{SharedHashMap, SharedHashMapSerde};
use ahash::HashSet;
use once_cell::sync::Lazy;
use pingora::cache::CacheKey;
use pingora::cache::Storage;
use rustracing::span::Span;
use crate::cache::{scc_cache::test::gen_meta, SccMemoryCache};
// Writes one entry through the storage API, encodes the map with bincode, decodes it and
// checks that keys and values survive unchanged.
#[tokio::test]
async fn test_cache_serde() {
static MEM_CACHE: Lazy<SccMemoryCache> = Lazy::new(SccMemoryCache::new);
let span = &Span::inactive().handle();
let key1 = CacheKey::new("", "a", "1");
let res = MEM_CACHE.lookup(&key1, span).await.unwrap();
assert!(res.is_none());
let cache_meta = gen_meta();
let mut miss_handler = MEM_CACHE
.get_miss_handler(&key1, &cache_meta, span)
.await
.unwrap();
miss_handler
.write_body(b"test1"[..].into(), false)
.await
.unwrap();
miss_handler
.write_body(b"test2"[..].into(), false)
.await
.unwrap();
miss_handler.finish().await.unwrap();
let ser = SharedHashMapSerde(MEM_CACHE.cache.clone());
let serialized = bincode::serde::encode_to_vec(ser, bincode::config::standard()).unwrap();
let (deserialized, _): (SharedHashMap, usize) =
bincode::serde::decode_from_slice(&serialized, bincode::config::standard()).unwrap();
// Compare as sets: map iteration order is not significant.
let mut original_keys = HashSet::default();
let mut original_values = HashSet::default();
MEM_CACHE.cache.scan(|key, val| {
original_keys.insert(key.clone());
original_values.insert(val.clone());
});
let mut deserialized_keys = HashSet::default();
let mut deserialized_values = HashSet::default();
deserialized.scan(|key, val| {
deserialized_keys.insert(key.clone());
let val = val.clone();
deserialized_values.insert(val);
});
assert_eq!(original_keys, deserialized_keys);
assert_eq!(original_values, deserialized_values);
}
}
/// Multi-threaded tokio runtime (all drivers enabled), created on first use.
pub static SHARED_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    let mut builder = runtime::Builder::new_multi_thread();
    builder.enable_all();
    builder
        .build()
        .expect("Failed to create tokio shared runtime")
});
/// How often cache snapshots are written to disk: every 15 minutes.
const CACHE_SAVE_PERIOD: Duration = Duration::from_secs(900);
/// Backend response cache persisted under `SHARED_DIR/backend`: 128 MiB eviction limit,
/// 8 MiB per-body limit, no compression, saved every `CACHE_SAVE_PERIOD`.
pub static BACKEND_CACHE: Lazy<CacheBucket> = Lazy::new(|| {
    const SHARDS: usize = 16;
    let backend_dir = SHARED_DIR.join("backend");
    let (_serde_handle, bucket) = load_or_create_cache::<_, SHARDS>(
        backend_dir,
        128 * MB,
        8192,
        NoCompression,
        Some(8 * MB),
        Some(CACHE_SAVE_PERIOD),
        &SHARED_RUNTIME,
    );
    bucket
});
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use pingora::{server::Server, services::Service};
/// Process-wide registry returned by `ServiceRegistry::global()`.
pub static GLOBAL_REGISTRY: Lazy<ServiceRegistry> = Lazy::new(ServiceRegistry::default);
/// Collects [Service]s to be added to a server later, so code that creates services does not
/// need a `&mut Server`. This is similar to the `prometheus::default_registry()` approach.
#[derive(Default)]
pub struct ServiceRegistry {
/// Services waiting to be registered; drained by `register`.
pub services: Mutex<Vec<Box<dyn Service + 'static>>>,
}
impl ServiceRegistry {
/// Get global registry
pub fn global() -> &'static ServiceRegistry {
&*GLOBAL_REGISTRY
}
/// Add service to be registered later
pub fn add_service<S: Service + 'static>(&self, service: S) {
self.services.lock().push(Box::new(service))
}
/// Add service to be registered later, returning self for chaining
pub fn with_service<S: Service + 'static>(&self, service: S) -> &Self {
self.add_service(service);
self
}
/// Drain services from registry and register them to server
pub fn register(&self, server: &mut Server) {
let services: Vec<_> = self.services.lock().drain(..).collect();
server.add_services(services);
}
}
use pingora::cache::eviction::EvictionManager;
use pingora::cache::lock::CacheLock;
use pingora::cache::predictor::CacheablePredictor;
use pingora::cache::HttpCache;
use pingora::cache::Storage;
use pingora::proxy::Session;
/// Caching arguments for `pingora::cache::HttpCache::enable` as one bucket.
///
/// All components are `'static` references (the builder methods leak what they are given), so
/// the bucket is `Copy` and can be handed to every request cheaply.
#[derive(Clone, Copy)]
pub struct CacheBucket {
pub storage: &'static (dyn Storage + Sync),
pub eviction: Option<&'static (dyn EvictionManager + Sync)>,
pub predictor: Option<&'static (dyn CacheablePredictor + Sync)>,
pub cache_lock: Option<&'static CacheLock>,
}
impl CacheBucket {
    /// Leaks `storage` to `'static` and creates a bucket with no eviction, predictor or lock.
    pub fn new<T>(storage: T) -> Self
    where
        T: Storage + Sync + 'static,
    {
        let storage: &'static T = Box::leak(Box::new(storage));
        CacheBucket {
            storage,
            eviction: None,
            predictor: None,
            cache_lock: None,
        }
    }

    /// Leaks `eviction` and installs it as the eviction manager.
    pub fn with_eviction<T: EvictionManager + Sync + 'static>(self, eviction: T) -> Self {
        let leaked: &'static T = Box::leak(Box::new(eviction));
        CacheBucket {
            eviction: Some(leaked),
            ..self
        }
    }

    /// Copy of this bucket without an eviction manager.
    pub fn without_eviction(&self) -> Self {
        CacheBucket {
            eviction: None,
            ..*self
        }
    }

    /// Leaks `predictor` and installs it as the cacheability predictor.
    pub fn with_predictor<T: CacheablePredictor + Sync + 'static>(self, predictor: T) -> Self {
        let leaked: &'static T = Box::leak(Box::new(predictor));
        CacheBucket {
            predictor: Some(leaked),
            ..self
        }
    }

    /// Copy of this bucket without a cacheability predictor.
    pub fn without_predictor(&self) -> Self {
        CacheBucket {
            predictor: None,
            ..*self
        }
    }

    /// Leaks `cache_lock` and installs it.
    pub fn with_cache_lock(self, cache_lock: CacheLock) -> Self {
        let leaked: &'static CacheLock = Box::leak(Box::new(cache_lock));
        CacheBucket {
            cache_lock: Some(leaked),
            ..self
        }
    }

    /// Copy of this bucket without a cache lock.
    pub fn without_cache_lock(&self) -> Self {
        CacheBucket {
            cache_lock: None,
            ..*self
        }
    }

    /// Enables `cache` with this bucket's storage, eviction, predictor and lock.
    pub fn enable_cache(&self, cache: &mut HttpCache) {
        let CacheBucket {
            storage,
            eviction,
            predictor,
            cache_lock,
        } = *self;
        cache.enable(storage, eviction, predictor, cache_lock)
    }

    /// Enables caching for the session's `HttpCache` using this bucket.
    pub fn enable(&self, session: &mut Session) {
        let cache = &mut session.cache;
        self.enable_cache(cache)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment