Last active
February 7, 2021 02:15
-
-
Save benkay86/d0261cfff0c15076066efc02f46be2ef to your computer and use it in GitHub Desktop.
Write an output file in comma-separated value (CSV) format asynchronously using Rust and tokio.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! Module for CSVOutputWriter, which writes a single unit of data to an output
//! sink using the comma separated value format.
use tokio::io::{AsyncWrite, AsyncWriteExt}; | |
use std::ops::Deref; | |
use tokio::sync::Mutex; | |
/// Type alias for a type that implementes AsyncWrite and is synchronized by a | |
/// mutex. For example: | |
/// | |
/// * `Mutex<tokio::fs::File>` | |
/// * `Mutex<Vec<u8>>` | |
/// | |
/// Note that the mutex is [`tokio::sync::Mutex`] rather than `std::sync::Mutex` | |
/// because we need to hold a lock on the mutex across `await` points. | |
pub type SynchronizedWrite = Mutex<dyn AsyncWrite + Send + Unpin>; | |
/// Synchronize access to a comma separated value (CSV) file. Aynchronously | |
/// writes out lines to the file. Example: | |
/// | |
/// ```no_run | |
/// use tokio::sync::Mutex; | |
/// use tokio::fs::File; | |
/// use output_writer::SynchronizedWrite; | |
/// # fn main() -> Result<(), std::error::Error> { | |
/// let writer: Box<SynchronizedWrite> = Box::new(Mutex::new(File::create("out.csv").await?)); | |
/// let writer = CSVOutputWriter::new(writer).build(); | |
/// let task = tokio::spawn(async move{ | |
/// // Output: value1,value2,value3 | |
/// writer.write(&["value1", "value2", "value3"]).await | |
/// }); | |
/// task.await??; | |
/// # Ok(()) | |
/// # } | |
/// ``` | |
pub struct CSVOutputWriter<T: Deref<Target = SynchronizedWrite>> | |
{ | |
/// A trait object that implements AsyncWrite and is guarded by a | |
/// [`tokio::sync::Mutex']. For example: | |
/// | |
/// * `Box<Mutex<tokio::fs::File>>` | |
/// * `Arc<Mutex<Vec<u8>>>` | |
pub writer: T, | |
/// Delimeter placed around each value, e.g. `"` --> `"val"`. | |
pub delim: String, | |
/// Line terminator, written at the end of each `write_line()`, e.g. `"\n"`. | |
pub newline: String, | |
/// Separator between values, e.g. `,` --> `val1,val2`. | |
pub sep: String, | |
} | |
impl<T: Deref<Target = SynchronizedWrite>> CSVOutputWriter<T> | |
{ | |
/// Consumes an iterator `line`, writing all the items to a single line, | |
/// e.g. | |
/// | |
/// ```no_run | |
/// # use tokio::sync::Mutex; | |
/// # use tokio::fs::File; | |
/// # use output_writer::SynchronizedWrite; | |
/// # fn main() -> Result<(), std::error::Error> { | |
/// let writer: Box<SynchronizedWrite> = Box::new(Mutex::new(File::create("out.csv").await?)).build(); | |
/// let writer = CSVOutputWriter::new(writer).build(); | |
/// writer.write(&["1", "2", "3"]).await?; | |
/// writer.write(&["a", "b", "c"]).await?; | |
/// // out.csv contains: | |
/// // 1,2,3 | |
/// // a,b,c | |
/// # Ok(()) | |
/// # } | |
/// ``` | |
pub async fn write_line<U, V>(&self, line: U) -> std::io::Result<()> where | |
U: IntoIterator<Item = V>, | |
V: AsRef<str> | |
{ | |
// Prepare this line of the CSV file for writing. | |
let line = line.into_iter() | |
.map(|s| [&self.delim, s.as_ref(), &self.delim].concat()) | |
.collect::<Vec<String>>() | |
.join(&self.sep) | |
+ &self.newline; | |
// Lock mutex for writer. | |
// Will only panic if the mutex is poisoned. | |
let mut writer = self.writer.lock().await; | |
// Write out the line. | |
writer.write_all(line.as_bytes()).await | |
// Lock automatically released here. | |
} | |
/// Flushes the writer, see [`tokio::io::AsyncWriteExt::flush()`]. | |
pub async fn flush(&self) -> std::io::Result<()> { | |
// Lock, flush, automatically unlock. | |
self.writer.lock().await.flush().await | |
} | |
} | |
/// Builder pattern for constructing a CSVOutputWriter, e.g. | |
/// | |
/// ```no_run | |
/// use tokio::sync::Mutex; | |
/// use tokio::fs::File; | |
/// use output_writer::SynchronizedWrite; | |
/// # fn main() -> Result<(), std::error::Error> { | |
/// let writer: Box<SynchronizedWrite> = Box::new(Mutex::new(File::create("out.csv").await?)).build(); | |
/// let writer = CSVOutputWriterBuilder::new(writer).sep(", ").build(); | |
/// # Ok(()) | |
/// # } | |
/// ``` | |
pub struct CSVOutputWriterBuilder<T: Deref<Target = SynchronizedWrite>> | |
{ | |
writer_: T, | |
delim_: String, | |
newline_: String, | |
sep_: String, | |
} | |
impl<T> std::convert::From<CSVOutputWriterBuilder<T>> | |
for CSVOutputWriter<T> | |
where T: Deref<Target = SynchronizedWrite> | |
{ | |
fn from(builder: CSVOutputWriterBuilder<T>) -> CSVOutputWriter<T> { | |
CSVOutputWriter { | |
writer: builder.writer_, | |
delim: builder.delim_, | |
newline: builder.newline_, | |
sep: builder.sep_, | |
} | |
} | |
} | |
impl<T: Deref<Target = SynchronizedWrite>> CSVOutputWriterBuilder<T> { | |
/// Creates a `CSVOutputWriterBuilder` from the given `writer` using default | |
/// options: | |
/// | |
/// * `delim: ""` | |
/// * `newline: "\n"` | |
/// * `sep: ","` | |
pub fn new(writer: T) -> Self { | |
CSVOutputWriterBuilder { | |
writer_: writer, // no default, must provide writer | |
delim_: "".into(), // default: empty string | |
newline_: "\n".into(), // default: newline character | |
sep_: ",".into(), // default: comma | |
} | |
} | |
/// Set the delimiter, default `","`. | |
pub fn delim<S: Into<String>>(self, delim: S) -> Self { | |
CSVOutputWriterBuilder { | |
writer_: self.writer_, | |
delim_: delim.into(), | |
newline_: self.newline_, | |
sep_: self.sep_, | |
} | |
} | |
/// Set the line terminator, default '"\n"`. | |
pub fn newline<S: Into<String>>(self, newline: S) -> Self { | |
CSVOutputWriterBuilder { | |
writer_: self.writer_, | |
delim_: self.delim_, | |
newline_: newline.into(), | |
sep_: self.sep_, | |
} | |
} | |
/// Set the separator, default `","`. | |
pub fn sep<S: Into<String>>(self, sep: S) -> Self { | |
CSVOutputWriterBuilder { | |
writer_: self.writer_, | |
delim_: self.delim_, | |
newline_: self.newline_, | |
sep_: sep.into(), | |
} | |
} | |
/// Convert this builder into a `CSVOutputWriter`. | |
pub fn build(self) -> CSVOutputWriter<T> { | |
self.into() | |
} | |
} | |
impl<T: Deref<Target = SynchronizedWrite>> CSVOutputWriter<T> { | |
/// Synonym for [`CSVOutputWriterBuilder::new()`]. | |
pub fn new(writer: T) -> CSVOutputWriterBuilder<T> { | |
CSVOutputWriterBuilder::new(writer) | |
} | |
} | |
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Locks `sink` and returns everything written to it so far as text.
    async fn contents(sink: &Mutex<Vec<u8>>) -> String {
        let bytes = sink.lock().await;
        std::str::from_utf8(&bytes)
            .expect("writer output should be valid UTF-8")
            .to_owned()
    }

    // Test manual creation of CSVOutputWriter.
    #[tokio::test]
    async fn test_write() {
        let sink = Arc::new(Mutex::new(Vec::<u8>::new()));
        {
            let writer: CSVOutputWriter<Arc<SynchronizedWrite>> = CSVOutputWriter {
                writer: sink.clone() as Arc<SynchronizedWrite>,
                delim: "".into(),
                newline: "\n".into(),
                sep: ", ".into(),
            };
            writer.write_line(&["1", "2", "3"]).await.unwrap();
        }
        assert_eq!(contents(&sink).await, "1, 2, 3\n");
    }

    // Test flush function.
    #[tokio::test]
    async fn test_flush() {
        let sink = Arc::new(Mutex::new(Vec::<u8>::new()));
        let writer: CSVOutputWriter<Arc<SynchronizedWrite>> = CSVOutputWriter {
            writer: sink.clone() as Arc<SynchronizedWrite>,
            delim: "\"".into(),
            newline: "\n".into(),
            sep: ",".into(),
        };
        writer.write_line(&["1", "2", "3"]).await.unwrap();
        writer.flush().await.unwrap();
        assert_eq!(contents(&sink).await, "\"1\",\"2\",\"3\"\n");
    }

    // Test sending writer to another thread.
    #[tokio::test]
    async fn test_write_threaded() {
        let sink = Arc::new(Mutex::new(Vec::<u8>::new()));
        {
            let writer: Arc<CSVOutputWriter<Arc<SynchronizedWrite>>> =
                Arc::new(CSVOutputWriter {
                    writer: sink.clone() as Arc<SynchronizedWrite>,
                    delim: "".into(),
                    newline: "\n".into(),
                    sep: ",".into(),
                });
            // Send a clone of the writer to another thread/task.
            let task: tokio::task::JoinHandle<std::io::Result<()>> = {
                let writer = Arc::clone(&writer);
                tokio::spawn(async move {
                    writer.write_line(&["1", "2", "3"]).await?;
                    writer.flush().await
                })
            };
            // Wait for task to finish writing.
            task.await.unwrap().unwrap();
            // Write some more on this thread.
            writer.write_line(&["4", "5", "6"]).await.unwrap();
        }
        assert_eq!(contents(&sink).await, "1,2,3\n4,5,6\n");
    }

    // Test builder pattern.
    #[tokio::test]
    async fn test_builder() {
        let sink = Arc::new(Mutex::new(Vec::<u8>::new()));
        {
            let sink: Arc<SynchronizedWrite> = sink.clone();
            let writer = CSVOutputWriter::new(sink)
                .delim("'")
                .newline("\n")
                .sep("\t")
                .build();
            writer.write_line(&["1", "2", "3"]).await.unwrap();
        }
        assert_eq!(contents(&sink).await, "'1'\t'2'\t'3'\n");
    }

    // Test builder pattern with defaults.
    #[tokio::test]
    async fn test_builder_defaults() {
        let sink = Arc::new(Mutex::new(Vec::<u8>::new()));
        {
            let sink: Arc<SynchronizedWrite> = sink.clone();
            let writer = CSVOutputWriter::new(sink).build();
            writer.write_line(&["1", "2", "3"]).await.unwrap();
        }
        assert_eq!(contents(&sink).await, "1,2,3\n");
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment