Skip to content

Instantly share code, notes, and snippets.

@arjunsk
Created May 1, 2022 19:28
Show Gist options
  • Save arjunsk/24437a90968e0c21d1b72612700b6ba3 to your computer and use it in GitHub Desktop.
Save arjunsk/24437a90968e0c21d1b72612700b6ba3 to your computer and use it in GitHub Desktop.
/// Write a single `Frame` value to the underlying stream.
///
/// The `Frame` value is written to the socket using the various `write_*`
/// functions provided by `AsyncWrite`. Calling these functions directly on
/// a `TcpStream` is **not** advised, as this will result in a large number of
/// syscalls. However, it is fine to call these functions on a *buffered*
/// write stream. The data will be written to the buffer. Once the buffer is
/// full, it is flushed to the underlying socket.
pub async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> {
// Arrays are encoded by encoding each entry. All other frame types are
// considered literals. For now, mini-redis is not able to encode
// recursive frame structures. See below for more details.
match frame {
Frame::Array(val) => {
// Encode the frame type prefix. For an array, it is `*`.
self.stream.write_u8(b'*').await?;
// Encode the length of the array.
self.write_decimal(val.len() as u64).await?;
// Iterate and encode each entry in the array.
for entry in &**val {
self.write_value(entry).await?;
}
}
// The frame type is a literal. Encode the value directly.
_ => self.write_value(frame).await?,
}
// Ensure the encoded frame is written to the socket. The calls above
// are to the buffered stream and writes. Calling `flush` writes the
// remaining contents of the buffer to the socket.
self.stream.flush().await
}
/// Write a frame literal to the stream
async fn write_value(&mut self, frame: &Frame) -> io::Result<()> {
match frame {
Frame::Simple(val) => {
self.stream.write_u8(b'+').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Error(val) => {
self.stream.write_u8(b'-').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Integer(val) => {
self.stream.write_u8(b':').await?;
self.write_decimal(*val).await?;
}
Frame::Null => {
self.stream.write_all(b"$-1\r\n").await?;
}
Frame::Bulk(val) => {
let len = val.len();
self.stream.write_u8(b'$').await?;
self.write_decimal(len as u64).await?;
self.stream.write_all(val).await?;
self.stream.write_all(b"\r\n").await?;
}
// Encoding an `Array` from within a value cannot be done using a
// recursive strategy. In general, async fns do not support
// recursion. Mini-redis has not needed to encode nested arrays yet,
// so for now it is skipped.
Frame::Array(_val) => unreachable!(),
}
Ok(())
}
/// Write a decimal frame to the stream
async fn write_decimal(&mut self, val: u64) -> io::Result<()> {
use std::io::Write;
// Convert the value to a string
let mut buf = [0u8; 20];
let mut buf = Cursor::new(&mut buf[..]);
write!(&mut buf, "{}", val)?;
let pos = buf.position() as usize;
self.stream.write_all(&buf.get_ref()[..pos]).await?;
self.stream.write_all(b"\r\n").await?;
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment