Skip to content

Instantly share code, notes, and snippets.

@grampelberg
Created February 1, 2026 22:50
Show Gist options
  • Select an option

  • Save grampelberg/d8bb58ad81f34076f0a705452bedcd8f to your computer and use it in GitHub Desktop.

Select an option

Save grampelberg/d8bb58ad81f34076f0a705452bedcd8f to your computer and use it in GitHub Desktop.
// Writer event loop: keeps at most one connection to the reader and forwards
// each record received on `self.rx` over its own unidirectional stream.
// Leaves the loop with an error when the channel closes or a send step fails.
loop {
    tokio::select! {
        // Reconnect attempts only run while disconnected; `connect_retry`
        // presumably is an interval that paces them — confirm at its definition.
        _ = connect_retry.tick(), if connection.is_none() => {
            match endpoint.connect(self.reader_addr.clone(), reader::Sink::ALPN).await {
                Ok(conn) => {
                    // Mirror the disconnect metrics below so the
                    // `writer.connected` gauge reflects the current state.
                    metrics::counter!("writer.connect").increment(1);
                    metrics::gauge!("writer.connected").set(1);
                    connection = Some(conn);
                }
                Err(err) => {
                    // Keep the cause in the log; the next tick retries.
                    writer_log!(self, info, err = ?err, "connection failed");
                    metrics::counter!("writer.connect_failed").increment(1);
                }
            }
        }
        // `select!` still evaluates this branch's future expression when the
        // guard is false, so the `None` case must be handled even though the
        // branch is disabled while disconnected.
        err = async {
            match connection.as_ref() {
                Some(conn) => Some(conn.closed().await),
                None => None,
            }
        }, if connection.is_some() => {
            writer_log!(self, info, err = ?err, "connection closed");
            metrics::counter!("writer.disconnect").increment(1);
            metrics::gauge!("writer.connected").set(0);
            // Drop the dead connection. Without this the guard above stays
            // true, this branch fires again on every iteration, and the
            // reconnect branch (guarded by `is_none()`) can never run.
            connection = None;
        }
        val = self.rx.recv() => {
            // `None` means every sender is gone; nothing more will arrive.
            let Some(record) = val else {
                return Err(eyre!("channel closed"));
            };
            // Records that arrive while disconnected are dropped, not queued.
            let Some(conn) = connection.as_ref() else {
                metrics::counter!("writer.skipped").increment(1);
                continue;
            };
            // NOTE(review): any failure below propagates out of the loop via
            // `?` instead of triggering a reconnect — confirm that is intended.
            let mut sender = conn.open_uni().await?;
            sender.write_all(&record.encode()?).await?;
            sender.finish()?;
            metrics::counter!("writer.sent").increment(1);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment