Created
July 30, 2021 20:58
-
-
Save xrl/3c5727e30e78ae300539fd93defc031b to your computer and use it in GitHub Desktop.
Using Rust and kube-rs to check, poll, and renew advisory locks in Kubernetes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use k8s_openapi::api::coordination::v1::{Lease as KubeLease, LeaseSpec as KubeLeaseSpec}; | |
use chrono::{Local, Utc}; | |
use k8s_openapi::apimachinery::pkg::apis::meta::v1::MicroTime; | |
use std::time::Duration; | |
use kube::api::{PatchParams, PostParams, ObjectMeta}; | |
use kube::Api; | |
use tokio::task::JoinHandle; | |
use tokio::sync::oneshot::Sender; | |
/// Lease time-to-live in seconds: written as `leaseDurationSeconds` on every lease this
/// module creates or takes over, and used as the fallback wait time between acquisition attempts.
const LEASE_DURATION_SECONDS: u64 = 5;
pub struct Lease { | |
join_handle: JoinHandle<()>, | |
sender: Sender<()> | |
} | |
impl Lease { | |
pub fn lease_name() -> &'static str { | |
"cousteau" | |
} | |
pub fn lease_duration() -> u64 { | |
LEASE_DURATION_SECONDS | |
} | |
pub async fn available(kube_api_client: crate::Client, ns: &str, lease_name: &str) -> Result<bool, String> { | |
let lease_client: Api<KubeLease> = kube::Api::namespaced(kube_api_client.kube_api_client.clone(), ns); | |
let get_lease = lease_client.get(lease_name).await; | |
match get_lease { | |
Err(kube::Error::Api(kube::error::ErrorResponse{ code: 404, ..})) => Ok(true), | |
Err(err) => Err(format!("{:#?}", err)), | |
Ok(lease) => Ok(Self::lease_expired(&lease)) | |
} | |
} | |
pub async fn acquire_or_create(kube_api_client: crate::Client, ns: &str, lease_name: &str, identity: &str) -> Result<Lease, ()>{ | |
// let lease_client = self.kube_api_client | |
let lease_client: Api<KubeLease> = kube::Api::namespaced(kube_api_client.kube_api_client.clone(), ns); | |
// check for lease | |
let lease = loop { | |
let get_lease = lease_client.get(lease_name).await; | |
if let Err(kube::Error::Api(kube::error::ErrorResponse { code: 404, .. })) = get_lease { | |
trace!("lease does not existing, instantiating with defaults"); | |
let lease = lease_client.create(&PostParams::default(), &KubeLease { | |
metadata: ObjectMeta { namespace: Some(ns.to_string()), name: Some(lease_name.to_string()), ..Default::default() }, | |
spec: Some(KubeLeaseSpec { | |
acquire_time: Some(Self::now()), | |
lease_duration_seconds: Some(LEASE_DURATION_SECONDS as i32), | |
holder_identity: Some(identity.to_string()), | |
lease_transitions: Some(1), | |
..Default::default() | |
}) | |
}).await.unwrap(); | |
break lease; | |
} else if let Ok(mut lease) = get_lease { | |
if Self::lease_expired(&lease) { | |
trace!("the lease expired, taking ownership"); | |
lease.metadata.managed_fields = None; | |
let spec = lease.spec.as_mut().unwrap(); | |
if spec.lease_transitions.is_none() { | |
spec.lease_transitions = Some(0); | |
} | |
spec.lease_transitions.as_mut().map(|lt| *lt = *lt + 1); | |
spec.acquire_time = Some(Self::now()); | |
spec.renew_time = None; | |
spec.lease_duration_seconds = Some(LEASE_DURATION_SECONDS as i32); | |
spec.holder_identity = Some(identity.to_string()); | |
lease = lease_client.patch(lease_name, | |
&PatchParams::apply("cousteau").force(), | |
serde_json::to_vec(&lease).unwrap()).await.unwrap(); | |
break lease; | |
} else { | |
let wait_time = match lease.spec { | |
Some(KubeLeaseSpec{lease_duration_seconds: Some(lds), ..}) => lds as u64, | |
_ => LEASE_DURATION_SECONDS | |
}; | |
trace!("lease is not ready, let's wait {} seconds and try again", wait_time); | |
tokio::time::delay_for(Duration::from_secs(wait_time)).await; | |
continue; | |
} | |
} else { | |
panic!("what in the {:#?}", get_lease); | |
}; | |
}; | |
let (sender,mut recv) = tokio::sync::oneshot::channel(); | |
let renew_client = lease_client.clone(); | |
let mut renew_resource_version = lease.metadata.resource_version.clone(); | |
let renew_object_name = lease_name.to_string(); | |
let renew_lease_duration_seconds = lease.spec.as_ref().unwrap().lease_duration_seconds.unwrap(); | |
let join_handle = tokio::spawn(async move { | |
let mut interval = tokio::time::interval(std::time::Duration::from_secs(renew_lease_duration_seconds as u64)); | |
loop { | |
tokio::select! { | |
_ = interval.tick() => { | |
trace!("interval tick fired, good time to renew stuff"); | |
let patch_params = PatchParams::apply("cousteau"); | |
let patch = serde_yaml::to_vec(&serde_json::json!({ | |
"apiVersion": "coordination.k8s.io/v1", | |
"kind": "Lease", | |
"metadata": { | |
"resourceVersion": renew_resource_version, | |
"name": renew_object_name | |
}, | |
"spec": { | |
"renewTime": Self::now(), | |
} | |
})).unwrap(); | |
let patch_res = renew_client.patch(&renew_object_name, &patch_params, patch).await.unwrap(); | |
renew_resource_version = patch_res.metadata.resource_version; | |
} | |
_ = &mut recv => { | |
trace!("receiver woke up"); | |
break | |
} | |
} | |
} | |
trace!("all done looping, zeroing out the lease"); | |
let patch_params = PatchParams::apply("cousteau"); | |
let patch = serde_yaml::to_vec(&serde_json::json!({ | |
"apiVersion": "coordination.k8s.io/v1", | |
"kind": "Lease", | |
"metadata": { | |
"resourceVersion": renew_resource_version, | |
"name": renew_object_name | |
}, | |
"spec": { | |
"renewTime": Option::<()>::None, | |
"acquireTime": Option::<()>::None, | |
"holderIdentity": Option::<()>::None | |
} | |
})).unwrap(); | |
renew_client.patch(&renew_object_name, &patch_params, patch).await.unwrap(); | |
trace!("all done with the lease"); | |
}); | |
return Ok(Lease { | |
join_handle, | |
sender | |
}) | |
} | |
fn now() -> MicroTime { | |
let local_now = Local::now(); | |
MicroTime(local_now.with_timezone(&Utc)) | |
} | |
fn lease_expired(lease: &KubeLease) -> bool { | |
let KubeLeaseSpec{acquire_time, renew_time, lease_duration_seconds, ..} = lease.spec.as_ref().unwrap(); | |
let local_now = Local::now(); | |
let utc_now = local_now.with_timezone(&Utc); | |
let lease_duration = chrono::Duration::seconds(*lease_duration_seconds.as_ref().unwrap() as i64); | |
if let Some(MicroTime(time)) = renew_time { | |
let renew_expire = time.checked_add_signed(lease_duration).unwrap(); | |
return utc_now.gt(&renew_expire) | |
} else if let Some(MicroTime(time)) = acquire_time { | |
let acquire_expire = time.checked_add_signed(lease_duration).unwrap(); | |
return utc_now.gt(&acquire_expire) | |
} | |
return true; | |
} | |
pub async fn join(self) -> Result<(),tokio::task::JoinError> { | |
self.sender.send(()).unwrap(); | |
self.join_handle.await | |
} | |
} | |
// NOTE(review): these tests call a live API server through `crate::Client` and use the
// `staging` / `production` namespaces — confirm the intended cluster before running.
#[cfg(test)]
mod test {
    use super::*;
    use crate::Client;

    /// Acquires the lease, holds it for five lease durations so the renewal task runs
    /// several times, then releases it.
    #[tokio::test]
    async fn test_lease() {
        let client = Client::new().await;
        let lease = Lease::acquire_or_create(client, "staging", "cousteau", "test-case").await.unwrap();
        tokio::time::delay_for(Duration::from_secs(5 * LEASE_DURATION_SECONDS)).await;
        lease.join().await.unwrap();
    }

    /// While the lease is held it must not be reported as available.
    #[tokio::test]
    async fn test_availability() {
        let client = Client::new().await;
        let lease = Lease::acquire_or_create(client.clone(), "production", "cousteau", "test-case").await.unwrap();
        let available = Lease::available(client, "production", "cousteau").await.unwrap();
        // Release before asserting so a failing assertion does not leave the lease held.
        lease.join().await.unwrap();
        assert!(!available, "lease reported as available while held, available: {:#?}", available);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment