Last active
May 13, 2023 01:57
-
-
Save rrbutani/beea08f1a7a5e2802008e56f401e10be to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
... | |
see the revision history for lock files, if required |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use flake |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
result* | |
.direnv | |
target | |
perf.data* | |
*.json |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![allow(unused_imports)] | |
use std::{ | |
error::Error, | |
pin::Pin, | |
task::{Context, Poll}, | |
}; | |
use futures::{SinkExt, StreamExt, TryStreamExt}; | |
use memmap2::MmapOptions; | |
use tokio::{ | |
fs, | |
io::{self, AsyncBufReadExt, AsyncReadExt, AsyncWrite}, | |
sync::mpsc, | |
}; | |
use tokio_stream::wrappers::ReceiverStream; | |
use tokio_util::{io::{ReaderStream, StreamReader, SinkWriter}, sync::PollSender}; | |
use tracing::{info, info_span, instrument}; | |
use tracing_chrome::ChromeLayerBuilder; | |
use tracing_futures::Instrument; | |
use tracing_subscriber::prelude::*; | |
#[derive(Debug)] | |
struct LineCounter<'a>(&'a mut usize); | |
impl AsyncWrite for LineCounter<'_> { | |
#[instrument] | |
fn poll_write( | |
self: Pin<&mut Self>, | |
_cx: &mut Context<'_>, | |
buf: &[u8], | |
) -> Poll<Result<usize, std::io::Error>> { | |
*self.get_mut().0 += buf.iter().filter(|&&c| c == b'\n').count(); | |
Poll::Ready(Ok(buf.len())) | |
} | |
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { | |
Poll::Ready(Ok(())) | |
} | |
fn poll_shutdown( | |
self: Pin<&mut Self>, | |
_cx: &mut Context<'_>, | |
) -> Poll<Result<(), std::io::Error>> { | |
Poll::Ready(Ok(())) | |
} | |
} | |
#[tokio::main(flavor = "multi_thread")] | |
async fn main() -> Result<(), Box<dyn Error + Sync + Send + 'static>> { | |
let _guard = if std::env::var("TRACE").is_ok() { | |
let (chrome_layer, _guard) = ChromeLayerBuilder::new().build(); | |
tracing_subscriber::registry().with(chrome_layer).init(); | |
Some(_guard) | |
} else { | |
None | |
}; | |
let input = std::env::args().nth(1).unwrap(); | |
realmain(input).await | |
} | |
#[tracing::instrument] | |
async fn realmain(input: String) -> Result<(), Box<dyn Error + Sync + Send + 'static>> { | |
let input = std::fs::File::open(input)?; | |
let mmap = unsafe { MmapOptions::new().map(&input)? }; | |
let input = &*mmap; | |
let input: &'static _ = unsafe { core::mem::transmute(input) }; // yucky, TODO: fix | |
// let input = fs::File::open(input).await?; | |
// let input = io::BufReader::with_capacity(8 * 1024, input); | |
// let input = io::BufReader::new(input); | |
// ^^^^ wrapping in a BufReader is unnecessary after mmapping. | |
let reader = async_compression::tokio::bufread::ZstdDecoder::new(input); | |
// let reader = io::BufReader::with_capacity(8 * 1024, reader); // TODO: check if this helps.. | |
// update: it does not | |
let mut line_count = 0; | |
// ~101ms | |
// stream based | |
/* | |
let mut stream = ReaderStream::new(reader).instrument(info_span!("decompress")); | |
while let Some(chunk) = stream.next().await { | |
let processing_span = info_span!("processing"); | |
let _enter = processing_span.enter(); | |
line_count += chunk?.iter().filter(|&&c| c == b'\n').count(); | |
} | |
*/ | |
// ~235ms | |
// `AsyncRead` based, "framed" | |
/* | |
let mut reader = reader; | |
let mut buf = vec![]; | |
while reader.read_until(b'\n', &mut buf).await? != 0 { | |
line_count += 1; | |
} | |
*/ | |
// ~96ms | |
// `AsyncRead` based, unframed | |
/* | |
let mut reader = reader; | |
let mut buf = [0; 8 * 1024]; | |
loop { | |
match reader.read(&mut buf).await? { | |
0 => break, | |
n => line_count += buf[0..n].iter().filter(|&&c| c == b'\n').count(), | |
} | |
} | |
*/ | |
// We _can_ invert the control flow and use the AsyncWrite version of the | |
// zstd decompressor and have it feed into our own AsyncWrite impl (via the | |
// `Sink` to `AsyncWrite` adapter, if we want) but it seems very likely this | |
// will give us more or less the exact same perf as the stream impl above. | |
// ~78ms | |
// channel based, buffer, `tokio::sync::mpsc` (assuming futures::channel will perform similarly) | |
// | |
// we're sending `Bytes` buffers which are `Arc` backed so we shouldn't be | |
// doing any real extra copies? | |
/* | |
let stream = ReaderStream::new(reader).instrument(info_span!("decompress")); | |
let (tx, rx) = mpsc::channel(8 * 1204 * 1024); | |
let (tx, rx) = (PollSender::new(tx), ReceiverStream::new(rx)); | |
tokio::spawn(stream.map(|r| Ok(r.unwrap())).forward(tx)); | |
line_count = rx | |
.instrument(info_span!("processing")) | |
.map(|b| b.iter().filter(|&&c| c == b'\n').count()) | |
.fold(line_count, |a, b| async move { a + b }) | |
.await; | |
*/ | |
// ~94ms | |
// direct; as a reference point: | |
/* | |
let mut counter = LineCounter(&mut line_count); | |
let mut reader = reader; | |
tokio::io::copy(&mut reader, &mut counter).await.unwrap(); | |
*/ | |
// ~72ms | |
// async-ringbuf | |
/**/ | |
let (prod, mut cons) = async_ringbuf::AsyncRb::new(1 * 1024 * 1024).split(); | |
// We do this so we can trace: | |
// | |
// Note that this _does_ add ~10ms of overhead! | |
// let reader = ReaderStream::new(reader).instrument(info_span!("decompress")); | |
// let reader = StreamReader::new(reader); | |
tokio::spawn(async move { | |
let mut reader = reader; | |
let mut prod = prod; | |
tokio::io::copy(&mut reader, &mut prod).await | |
}); | |
/* | |
line_count = cons /* ridiculously slow; >30 seconds */ | |
.instrument(info_span!("processing")) | |
.filter(|&c| async move { c == b'\n' }) | |
.count() | |
.await; | |
*/ | |
let mut counter = LineCounter(&mut line_count); | |
tokio::io::copy(&mut cons, &mut counter).await.unwrap(); | |
// */ | |
println!("lines: {}", line_count); | |
Ok(()) | |
} | |
// Am I missing something? | |
// | |
// There should be combinators to turn any async stream into a "fills up a | |
// ringbuffer, lets another future drain from the ringbuffer in another thread" | |
// kind of guy right? | |
// | |
// I guess the issue is that you now have two futures; i.e. you can't do ^ in | |
// a runtime agnostic manner. You need to be aware of `tokio::spawn` or | |
// whatever. | |
// Also why does `tracing` not have adapters to instrument | |
// `AsyncRead`/`AsyncWrite`? | |
// Also these are actually kind of neat: | |
// https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo/barbara_carefully_dismisses_embedded_future.html | |
// Also, I know it's still a problem for people with very large crates, but Rust | |
// compile times — even with aggressive optimizations enabled — have gotten | |
// pretty excellent. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package]
name = "playtest"
version = "0.0.0"
edition = "2021"
rust-version = "1.67"
publish = false
default-run = "sync"

[[bin]]
name = "sync"
path = "sync.rs"
required-features = ["sync"]

[[bin]]
name = "async"
path = "async.rs"
required-features = ["async"]

[dependencies]
###################### Sync ######################
bump_alloc = { version = "0.1.0", optional = true }
color-eyre = { version = "0.6", optional = true }
clap = { version = "4.2", optional = true, features = ["derive"] }
zstd = { version = "0.12", optional = true }
# Only `sync.rs` uses `ringbuf` directly, so gate it behind the `sync` feature
# like the other sync-only dependencies.
ringbuf = { version = "0.3", optional = true }
##################### Async #######################
tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread", "fs", "io-util", "sync"], optional = true }
tokio-stream = { version = "0.1", optional = true }
tokio-util = { version = "0.7", features = ["io"], optional = true }
async-compression = { version = "0.4", features = ["zstd", "tokio"], optional = true }
tracing-chrome = { version = "0.7.1", optional = true }
tracing-subscriber = { version = "0.3.17", optional = true }
tracing = { version = "0.1.37", optional = true }
tracing-futures = { version = "0.2", features = ["futures-03"], optional = true }
futures = { version = "0.3.28", optional = true }
async-ringbuf = { version = "0.1.3", features = ["impl-tokio"], optional = true }
###################################################
# Used by both binaries.
memmap2 = { version = "0.6" }

[dependencies.zstd-sys]
version = "2.0.8"
# Picking between these two does not appear to make any difference to execution
# time.
features = ["pkg-config"]
# features = ["fat-lto"]

###################################################

[features]
default = ["sync", "async"]
sync = ["bump_alloc", "clap", "color-eyre", "zstd", "ringbuf"]
async = ["tokio", "tokio-stream", "tokio-util", "async-compression", "tracing-chrome", "tracing-subscriber", "tracing", "tracing-futures", "futures", "async-ringbuf"]

[profile.release]
opt-level = 3
codegen-units = 1
# incremental = false
lto = "fat"
panic = "abort"
# debug = true
# NOTE: `rustflags` inside a profile needs the unstable `profile-rustflags`
# Cargo feature; on stable, pass flags via the `RUSTFLAGS` environment variable.
# rustflags = [ "-C", "target-cpu=sapphirerapids" ]

[profile.bench]
inherits = "release"
debug = true
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  # Quoted: bare URL literals are deprecated in the Nix language.
  inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
  inputs.flake-utils.url = "github:numtide/flake-utils";

  outputs = { flake-utils, nixpkgs, ... }: flake-utils.lib.eachDefaultSystem (sys: {
    devShells.default = let
      np = nixpkgs.legacyPackages.${sys};

      # We're doing cross-lang LTO so we need to use the same LLVM version!
      inherit (np.rustPlatform.rust.rustc) llvmPackages;
      stdenv = llvmPackages.stdenv.override (old:
        # `lld` doesn't quite work well enough yet on macOS for us to do this..
        # (`-no_uuid`, etc. aren't supported)
        np.lib.optionalAttrs (!(np.stdenv.hostPlatform.isDarwin)) {
          allowedRequisites = null;
          cc = old.cc.override {
            inherit (llvmPackages) bintools;
          };
        }
      );

      rustPlatform' = np.makeRustPlatform {
        inherit (np.rustPlatform.rust) rustc cargo;
        inherit stdenv;
      };
    in (np.mkShell.override { inherit stdenv; }) (with np; {
      nativeBuildInputs = [ pkg-config ]
        ++ [ clippy rustfmt ]
        ++ (with rustPlatform'; [ rustc cargo ])
        ++ [ rust-analyzer cargo-flamegraph hyperfine ]
      ;
      buildInputs = [ zstd ]
        ++ lib.optional stdenv.hostPlatform.isDarwin libiconv
      ;

      # shellHook = np.lib.optionalString stdenv.hostPlatform.isDarwin ''
      #   export NIX_CFLAGS_COMPILE+=" -fuse-ld=lld"
      # '';
      # RUSTFLAGS = np.lib.optionalString stdenv.hostPlatform.isDarwin "-C link-arg=-fuse-ld=lld";

      # Cargo reads extra compiler flags from `RUSTFLAGS`; the previous
      # `RUSTCFLAGS` is not a variable Cargo looks at, so `target-cpu=native`
      # was silently never applied.
      RUSTFLAGS = "-C target-cpu=native";
    });
  });
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{ | |
fs::File, | |
io::{BufRead, BufReader, BufWriter, Error, ErrorKind, Read, Write}, | |
path::PathBuf, | |
}; | |
use clap::Parser; | |
use color_eyre::Result; | |
use memmap2::MmapOptions; | |
use zstd::Decoder; | |
// Does not improve performance: | |
// #[global_allocator] | |
// static ALLOCATOR: bump_alloc::BumpAlloc = bump_alloc::BumpAlloc::new(); | |
#[derive(Debug, clap::Parser)] | |
#[command(version, about)] | |
struct Args { | |
#[arg(short, long)] | |
input: PathBuf, | |
} | |
fn main() -> Result<()> { | |
color_eyre::install()?; | |
let args = Args::parse(); | |
let input = File::open(args.input)?; | |
// let reader = BufReader::with_capacity(8 * 1024, input); | |
let mmap = unsafe { MmapOptions::new().map(&input)? }; | |
let reader = &*mmap; | |
let reader = Decoder::with_buffer(reader)?; | |
let mut reader = BufReader::with_capacity(8 * 1024, reader); | |
// ^ adding the outer buf reader improves perf considerably | |
// | |
// ``` | |
// ❯ time zstdcat base-json/nixpkgs-dump.json.zst | wc -l | |
// 10253321 | |
// | |
// real 0m0.207s | |
// user 0m0.201s | |
// sys 0m0.200s | |
// ``` | |
// | |
// Slow, has to check and construct strings: | |
// println!("lines: {}", reader.lines().count()); | |
// Even slower: | |
// let lines = reader.bytes().filter(|b| matches!(b, Ok(b'\n'))).count(); | |
// println!("lines: {lines}"); | |
// Close: | |
// ``` | |
// ❯ hyperfine "zstdcat --single-thread base-json/nixpkgs-dump.json.zst | wc -l" "target/release/main --input base-json/nixpkgs-dump.json.zst" | |
// Benchmark 1: zstdcat --single-thread base-json/nixpkgs-dump.json.zst | wc -l | |
// Time (mean ± σ): 359.5 ms ± 58.9 ms [User: 300.1 ms, System: 220.9 ms] | |
// Range (min … max): 265.7 ms … 463.6 ms 10 runs | |
// | |
// Benchmark 2: target/release/main --input base-json/nixpkgs-dump.json.zst | |
// Time (mean ± σ): 425.4 ms ± 25.7 ms [User: 416.8 ms, System: 8.1 ms] | |
// Range (min … max): 396.4 ms … 474.8 ms 10 runs | |
// ``` | |
/* | |
let mut lines = 0; | |
while let Ok(buf) = reader.fill_buf() { | |
let read = buf.len(); | |
lines += buf.iter().filter(|&&c| c == b'\n').count(); | |
if read == 0 { break } | |
reader.consume(read); | |
} | |
println!("lines: {lines}"); | |
*/ | |
let mut lines = 0; | |
pub struct LineCounter<'a>(&'a mut usize); | |
impl<'a> Write for LineCounter<'a> { | |
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { | |
*self.0 += buf.iter().filter(|&&c| c == b'\n').count(); | |
Ok(buf.len()) | |
} | |
fn flush(&mut self) -> std::io::Result<()> { | |
Ok(()) | |
} | |
} | |
let mut counter = LineCounter(&mut lines); | |
// ~240ms compared to ~123ms | |
/* | |
std::io::copy(&mut reader, &mut counter).unwrap(); | |
*/ | |
// ~310ms; extra copy + realloc makes perf worse presumably | |
/* | |
let mut counter = BufWriter::new(counter); | |
std::io::copy(&mut reader, &mut counter).unwrap(); | |
drop(counter); | |
*/ | |
// with `-C target-cpu=native`: | |
// | |
// emits AVX2 instructions; gets to within spitting distance of `zstd` + | |
// `wc` | |
// std::io::copy(&mut reader, &mut counter).unwrap(); | |
// as a baseline; this is ~95ms | |
// same as having `write` here do nothing | |
/* | |
struct Count<'a>(&'a mut usize); | |
impl Write for Count<'_> { | |
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { | |
*self.0 += buf.len(); | |
Ok(buf.len()) | |
} | |
fn flush(&mut self) -> std::io::Result<()> { | |
Ok(()) | |
} | |
} | |
std::io::copy(&mut reader, &mut Count(&mut lines)).unwrap(); | |
*/ | |
// channel based approach | |
// ... | |
// | |
// actually nah, I don't really want to write this | |
// /* | |
let rb = ringbuf::HeapRb::new(1024 * 1024 * 1); | |
let (mut prod, mut cons) = rb.split(); | |
let (end_tx, end_rx) = std::sync::mpsc::channel::<()>(); | |
std::thread::scope(|s| { | |
s.spawn(move || { | |
let mut done = false; | |
loop { | |
let copied = cons.write_into(&mut counter, None).unwrap(); | |
if copied == 0 { | |
if done && cons.is_empty() { | |
break; | |
} | |
if let Ok(()) = end_rx.recv_timeout(std::time::Duration::from_micros(0)) { | |
done = true; | |
} | |
} | |
} | |
}); | |
loop { | |
/* | |
match std::io::copy(&mut reader, &mut prod) { | |
Ok(_) => break, | |
Err(e) if e.kind() == ErrorKind::WouldBlock => { | |
eprintln!("{e:?}"); | |
}, | |
e => { e.unwrap(); }, | |
} | |
*/ | |
// We'd like to use `io::copy` but unfortunately our ring buffer | |
// does not _block_ when full but instead returns `WouldBlock`... | |
// | |
// `io::copy` does not handle this gracefully; it goes and tosses | |
// what it's pulled out of the reader when this happens. This makes | |
// us lose data. | |
// | |
// So, we write the `io::copy` loop ourselves in a worse manner. | |
// | |
// This also wastes CPU; when the buffer is full we'd like to put | |
// our thread to sleep until it has capacity. | |
// Also, as an aside, it's pretty wild that zstd can overwhelm the | |
// line counting consumer here; the consumer gets vectorized and | |
// should be fast: https://rust.godbolt.org/z/963Tsz65s | |
// | |
// (only if you build with `-C target-cpu=...` though; need AVX2!) | |
if !prod.is_full() { | |
let buf = reader.fill_buf().unwrap(); | |
if buf.is_empty() { | |
break; | |
} | |
let buf = &buf[0..(prod.free_len().min(buf.len()))]; | |
let written = prod.write(buf).unwrap(); | |
reader.consume(written); | |
} | |
} | |
prod.flush().unwrap(); | |
end_tx.send(()).unwrap() | |
}); | |
// */ | |
println!("lines: {lines}"); | |
// assert_eq!(lines, 10253321); | |
// println!("{} packages", packages.0.len()); | |
Ok(()) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Show hidden characters
{ | |
"folders": [ | |
{ | |
"path": "." | |
} | |
], | |
"settings": { | |
"rust-analyzer.server.path": "rust-analyzer" | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment