@Object905
Last active August 17, 2025 12:55

Pingora kubernetes/DNS ServiceDiscovery
use async_trait::async_trait;
use hickory_resolver::TokioAsyncResolver;
use http::Extensions;
use pingora::lb::discovery::ServiceDiscovery;
use pingora::lb::selection::{BackendIter, BackendSelection};
use pingora::lb::{Backend, Backends, LoadBalancer};
use pingora::protocols::l4::socket::SocketAddr;
use pingora::{Error, ErrorSource, ErrorType, Result as PingoraResult};
use std::fmt::Debug;
use std::net::{IpAddr, SocketAddrV4};
use std::{
    collections::{BTreeSet, HashMap},
    sync::Arc,
};
/// Service discovery that resolves a domain to Backends with a DNS lookup using the `hickory_resolver` crate.
///
/// Only IPv4 addresses are used; IPv6 addresses are silently ignored.
#[derive(Debug, Clone)]
pub struct DnsDiscovery {
    /// Domain that will be resolved
    pub domain: String,
    /// Port used for Backend
    pub port: u16,
    /// Resolver from `hickory_resolver`
    pub resolver: Arc<TokioAsyncResolver>,
    /// Extensions that will be set to backends
    pub extensions: Option<Extensions>,
}

impl DnsDiscovery {
    pub fn new<D: Into<String>>(domain: D, port: u16, resolver: Arc<TokioAsyncResolver>) -> Self {
        DnsDiscovery {
            domain: domain.into(),
            port,
            resolver,
            extensions: None,
        }
    }

    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = Some(extensions);
        self
    }
}

#[async_trait]
impl ServiceDiscovery for DnsDiscovery {
    async fn discover(&self) -> PingoraResult<(BTreeSet<Backend>, HashMap<u64, bool>)> {
        let records = self.resolver.lookup_ip(&self.domain).await.map_err(|err| {
            Error::create(
                ErrorType::Custom("DNS lookup error"),
                ErrorSource::Internal,
                Some(format!("{:?}", self).into()),
                Some(err.into()),
            )
        })?;

        let result: BTreeSet<_> = records
            .iter()
            .filter_map(|ip| match ip {
                IpAddr::V4(ip) => Some(SocketAddr::Inet(std::net::SocketAddr::V4(
                    SocketAddrV4::new(ip, self.port),
                ))),
                IpAddr::V6(_) => None,
            })
            .map(|addr| Backend {
                addr,
                weight: 1,
                ext: self.extensions.clone().unwrap_or_else(Extensions::new),
            })
            .collect();

        Ok((result, HashMap::new()))
    }
}

impl From<DnsDiscovery> for Backends {
    fn from(value: DnsDiscovery) -> Self {
        Backends::new(Box::new(value))
    }
}

impl<S> From<DnsDiscovery> for LoadBalancer<S>
where
    S: BackendSelection + 'static,
    S::Iter: BackendIter,
{
    fn from(value: DnsDiscovery) -> Self {
        LoadBalancer::from_backends(value.into())
    }
}
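
For reference, here is one way the DnsDiscovery above could be wired into a Pingora proxy. This is a minimal sketch, not part of the original gist: it assumes `DnsDiscovery` is in scope, uses hickory-resolver's `tokio_from_system_conf` constructor (available with the default `system-config` feature), and the domain, port, service name, and refresh interval are placeholders.

use std::sync::Arc;
use std::time::Duration;

use hickory_resolver::TokioAsyncResolver;
use pingora::lb::selection::{algorithms::RoundRobin, weighted::Weighted};
use pingora::lb::LoadBalancer;
use pingora::server::Server;
use pingora::services::background::background_service;

fn setup_dns_lb(server: &mut Server) -> Arc<LoadBalancer<Weighted<RoundRobin>>> {
    // Resolver backed by the system DNS configuration (e.g. /etc/resolv.conf).
    let resolver = Arc::new(
        TokioAsyncResolver::tokio_from_system_conf().expect("Failed to build DNS resolver"),
    );

    // The From impl above turns the discovery into a LoadBalancer.
    let mut lb: LoadBalancer<Weighted<RoundRobin>> =
        DnsDiscovery::new("upstream.example.com", 8080, resolver).into();
    // LoadBalancer is itself a BackgroundService; this interval controls how often
    // discover() (i.e. the DNS lookup) is re-run once it is registered below.
    lb.update_frequency = Some(Duration::from_secs(30));

    let background = background_service("dns discovery", lb);
    let lb = background.task();
    server.add_service(background);
    lb
}
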
use async_trait::async_trait;
use futures::future::ready;
use futures::future::{select, Either};
use futures::{Future, FutureExt, Stream, StreamExt};
use http::Extensions;
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::api::Api;
use kube::runtime::reflector::Store;
use kube::runtime::watcher::Config;
use kube::runtime::{reflector, watcher, WatchStreamExt};
use kube::Client;
use pingora::lb::discovery::ServiceDiscovery;
use pingora::lb::selection::{BackendIter, BackendSelection};
use pingora::lb::Backends;
use pingora::lb::{Backend, LoadBalancer};
use pingora::protocols::l4::socket::SocketAddr;
use pingora::server::ShutdownWatch;
use pingora::services::background::BackgroundService;
use pingora::Result as PingoraResult;
use std::collections::{BTreeSet, HashMap};
use std::fmt::Debug;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
/// Service discovery that gets addresses from EndpointSlices of a kubernetes cluster.
///
/// Only IPv4 addresses are used; IPv6 addresses are silently ignored.
#[derive(Debug, Clone)]
pub struct EndpointSliceDiscovery {
    /// Store from the kube reflector. Required fields are conditions and addresses.
    pub store: Store<EndpointSlice>,
    /// Port used for Backend. Not checked against the EndpointSlice's actual ports!
    pub port: u16,
    /// Extensions that will be set to backends
    pub extensions: Option<Extensions>,
}

impl EndpointSliceDiscovery {
    /// Make a discovery for a kubernetes Service.
    /// Also returns a BackgroundService that should be registered for timely updates of EndpointSlices.
    pub fn for_service(
        client: Client,
        namespace: impl AsRef<str>,
        service: impl AsRef<str>,
        port: u16,
    ) -> (Self, EndpointSliceDiscoveryUpdater) {
        let filter = Config::default()
            .labels(format!("kubernetes.io/service-name={}", service.as_ref()).as_str());
        Self::from_kube_config(client, namespace, filter, port)
    }

    /// Make a discovery from a `kube::runtime::watcher::Config` for EndpointSlice selection.
    pub fn from_kube_config(
        client: Client,
        namespace: impl AsRef<str>,
        config: Config,
        port: u16,
    ) -> (Self, EndpointSliceDiscoveryUpdater) {
        let endpoint_slices = Api::<EndpointSlice>::namespaced(client, namespace.as_ref());
        let (store, writer) = reflector::store();
        let watcher = watcher(endpoint_slices, config);
        let watcher_stream = watcher.default_backoff().reflect(writer);

        let kube_discovery = EndpointSliceDiscovery {
            store,
            port,
            extensions: None,
        };
        let kube_upd_service = EndpointSliceDiscoveryUpdater {
            stream: Mutex::new(Some(Box::pin(
                watcher_stream
                    .filter_map(|e| ready(Some(e.unwrap())))
                    .boxed(),
            ))),
            lb: None,
        };
        (kube_discovery, kube_upd_service)
    }
    fn endpoint_slices_to_backends<'a>(
        &'a self,
        endpoint_slice: &'a EndpointSlice,
    ) -> impl Iterator<Item = Backend> + 'a {
        endpoint_slice
            .endpoints
            .iter()
            // Only endpoints whose `ready` condition is explicitly true are kept.
            .filter(|&e| {
                if let Some(conditions) = e.conditions.as_ref() {
                    if let Some(ready) = conditions.ready {
                        return ready;
                    }
                }
                false
            })
            .flat_map(|endpoint| {
                endpoint
                    .addresses
                    .iter()
                    .map(|ip| ip.parse().expect("Failed to parse endpoint IPv4 address"))
            })
            .map(|ip| SocketAddr::Inet(std::net::SocketAddr::V4(SocketAddrV4::new(ip, self.port))))
            .map(|addr| Backend {
                addr,
                weight: 1,
                ext: self.extensions.clone().unwrap_or_else(Extensions::new),
            })
    }
}
impl From<EndpointSliceDiscovery> for Backends {
    fn from(value: EndpointSliceDiscovery) -> Self {
        Backends::new(Box::new(value))
    }
}

impl<S> From<EndpointSliceDiscovery> for LoadBalancer<S>
where
    S: BackendSelection + 'static,
    S::Iter: BackendIter,
{
    fn from(value: EndpointSliceDiscovery) -> Self {
        LoadBalancer::from_backends(value.into())
    }
}

#[async_trait]
impl ServiceDiscovery for EndpointSliceDiscovery {
    async fn discover(&self) -> PingoraResult<(BTreeSet<Backend>, HashMap<u64, bool>)> {
        // Wait for the reflector to receive the initial list of EndpointSlices.
        self.store.wait_until_ready().await.unwrap();
        let state = self.store.state();

        let result: BTreeSet<Backend> = state
            .into_iter()
            .filter(|e| e.address_type == "IPv4")
            .flat_map(|e| self.endpoint_slices_to_backends(&*e).collect::<Vec<_>>())
            .collect();

        Ok((result, HashMap::new()))
    }
}

/// Helper trait implemented for LoadBalancer to erase the selection generic.
pub trait LoadBalancerUpdater: Send + Sync + 'static {
    fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = PingoraResult<()>> + Send + 'a>>;
}

impl<S> LoadBalancerUpdater for LoadBalancer<S>
where
    S: BackendSelection + Send + Sync + 'static,
    S::Iter: BackendIter,
{
    fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = PingoraResult<()>> + Send + 'a>> {
        // Inherent methods win method resolution, so this calls `LoadBalancer::update`,
        // not this trait method.
        self.update().boxed()
    }
}

impl<S> LoadBalancerUpdater for Arc<LoadBalancer<S>>
where
    S: BackendSelection + Send + Sync + 'static,
    S::Iter: BackendIter,
{
    fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = PingoraResult<()>> + Send + 'a>> {
        (**self).update().boxed()
    }
}

/// Background updater of the load balancer when the EndpointSlice stream changes.
///
/// The user is required to call `with_lb`/`set_lb` before registering it as a background service,
/// otherwise it will panic.
pub struct EndpointSliceDiscoveryUpdater {
    /// Event stream from the kubernetes reflector
    stream: Mutex<Option<Pin<Box<dyn Stream<Item = watcher::Event<EndpointSlice>> + Send>>>>,
    /// Load balancer that will be updated on EndpointSlice changes. The BackgroundService will panic if left as None.
    lb: Option<Box<dyn LoadBalancerUpdater>>,
}

impl EndpointSliceDiscoveryUpdater {
    pub fn with_lb<T: LoadBalancerUpdater>(mut self, lb: T) -> Self {
        self.set_lb(lb);
        self
    }

    pub fn set_lb<T: LoadBalancerUpdater>(&mut self, lb: T) {
        self.lb = Some(Box::new(lb));
    }
}

#[async_trait]
impl BackgroundService for EndpointSliceDiscoveryUpdater {
    async fn start(&self, mut _shutdown: ShutdownWatch) {
        if self.lb.is_none() {
            panic!("Need to set `lb` in EndpointSliceDiscoveryUpdater")
        }
        let owned_stream = self
            .stream
            .lock()
            .unwrap()
            .take()
            .expect("No stream in EndpointSliceDiscoveryUpdater");

        let stream = owned_stream
            .for_each(|event| async move {
                match event {
                    // Refresh the backends once the initial list is in and on every change.
                    watcher::Event::InitDone
                    | watcher::Event::Apply(_)
                    | watcher::Event::Delete(_) => {
                        self.lb
                            .as_ref()
                            .unwrap()
                            .update()
                            .await
                            .expect("Failed to update backends");
                    }
                    _ => {}
                }
            })
            .boxed();

        // Drive the watcher until shutdown is signaled.
        let completed = select(stream, _shutdown.changed().boxed()).await;
        match completed {
            Either::Left(_) => {
                panic!("Kubernetes update stream ended abruptly");
            }
            Either::Right(_) => {
                // terminating
            }
        }
    }
}

let (kube_discovery, mut kube_updater) = EndpointSliceDiscovery::for_service(kube_client, "default", kube_svc, kube_port);
// make your Backends and load balancer here (see the sketch after this snippet)
// ...
// let lb: Arc<LoadBalancer<Weighted<RoundRobin>>> = ...;
// provide the background updater with the load balancer to update when the kube service changes
kube_updater.set_lb(lb.clone());
// make background service for pingora
let updater_background_service = background_service(&format!("Kube service updater {} {}", kube_svc, kube_port), kube_updater);
// register it to your server
server.add_service(updater_background_service);
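
For completeness, the elided load balancer above could be built roughly like this. A minimal sketch under the same assumptions as the snippet (the Weighted<RoundRobin> selection is just one choice; any BackendSelection works), using the From impl defined earlier:

use std::sync::Arc;

use pingora::lb::selection::{algorithms::RoundRobin, weighted::Weighted};
use pingora::lb::LoadBalancer;

// Backends stay empty until the background updater sees the initial EndpointSlice
// list and calls update(), so no update_frequency is needed here.
let lb: Arc<LoadBalancer<Weighted<RoundRobin>>> = Arc::new(kube_discovery.into());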
@shenshouer commented Nov 28, 2024

@Object905 Thank you very much. With reference to the code you provided, the issue has been resolved. I modified it to use std::sync::LazyLock from the standard library; below is the implementation based on your code:

use std::sync::LazyLock;

use kube::{Client, Config};
use tokio::runtime::{self, Runtime};

pub static SHARED_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
    runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed to create tokio shared runtime")
});

pub static KUBE: LazyLock<Option<Client>> = LazyLock::new(|| {
    let config = Config::incluster()
        .ok()
        .or(SHARED_RUNTIME.block_on(Config::infer()).ok())?;
    let _guard = SHARED_RUNTIME.enter();
    let client = Client::try_from(config).ok()?;
    Some(client)
});

@shenshouer

I think there might be an issue with using a separate Runtime stored in a static variable during a graceful upgrade of the Pingora server.
