Skip to content

Instantly share code, notes, and snippets.

@hgomersall
Last active May 29, 2025 17:17
Show Gist options
  • Save hgomersall/8ce00dbc781149dca4641663cd193435 to your computer and use it in GitHub Desktop.
Save hgomersall/8ce00dbc781149dca4641663cd193435 to your computer and use it in GitHub Desktop.
Example of how I'm showing what I put on the wire
#[derive(Debug)]
struct MonitoredWriter<W: AsyncWriteExt + Unpin> {
stream: W,
id_string: String,
held_buffer: Vec<u8>,
write_flag: AtomicBool,
}
impl<W: AsyncWriteExt + Unpin> MonitoredWriter<W> {
fn new(stream: W, id_string: String) -> MonitoredWriter<W> {
MonitoredWriter { stream, id_string, held_buffer: Vec::new(), write_flag: AtomicBool::new(false) }
}
#[instrument(name = "monitored_write_all_buf", level = "trace", skip_all)]
async fn write_all_buf<B: Buf>(&mut self, buf: &mut B) -> IoResult<()> {
if self.write_flag.swap(true, Ordering::SeqCst) {
panic!("The previous call did not complete as expected");
}
compiler_fence(Ordering::SeqCst);
let buf_remaining = buf.remaining();
let buf_chunk = buf.chunk();
if buf_chunk.len() != buf_remaining {
panic!("Can only deal with whole chunks");
}
self.held_buffer.extend_from_slice(buf_chunk);
let res = self.stream.write_all_buf(buf).await;
let remaining_now = buf.remaining();
if remaining_now > 0 {
self.held_buffer.truncate(self.held_buffer.len() - remaining_now);
}
compiler_fence(Ordering::SeqCst);
self.write_flag.store(false, Ordering::SeqCst);
res
}
}
impl<W: AsyncWriteExt + Unpin> Drop for MonitoredWriter<W> {
fn drop(&mut self) {
if self.write_flag.load(Ordering::SeqCst) {
warn!("the write_all_buf was cancelled and the buffer was not completed properly");
} else {
info!("Buffer ok, writing out to file.");
}
if self.held_buffer.len() > 0 {
let filename = "sent_magpie.data".to_owned();
std::fs::write(&filename, &self.held_buffer).unwrap();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment