Skip to content

Instantly share code, notes, and snippets.

@mooreniemi
Created April 29, 2024 01:51
Show Gist options
  • Save mooreniemi/dcb3622b49c484d1bec4220da1ffb40f to your computer and use it in GitHub Desktop.
Save mooreniemi/dcb3622b49c484d1bec4220da1ffb40f to your computer and use it in GitHub Desktop.
a basic cas op for etcd
/// Increments a counter stored in a JSON object within etcd under the specified key.
/// Assumes the JSON structure is {"count": int}. Will try 5 times before failing.
async fn increment_counter(
etcd: &AsyncMutex<Client>,
key: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let key = key.to_string();
let mut client = etcd.lock().await;
// FIXME: actually we don't want to retry - we want to bail out and try to find another assignment
let mut attempts = 0;
while attempts < 5 {
let resp = client.get(key.clone(), None).await?;
let kv = resp.kvs().get(0);
let initial_value = kv.map_or(0, |kv| {
std::str::from_utf8(kv.value())
.unwrap()
.parse::<i64>()
.unwrap_or(0)
});
let txn = Txn::new();
let cmp = Compare::value(key.clone(), CompareOp::Equal, initial_value.to_string());
let succ = TxnOp::put(key.clone(), (initial_value + 1).to_string(), None);
let fail = TxnOp::get(key.clone(), None);
let txn = txn.when(vec![cmp]).and_then(vec![succ]).or_else(vec![fail]);
let txn_resp = client.txn(txn).await;
if txn_resp.is_ok() {
log::info!("Success: count incremented to {}", initial_value + 1);
return Ok(());
} else {
log::error!(
"Failure: count value changed, retrying attempt {}",
attempts + 1
);
attempts += 1;
sleep(Duration::from_millis(
(100 * 2_i64.pow(attempts as u32 - 1)).try_into().unwrap(),
))
.await;
}
}
Err("Failed to increment counter after 5 attempts".into())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment