Skip to content

Instantly share code, notes, and snippets.

@rikonor
Created October 31, 2024 14:27
Show Gist options
  • Save rikonor/2cfbf1b3cebd2c71c692ea0a3af1f507 to your computer and use it in GitHub Desktop.
Save rikonor/2cfbf1b3cebd2c71c692ea0a3af1f507 to your computer and use it in GitHub Desktop.
Log Anonymization
use std::{
sync::Arc,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use anyhow::{Context, Error};
use async_trait::async_trait;
use ic_agent::{export::Principal, identity::AnonymousIdentity, Agent};
use rand::{rngs::StdRng, SeedableRng};
use rsa::{
pkcs1::{DecodeRsaPublicKey, EncodeRsaPublicKey},
rand_core::CryptoRngCore,
Pkcs1v15Encrypt, RsaPrivateKey, RsaPublicKey,
};
use tokio::time::sleep;
/// Length, in bytes, of a freshly generated salt.
const SALT_SIZE: usize = 64;
/// Key size handed to `RsaPrivateKey::new` when the client's ephemeral key is
/// generated (a bit length, per the `rsa` crate API).
const RSA_KEY_SIZE: usize = 2048;
/// Entry point: builds an anonymous agent, wraps the canister client with
/// logging for each of its three methods, and runs the tracker, printing every
/// salt value it reports.
///
/// # Errors
///
/// Returns an error if the agent cannot be built, the canister id cannot be
/// parsed, the tracker cannot be constructed, or tracking itself fails.
#[tokio::main]
async fn main() -> Result<(), Error> {
    // Canister
    let ag = Agent::builder()
        .with_url("http://www.example.com")
        .with_identity(AnonymousIdentity)
        .build()?;

    let cid =
        Principal::from_text("y2nf5-opuh7-4mmip-eebh3-db6fb-rjtz7-hd4ij-i47ip-o275s-zezdc-kqe")?;

    let c = Canister::new(
        ag,  // agent
        cid, // canister_id
    );

    // Canister methods (register, query, submit): each one gets its own
    // logging-wrapped handle onto the same canister client
    let cm = CanisterMethods {
        register: Arc::new(WithLogs(c.clone())),
        query: Arc::new(WithLogs(c.clone())),
        submit: Arc::new(WithLogs(c)),
    };

    // Rng
    //
    // This generator produces the RSA private key and the salts, so it is
    // seeded from the operating system's entropy source. A wall-clock seed
    // (truncated to 64 bits) is guessable and would make the key recoverable.
    let rng = Box::new(StdRng::from_entropy());

    // Tracker
    Tracker::new(rng, cm)?
        .track(|value| {
            println!("Got {value:?}");
        })
        .await?;

    Ok(())
}
// Canister
/// Errors returned by `Register::register`.
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
/// The call was rejected as unauthorized (displayed as `unauthorized`).
#[error("unauthorized")]
Unauthorized,
/// Any other failure; display and source are forwarded to the wrapped
/// `anyhow::Error`, which also converts into this variant via `From`.
#[error(transparent)]
UnexpectedError(#[from] anyhow::Error),
}
/// Canister method used by a client to register its public key.
#[async_trait]
trait Register: Sync + Send {
/// Registers `pubkey` (raw key bytes; `Tracker` passes a PKCS#1 DER
/// encoding) and reports whether the registration was accepted.
async fn register(&self, pubkey: &[u8]) -> Result<(), RegisterError>;
}
/// LeaderMode indicates whether a new salt is required
/// when a query is answered with a leader assignment (`QueryError::Leader`).
#[derive(Debug)]
enum LeaderMode {
/// Generate a fresh salt
Bootstrap,
/// Refresh the encrypted values, reusing the salt the client already holds
Refresh,
}
/// Pair associates a principal with a binary-blob (pubkey, ciphertext, etc)
///
/// Field `0` is the principal, field `1` is the blob associated with it.
#[derive(Debug)]
struct Pair(Principal, Vec<u8>);
/// Non-success outcomes of `Query::query`.
///
/// `Leader` is carried as an error variant but is handled by `Tracker` as an
/// instruction to distribute a salt rather than as a failure.
#[derive(Debug, thiserror::Error)]
enum QueryError {
/// The caller was assigned leadership: the mode says whether to generate
/// or reuse a salt, and the pairs hold each principal's public key that
/// the salt is to be encrypted for.
#[error("leader assignment received")]
Leader(LeaderMode, Vec<Pair>),
/// The call was rejected as unauthorized (displayed as `unauthorized`).
#[error("unauthorized")]
Unauthorized,
/// Any other failure; display and source are forwarded to the wrapped
/// `anyhow::Error`, which also converts into this variant via `From`.
#[error(transparent)]
UnexpectedError(#[from] anyhow::Error),
}
/// Canister method used by a client to fetch its copy of the salt.
#[async_trait]
trait Query: Sync + Send {
/// On success returns bytes that `Tracker` treats as the salt encrypted
/// for this client's public key; `QueryError::Leader` instead asks the
/// caller to produce and distribute the salt.
async fn query(&self) -> Result<Vec<u8>, QueryError>;
}
/// Errors returned by `Submit::submit`.
#[derive(Debug, thiserror::Error)]
pub enum SubmitError {
/// The call was rejected as unauthorized (displayed as `unauthorized`).
#[error("unauthorized")]
Unauthorized,
/// Any other failure; display and source are forwarded to the wrapped
/// `anyhow::Error`, which also converts into this variant via `From`.
#[error(transparent)]
UnexpectedError(#[from] anyhow::Error),
}
/// Canister method used by a leader to hand back the encrypted salt values.
#[async_trait]
trait Submit: Sync + Send {
/// Submits `vs`; `Tracker` passes one pair per principal, holding the salt
/// encrypted with that principal's public key.
async fn submit(&self, vs: &[Pair]) -> Result<(), SubmitError>;
}
/// Canister client: an agent together with the id of the canister to address.
///
/// NOTE(review): the `Register`/`Query`/`Submit` impls visible in this file do
/// not read either field yet.
#[derive(Clone)]
struct Canister {
// Agent handed to `Canister::new`
agent: Agent,
// Principal identifying the canister
cid: Principal,
}
impl Canister {
fn new(agent: Agent, cid: Principal) -> Self {
Self { agent, cid }
}
}
#[async_trait]
impl Register for Canister {
    /// Placeholder registration: prints a marker line, waits two seconds and
    /// reports success. Neither `pubkey` nor the agent/canister id is used.
    async fn register(&self, pubkey: &[u8]) -> Result<(), RegisterError> {
        println!("registering");

        // Simulated call latency
        let delay = Duration::from_secs(2);
        sleep(delay).await;

        Ok(())
    }
}
#[async_trait]
impl Query for Canister {
    /// Placeholder query: prints a marker line, waits two seconds and returns
    /// an empty byte vector. The agent/canister id is not used.
    async fn query(&self) -> Result<Vec<u8>, QueryError> {
        println!("querying");

        // Simulated call latency
        let delay = Duration::from_secs(2);
        sleep(delay).await;

        Ok(Vec::new())
    }
}
#[async_trait]
impl Submit for Canister {
    /// Placeholder submission: prints a marker line, waits two seconds and
    /// reports success. Neither `vs` nor the agent/canister id is used.
    async fn submit(&self, vs: &[Pair]) -> Result<(), SubmitError> {
        println!("submitting");

        // Simulated call latency
        let delay = Duration::from_secs(2);
        sleep(delay).await;

        Ok(())
    }
}
// Canister methods
/// The three canister operations a `Tracker` needs, each held as a separate
/// shared trait object so that every method can be wrapped or replaced
/// independently of the others.
struct CanisterMethods {
/// register method tied to canister
register: Arc<dyn Register>,
/// query method tied to canister
query: Arc<dyn Query>,
/// submit method tied to canister
submit: Arc<dyn Submit>,
}
/// Wrapper around a canister-method implementation: its `Register`, `Query`
/// and `Submit` impls forward to the wrapped value and print the action,
/// status, duration and error of every call.
struct WithLogs<T>(T);
#[async_trait]
impl<T: Register> Register for WithLogs<T> {
    /// Forwards to the wrapped `register`, then prints one line with the
    /// action name, a status label (`ok`, `unauthorized` or `fail`), the
    /// elapsed time in seconds and the error, if any. The wrapped result is
    /// returned unchanged.
    async fn register(&self, pubkey: &[u8]) -> Result<(), RegisterError> {
        let start_time = Instant::now();

        // `pubkey` is already a slice reference; forward it as-is instead of
        // borrowing it a second time
        let out = self.0.register(pubkey).await;

        let status = match &out {
            Ok(()) => "ok",
            Err(RegisterError::Unauthorized) => "unauthorized",
            Err(RegisterError::UnexpectedError(_)) => "fail",
        };

        let duration = start_time.elapsed().as_secs_f64();

        println!(
            "action = 'register', status = {status}, duration = {duration}, error = {:?}",
            out.as_ref().err()
        );

        out
    }
}
#[async_trait]
impl<T: Query> Query for WithLogs<T> {
    /// Runs the wrapped `query`, prints one line describing the call (action,
    /// status label, elapsed seconds, error if any) and hands the wrapped
    /// result back untouched.
    async fn query(&self) -> Result<Vec<u8>, QueryError> {
        let started = Instant::now();
        let result = self.0.query().await;

        // Status label derived from the error, if there is one
        let status = match result.as_ref().err() {
            None => "ok",
            Some(QueryError::Unauthorized) => "unauthorized",
            Some(QueryError::Leader(LeaderMode::Bootstrap, _)) => "leader-bootstrap",
            Some(QueryError::Leader(LeaderMode::Refresh, _)) => "leader-refresh",
            Some(QueryError::UnexpectedError(_)) => "fail",
        };

        let duration = started.elapsed().as_secs_f64();

        println!(
            "action = 'query', status = {status}, duration = {duration}, error = {:?}",
            result.as_ref().err()
        );

        result
    }
}
#[async_trait]
impl<T: Submit> Submit for WithLogs<T> {
    /// Runs the wrapped `submit`, prints one line describing the call (action,
    /// status label, elapsed seconds, error if any) and hands the wrapped
    /// result back untouched.
    async fn submit(&self, vs: &[Pair]) -> Result<(), SubmitError> {
        let started = Instant::now();
        let result = self.0.submit(vs).await;

        // Status label derived from the error, if there is one
        let status = match result.as_ref().err() {
            None => "ok",
            Some(SubmitError::Unauthorized) => "unauthorized",
            Some(SubmitError::UnexpectedError(_)) => "fail",
        };

        let duration = started.elapsed().as_secs_f64();

        println!(
            "action = 'submit', status = {status}, duration = {duration}, error = {:?}",
            result.as_ref().err()
        );

        result
    }
}
// Client
/// Client-side tracking of the shared salt.
#[async_trait]
trait Track: Sync + Send {
/// Runs the tracking procedure, invoking `cb` with each salt value obtained.
async fn track(&mut self, cb: impl Fn(Vec<u8>) + Send + Sync) -> Result<(), Error>;
}
/// Client state for salt tracking: a random source, the canister methods, the
/// client's RSA key and the most recently obtained salt.
struct Tracker {
/// rng for generating a salt when needed
rng: Box<dyn CryptoRngCore + Send + Sync>,
/// canister client for salt sharing
canister: CanisterMethods,
/// Ephemeral private key for identifying client
pkey: RsaPrivateKey,
/// Current value of the salt
cur: Option<Vec<u8>>,
}
impl Tracker {
    /// Creates a tracker with a freshly generated RSA private key of
    /// `RSA_KEY_SIZE` and no salt yet.
    ///
    /// # Errors
    ///
    /// Fails when the private key cannot be generated.
    fn new(
        mut rng: Box<dyn CryptoRngCore + Send + Sync>,
        canister: CanisterMethods,
    ) -> Result<Self, Error> {
        // The key is drawn from the same rng that is kept for later use
        let generated = RsaPrivateKey::new(&mut rng, RSA_KEY_SIZE);
        let pkey = generated.context("failed to generate rsa private key")?;

        let tracker = Self {
            rng,
            canister,
            pkey,
            cur: None,
        };

        Ok(tracker)
    }

    /// Returns the public half of the tracker's key, PKCS#1 DER encoded.
    ///
    /// # Panics
    ///
    /// Panics if the public key cannot be encoded.
    fn vec_pubkey(&self) -> Vec<u8> {
        let public = self.pkey.to_public_key();
        let der = public
            .to_pkcs1_der()
            .expect("failed to encode public-key");

        der.to_vec()
    }
}
#[async_trait]
impl Track for Tracker {
async fn track(&mut self, cb: impl Fn(Vec<u8>) + Send + Sync) -> Result<(), Error> {
// Register public-key
loop {
if self
.canister
.register
.register(&self.vec_pubkey())
.await
.is_ok()
{
break;
}
}
loop {
match self.canister.query.query().await {
// Ok means we got a new salt value
Ok(ct) => {
// Decrypt salt
let salt = match self.pkey.decrypt(
Pkcs1v15Encrypt, // padding
&ct, // ciphertext
) {
Ok(v) => v,
// Retry on failure
Err(_) => continue,
};
// Set value
self.cur = Some(salt.to_owned());
// Trigger callback
cb(salt);
}
// Leader means we're being asked to generate a salt
// and encrypt it for others
Err(QueryError::Leader(mode, pairs)) => {
let salt = match mode {
// Generate salt
LeaderMode::Bootstrap => {
let mut salt = vec![0u8; SALT_SIZE];
self.rng.fill_bytes(&mut salt);
salt
}
// Reuse existing salt
LeaderMode::Refresh => {
match &self.cur {
// Use existing salt
Some(salt) => salt.to_owned(),
// Or re-register pubkey to indicate we also need a salt
None => todo!(),
}
}
};
// Encrypt salt for each principal
let mut out = vec![];
for Pair(p, pk) in pairs {
// Parse public-key
let pubkey = match RsaPublicKey::from_pkcs1_der(&pk) {
Ok(v) => v,
// Skip invalid keys
Err(_) => continue,
};
// Encrypt salt for principal
let ct = match pubkey.encrypt(
&mut self.rng, // rng
Pkcs1v15Encrypt, // padding
&salt, // msg
) {
Ok(v) => v,
// Skip on failure
Err(_) => continue,
};
// Append to result
out.push(Pair(
p, // principal
ct, // ciphertext
));
}
// Submit encrypted salt values
let _ = self.canister.submit.submit(&out).await;
}
Err(_) => continue,
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment