Skip to content

Instantly share code, notes, and snippets.

@rrbutani
Last active May 13, 2023 01:57
Show Gist options
  • Save rrbutani/beea08f1a7a5e2802008e56f401e10be to your computer and use it in GitHub Desktop.
Save rrbutani/beea08f1a7a5e2802008e56f401e10be to your computer and use it in GitHub Desktop.
...
see the revision history for lock files, if required
use flake
result*
.direnv
target
perf.data*
*.json
#![allow(unused_imports)]
use std::{
error::Error,
pin::Pin,
task::{Context, Poll},
};
use futures::{SinkExt, StreamExt, TryStreamExt};
use memmap2::MmapOptions;
use tokio::{
fs,
io::{self, AsyncBufReadExt, AsyncReadExt, AsyncWrite},
sync::mpsc,
};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::{io::{ReaderStream, StreamReader, SinkWriter}, sync::PollSender};
use tracing::{info, info_span, instrument};
use tracing_chrome::ChromeLayerBuilder;
use tracing_futures::Instrument;
use tracing_subscriber::prelude::*;
/// Write sink that discards everything it is given and only tallies the
/// newline bytes (`b'\n'`) it has seen into the borrowed counter.
#[derive(Debug)]
struct LineCounter<'a>(&'a mut usize);

impl AsyncWrite for LineCounter<'_> {
    /// Adds the number of `b'\n'` bytes in `buf` to the counter and reports
    /// the whole buffer as written. Always ready; never fails.
    #[instrument]
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        let newlines = buf.iter().filter(|&&byte| byte == b'\n').count();
        let this = self.get_mut();
        *this.0 += newlines;
        Poll::Ready(Ok(buf.len()))
    }

    /// Nothing is buffered, so flushing completes immediately.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    /// There is nothing to tear down, so shutdown completes immediately.
    fn poll_shutdown(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        Poll::Ready(Ok(()))
    }
}
/// Entry point of the async line counter.
///
/// When the `TRACE` environment variable is set, a `tracing_chrome` layer is
/// installed before any work starts. The first command-line argument is the
/// path of the input file and is handed to [`realmain`].
///
/// # Errors
///
/// Returns an error when the input path argument is missing, or whatever
/// [`realmain`] returns.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn Error + Sync + Send + 'static>> {
    // The guard returned by the builder is bound here so that it is held
    // until `main` returns rather than being dropped right after setup.
    let _guard = if std::env::var("TRACE").is_ok() {
        let (chrome_layer, guard) = ChromeLayerBuilder::new().build();
        tracing_subscriber::registry().with(chrome_layer).init();
        Some(guard)
    } else {
        None
    };

    // A missing argument is a usage mistake, not a bug: report it through the
    // normal error path instead of panicking.
    let input = std::env::args()
        .nth(1)
        .ok_or("usage: async <path to zstd-compressed input>")?;
    realmain(input).await
}
/// Memory-maps the file at `input`, decompresses it as zstd on a spawned task
/// that feeds a ring buffer, counts the newline bytes drained from that ring
/// buffer, and prints the total as `lines: N`.
///
/// The commented-out sections are alternative pipelines that were benchmarked
/// against each other; the timings next to them are the author's measurements.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or mapped, if draining the
/// ring buffer fails, if the decompression task panics or is cancelled, or if
/// decompression itself fails.
#[tracing::instrument]
async fn realmain(input: String) -> Result<(), Box<dyn Error + Sync + Send + 'static>> {
    let input = std::fs::File::open(input)?;
    // SAFETY: the mapping is only ever read. NOTE(review): as with any
    // file-backed mapping this assumes the file is not truncated or rewritten
    // while we run — nothing here enforces that.
    let mmap = unsafe { MmapOptions::new().map(&input)? };
    let input = &*mmap;
    // SAFETY: the `'static` lifetime is a lie that `tokio::spawn` forces on
    // us. It is upheld manually: `mmap` lives until this function returns, and
    // the spawned task — the only place this slice escapes to — is always
    // joined (after being aborted, if need be) before we return. This relies
    // on this future being driven to completion rather than dropped midway.
    let input: &'static _ = unsafe { core::mem::transmute(input) }; // yucky, TODO: fix

    // let input = fs::File::open(input).await?;
    // let input = io::BufReader::with_capacity(8 * 1024, input);
    // let input = io::BufReader::new(input);
    // ^^^^ wrapping in a BufReader is unnecessary after mmapping.

    let reader = async_compression::tokio::bufread::ZstdDecoder::new(input);
    // let reader = io::BufReader::with_capacity(8 * 1024, reader); // TODO: check if this helps..
    // update: it does not

    let mut line_count = 0;

    // ~101ms
    // stream based
    /*
    let mut stream = ReaderStream::new(reader).instrument(info_span!("decompress"));
    while let Some(chunk) = stream.next().await {
        let processing_span = info_span!("processing");
        let _enter = processing_span.enter();
        line_count += chunk?.iter().filter(|&&c| c == b'\n').count();
    }
    */

    // ~235ms
    // `AsyncRead` based, "framed"
    /*
    let mut reader = reader;
    let mut buf = vec![];
    while reader.read_until(b'\n', &mut buf).await? != 0 {
        line_count += 1;
    }
    */

    // ~96ms
    // `AsyncRead` based, unframed
    /*
    let mut reader = reader;
    let mut buf = [0; 8 * 1024];
    loop {
        match reader.read(&mut buf).await? {
            0 => break,
            n => line_count += buf[0..n].iter().filter(|&&c| c == b'\n').count(),
        }
    }
    */

    // We _can_ invert the control flow and use the AsyncWrite version of the
    // zstd decompressor and have it feed into our own AsyncWrite impl (via the
    // `Sink` to `AsyncWrite` adapter, if we want) but it seems very likely this
    // will give us more or less the exact same perf as the stream impl above.

    // ~78ms
    // channel based, buffer, `tokio::sync::mpsc` (assuming futures::channel will perform similarly)
    //
    // we're sending `Bytes` buffers which are `Arc` backed so we shouldn't be
    // doing any real extra copies?
    /*
    let stream = ReaderStream::new(reader).instrument(info_span!("decompress"));
    let (tx, rx) = mpsc::channel(8 * 1204 * 1024);
    let (tx, rx) = (PollSender::new(tx), ReceiverStream::new(rx));
    tokio::spawn(stream.map(|r| Ok(r.unwrap())).forward(tx));
    line_count = rx
        .instrument(info_span!("processing"))
        .map(|b| b.iter().filter(|&&c| c == b'\n').count())
        .fold(line_count, |a, b| async move { a + b })
        .await;
    */

    // ~94ms
    // direct; as a reference point:
    /*
    let mut counter = LineCounter(&mut line_count);
    let mut reader = reader;
    tokio::io::copy(&mut reader, &mut counter).await.unwrap();
    */

    // ~72ms
    // async-ringbuf
    /**/
    let (prod, mut cons) = async_ringbuf::AsyncRb::new(1 * 1024 * 1024).split();

    // We do this so we can trace:
    //
    // Note that this _does_ add ~10ms of overhead!
    // let reader = ReaderStream::new(reader).instrument(info_span!("decompress"));
    // let reader = StreamReader::new(reader);

    // Producer side: decompress into the ring buffer on its own task. The
    // handle is kept so that decompression errors are reported and so that the
    // task is known to be finished before `mmap` goes away.
    let decompress = tokio::spawn(async move {
        let mut reader = reader;
        let mut prod = prod;
        tokio::io::copy(&mut reader, &mut prod).await
    });

    /*
    line_count = cons /* ridiculously slow; >30 seconds */
        .instrument(info_span!("processing"))
        .filter(|&c| async move { c == b'\n' })
        .count()
        .await;
    */

    // Consumer side: drain the ring buffer into the newline counter.
    let mut counter = LineCounter(&mut line_count);
    let drained = tokio::io::copy(&mut cons, &mut counter).await;
    if drained.is_err() {
        // Nobody will drain the buffer any more; stop the producer so the
        // join below cannot wait on it indefinitely.
        decompress.abort();
    }
    // Always wait for the producer before returning (see the SAFETY note on
    // the transmute above), then surface the consumer error first.
    let decompressed = decompress.await;
    drained?;
    decompressed??;
    // */

    println!("lines: {}", line_count);
    Ok(())
}
// Am I missing something?
//
// There should be combinators to turn any async stream into a "fills up a
// ringbuffer, lets another future drain from the ringbuffer in another thread"
// kind of guy right?
//
// I guess the issue is that you now have two futures; i.e. you can't do ^ in
// a runtime agnostic manner. You need to be aware of `tokio::spawn` or
// whatever.
// Also why does `tracing` not have adapters to instrument
// `AsyncRead`/`AsyncWrite`?
// Also these are actually kind of neat:
// https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo/barbara_carefully_dismisses_embedded_future.html
// Also I know it's still a problem for people with very large crates but, rust
// compile times — even with aggressive optimizations enabled — have gotten
// pretty excellent.
[package]
name = "playtest"
version = "0.0.0"
edition = "2021"
rust-version = "1.67"
publish = false
default-run = "sync"
[[bin]]
name = "sync"
path = "sync.rs"
required-features = ["sync"]
[[bin]]
name = "async"
path = "async.rs"
required-features = ["async"]
[dependencies]
###################### Sync ######################
bump_alloc = { version = "0.1.0", optional = true }
color-eyre = { version = "0.6", optional = true }
clap = { version = "4.2", optional = true, features = ["derive"] }
zstd = { version = "0.12", optional = true }
ringbuf = "0.3"
##################### Async #######################
tokio = { version = "1.28.1", features = ["macros", "rt-multi-thread", "fs", "io-util", "sync"], optional = true }
tokio-stream = { version = "0.1", optional = true }
tokio-util = { version = "0.7", features = ["io"], optional = true }
async-compression = { version = "0.4", features = ["zstd", "tokio"], optional = true }
tracing-chrome = { version = "0.7.1", optional = true }
tracing-subscriber = { version = "0.3.17", optional = true }
tracing = { version = "0.1.37", optional = true }
tracing-futures = { version = "0.2", features = ["futures-03"], optional = true }
futures = { version = "0.3.28", optional = true }
async-ringbuf = { version = "0.1.3", features = ["impl-tokio"], optional = true }
###################################################
memmap2 = { version = "0.6" }
[dependencies.zstd-sys]
version = "2.0.8"
# Picking between these two does not appear to make any difference to execution
# time.
features = ["pkg-config"]
# features = ["fat-lto"]
###################################################
[features]
default = ["sync", "async"]
sync = ["bump_alloc", "clap", "color-eyre", "zstd"]
async = ["tokio", "tokio-stream", "tokio-util", "async-compression", "tracing-chrome", "tracing-subscriber", "tracing", "tracing-futures", "futures", "async-ringbuf"]
# Release builds trade build time for run time: highest optimization level, a
# single codegen unit, fat LTO, and aborting instead of unwinding on panic.
[profile.release]
opt-level = 3
codegen-units = 1
# incremental = false
lto = "fat"
panic = "abort"
# debug = true
# rustflags = [ "-C", "target-cpu=sapphirerapids" ]
# Bench builds reuse the release settings and additionally keep debug info
# (presumably so profilers can symbolize the optimized binary).
[profile.bench]
inherits = "release"
debug = true
# Development shell for the benchmark binaries: a Rust toolchain built against
# the same LLVM as the C toolchain, plus zstd and profiling/benchmark tools.
{
  inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
  inputs.flake-utils.url = "github:numtide/flake-utils";

  outputs = { flake-utils, nixpkgs, ... }: flake-utils.lib.eachDefaultSystem (sys: {
    devShells.default = let
      np = nixpkgs.legacyPackages.${sys};

      # We're doing cross-lang LTO so we need to use the same LLVM version!
      inherit (np.rustPlatform.rust.rustc) llvmPackages;
      stdenv = llvmPackages.stdenv.override (old:
        # `lld` doesn't quite work well enough yet on macOS for us to do this..
        # (`-no_uuid`, etc. aren't supported)
        np.lib.optionalAttrs (!(np.stdenv.hostPlatform.isDarwin)) {
          allowedRequisites = null;
          cc = old.cc.override {
            inherit (llvmPackages) bintools;
          };
        }
      );

      rustPlatform' = np.makeRustPlatform {
        inherit (np.rustPlatform.rust) rustc cargo;
        inherit stdenv;
      };
    in (np.mkShell.override { inherit stdenv; }) (with np; {
      nativeBuildInputs = [ pkg-config ]
        ++ [ clippy rustfmt ]
        ++ (with rustPlatform'; [ rustc cargo ])
        ++ [ rust-analyzer cargo-flamegraph hyperfine ]
      ;

      buildInputs = [ zstd ]
        ++ lib.optional stdenv.hostPlatform.isDarwin libiconv
      ;

      # shellHook = np.lib.optionalString stdenv.hostPlatform.isDarwin ''
      #   export NIX_CFLAGS_COMPILE+=" -fuse-ld=lld"
      # '';
      # RUSTFLAGS = np.lib.optionalString stdenv.hostPlatform.isDarwin "-C link-arg=-fuse-ld=lld";

      # Cargo reads extra compiler flags from `RUSTFLAGS`; any other spelling
      # is just an unused environment variable.
      RUSTFLAGS = "-C target-cpu=native";
    });
  });
}
use std::{
fs::File,
io::{BufRead, BufReader, BufWriter, Error, ErrorKind, Read, Write},
path::PathBuf,
};
use clap::Parser;
use color_eyre::Result;
use memmap2::MmapOptions;
use zstd::Decoder;
// Does not improve performance:
// #[global_allocator]
// static ALLOCATOR: bump_alloc::BumpAlloc = bump_alloc::BumpAlloc::new();
// Command-line arguments of the synchronous line counter.
//
// NOTE: plain `//` comments are used on purpose here. `///` doc comments on a
// clap derive struct and its fields are picked up as help text, so adding
// them would change what `--help` prints.
#[derive(Debug, clap::Parser)]
#[command(version, about)]
struct Args {
// Path of the file to read; `main` opens it and feeds it to the zstd decoder.
#[arg(short, long)]
input: PathBuf,
}
/// Entry point of the synchronous line counter.
///
/// Memory-maps the file named by `--input`, decompresses it with zstd on the
/// calling thread, pushes the decompressed bytes through a ring buffer to a
/// second thread that counts `b'\n'` bytes, and prints the total as
/// `lines: N`. The commented-out sections are earlier variants kept together
/// with the author's timing notes.
fn main() -> Result<()> {
color_eyre::install()?;
let args = Args::parse();
let input = File::open(args.input)?;
// let reader = BufReader::with_capacity(8 * 1024, input);
// NOTE(review): mapping a file is `unsafe`; nothing here guards against the
// file being modified while it is mapped — confirm that is acceptable.
let mmap = unsafe { MmapOptions::new().map(&input)? };
// The mapped bytes are handed to the decoder as an in-memory buffer.
let reader = &*mmap;
let reader = Decoder::with_buffer(reader)?;
let mut reader = BufReader::with_capacity(8 * 1024, reader);
// ^ adding the outer buf reader improves perf considerably
//
// ```
// ❯ time zstdcat base-json/nixpkgs-dump.json.zst | wc -l
// 10253321
//
// real 0m0.207s
// user 0m0.201s
// sys 0m0.200s
// ```
//
// Slow, has to check and construct strings:
// println!("lines: {}", reader.lines().count());
// Even slower:
// let lines = reader.bytes().filter(|b| matches!(b, Ok(b'\n'))).count();
// println!("lines: {lines}");
// Close:
// ```
// ❯ hyperfine "zstdcat --single-thread base-json/nixpkgs-dump.json.zst | wc -l" "target/release/main --input base-json/nixpkgs-dump.json.zst"
// Benchmark 1: zstdcat --single-thread base-json/nixpkgs-dump.json.zst | wc -l
// Time (mean ± σ): 359.5 ms ± 58.9 ms [User: 300.1 ms, System: 220.9 ms]
// Range (min … max): 265.7 ms … 463.6 ms 10 runs
//
// Benchmark 2: target/release/main --input base-json/nixpkgs-dump.json.zst
// Time (mean ± σ): 425.4 ms ± 25.7 ms [User: 416.8 ms, System: 8.1 ms]
// Range (min … max): 396.4 ms … 474.8 ms 10 runs
// ```
/*
let mut lines = 0;
while let Ok(buf) = reader.fill_buf() {
let read = buf.len();
lines += buf.iter().filter(|&&c| c == b'\n').count();
if read == 0 { break }
reader.consume(read);
}
println!("lines: {lines}");
*/
// Running total of newline bytes; written through `LineCounter` below and
// printed at the end.
let mut lines = 0;
// `Write` sink that discards its input and only adds the number of `b'\n'`
// bytes in each buffer to the borrowed counter. It never fails.
pub struct LineCounter<'a>(&'a mut usize);
impl<'a> Write for LineCounter<'a> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
*self.0 += buf.iter().filter(|&&c| c == b'\n').count();
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
let mut counter = LineCounter(&mut lines);
// ~240ms compared to ~123ms
/*
std::io::copy(&mut reader, &mut counter).unwrap();
*/
// ~310ms; extra copy + realloc makes perf worse presumably
/*
let mut counter = BufWriter::new(counter);
std::io::copy(&mut reader, &mut counter).unwrap();
drop(counter);
*/
// with `-C target-cpu=native`:
//
// emits AVX2 instructions; gets to within spitting distance of `zstd` +
// `wc`
// std::io::copy(&mut reader, &mut counter).unwrap();
// as a baseline; this is ~95ms
// same as having `write` here do nothing
/*
struct Count<'a>(&'a mut usize);
impl Write for Count<'_> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
*self.0 += buf.len();
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
std::io::copy(&mut reader, &mut Count(&mut lines)).unwrap();
*/
// channel based approach
// ...
//
// actually nah, I don't really want to write this
// /*
// Active variant: a 1 MiB ring buffer between the decompressing thread (this
// one) and a counting thread, plus a channel used once to signal that the
// producer has written everything.
let rb = ringbuf::HeapRb::new(1024 * 1024 * 1);
let (mut prod, mut cons) = rb.split();
let (end_tx, end_rx) = std::sync::mpsc::channel::<()>();
// Scoped threads let the consumer borrow `lines` (through `counter`) without
// needing `'static`; the scope returns only after the consumer has finished.
std::thread::scope(|s| {
// Consumer: keeps moving buffered bytes into `counter`. It only looks at the
// end-of-stream channel when a pass moved nothing, and it stops once the
// signal has been seen AND the buffer is empty, so bytes written just before
// the signal are still counted. This loop spins rather than sleeping.
s.spawn(move || {
let mut done = false;
loop {
let copied = cons.write_into(&mut counter, None).unwrap();
if copied == 0 {
if done && cons.is_empty() {
break;
}
// A zero timeout makes this a non-blocking check of the channel.
if let Ok(()) = end_rx.recv_timeout(std::time::Duration::from_micros(0)) {
done = true;
}
}
}
});
// Producer: runs on the calling thread.
loop {
/*
match std::io::copy(&mut reader, &mut prod) {
Ok(_) => break,
Err(e) if e.kind() == ErrorKind::WouldBlock => {
eprintln!("{e:?}");
},
e => { e.unwrap(); },
}
*/
// We'd like to use `io::copy` but unfortunately our ring buffer
// does not _block_ when full but instead returns `WouldBlock`...
//
// `io::copy` does not handle this gracefully; it goes and tosses
// what it's pulled out of the reader when this happens. This makes
// us lose data.
//
// So, we write the `io::copy` loop ourselves in a worse manner.
//
// This also wastes CPU; when the buffer is full we'd like to put
// our thread to sleep until it has capacity.
// Also, as an aside, it's pretty wild that zstd can overwhelm the
// line counting consumer here; the consumer gets vectorized and
// should be fast: https://rust.godbolt.org/z/963Tsz65s
//
// (only if you build with `-C target-cpu=...` though; need AVX2!)
if !prod.is_full() {
let buf = reader.fill_buf().unwrap();
// An empty buffer from `fill_buf` means the decoder has reached end of input.
if buf.is_empty() {
break;
}
// Offer only as much as the ring buffer has room for, and mark only what was
// actually written as consumed so the rest is offered again next time round.
let buf = &buf[0..(prod.free_len().min(buf.len()))];
let written = prod.write(buf).unwrap();
reader.consume(written);
}
}
prod.flush().unwrap();
// Tell the consumer that no more data is coming; it drains what is left and
// then exits.
end_tx.send(()).unwrap()
});
// */
println!("lines: {lines}");
// assert_eq!(lines, 10253321);
// println!("{} packages", packages.0.len());
Ok(())
}
{
"folders": [
{
"path": "."
}
],
"settings": {
"rust-analyzer.server.path": "rust-analyzer"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment