Skip to content

Instantly share code, notes, and snippets.

@nazar-pc
Created March 15, 2024 16:23
Show Gist options
  • Save nazar-pc/db418aeb3e23d6b6499d1ae9d4211290 to your computer and use it in GitHub Desktop.
Save nazar-pc/db418aeb3e23d6b6499d1ae9d4211290 to your computer and use it in GitHub Desktop.
Windows unbuffered I/O serialization bug
//! ```cargo
//! [dependencies]
//! rand = "0.8.5"
//! rayon = "1.9.0"
//! windows = { version = "0.54.0", features = ["Wdk_Storage_FileSystem", "Win32_Storage_FileSystem", "Win32_System_IO", "Win32_Security", "Win32_System_Threading"] }
//! ```
use rand::prelude::*;
use rayon::prelude::*;
use std::fs::File;
use std::io::{Seek, SeekFrom};
use std::time::Instant;
use std::{env, mem, ptr};
use windows::core::PCWSTR;
use windows::Wdk::Storage::FileSystem::NtReadFile;
use windows::Win32::Foundation::{
GENERIC_READ, GENERIC_WRITE, HANDLE, STATUS_PENDING, STATUS_SUCCESS,
};
use windows::Win32::Storage::FileSystem::{
CreateFileW, FileEndOfFileInfo, SetFileInformationByHandle, FILE_END_OF_FILE_INFO,
FILE_FLAGS_AND_ATTRIBUTES, FILE_FLAG_NO_BUFFERING, FILE_FLAG_WRITE_THROUGH, FILE_SHARE_DELETE,
FILE_SHARE_READ, FILE_SHARE_WRITE, OPEN_EXISTING,
};
use windows::Win32::System::Threading::{WaitForSingleObject, INFINITE};
use windows::Win32::System::IO::IO_STATUS_BLOCK;
const SECTOR_SIZE: usize = 1024 * 1024 * 1024;
const CHUNK_SIZE: usize = 4096;
fn main() {
let input_file_path = env::args().nth(1).unwrap();
// Just to measure size
let file_size = File::open(&input_file_path)
.unwrap()
.seek(SeekFrom::End(0))
.unwrap();
let sectors = (file_size / SECTOR_SIZE as u64) as usize;
println!("sectors: {sectors}");
// Reference benchmark results
{
let file = unsafe { open_file_read(&input_file_path) };
let files = unsafe { open_rayon_files_read(&input_file_path) };
let start = Instant::now();
do_random_reads(file, sectors);
println!("Random reads sequential: {:?}", start.elapsed());
let start = Instant::now();
do_random_reads_parallel(&files, sectors);
println!("Random reads parallel: {:?}", start.elapsed());
}
unsafe {
let write_file = open_file_write(&input_file_path);
let info = FILE_END_OF_FILE_INFO {
EndOfFile: file_size as i64,
};
SetFileInformationByHandle(
write_file,
FileEndOfFileInfo,
ptr::addr_of!(info).cast(),
mem::size_of::<FILE_END_OF_FILE_INFO>().try_into().unwrap(),
)
.unwrap();
}
// Second benchmark results to confirm that parallel file reads are being reduced to
// single-threaded performance
{
// It is important to reopen the files for some reason to reproduce this issue
let file = unsafe { open_file_read(&input_file_path) };
let files = unsafe { open_rayon_files_read(&input_file_path) };
let start = Instant::now();
do_random_reads(file, sectors);
println!(
"Sequential after SetFileInformationByHandle: {:?}",
start.elapsed()
);
let start = Instant::now();
do_random_reads_parallel(&files, sectors);
println!(
"Parallel after SetFileInformationByHandle: {:?}",
start.elapsed()
);
}
}
/// Open file for reads with buffering disabled
unsafe fn open_file_read(path: &str) -> HANDLE {
let mut path = path.encode_utf16().collect::<Vec<_>>();
path.push(0);
CreateFileW(
PCWSTR::from_raw(path.as_ptr()),
GENERIC_READ.0,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
None,
OPEN_EXISTING,
FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING,
HANDLE::default(),
)
.unwrap()
}
/// Open file for writes with default flags (buffering enabled)
unsafe fn open_file_write(path: &str) -> HANDLE {
let mut path = path.encode_utf16().collect::<Vec<_>>();
path.push(0);
CreateFileW(
PCWSTR::from_raw(path.as_ptr()),
GENERIC_WRITE.0,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
None,
OPEN_EXISTING,
FILE_FLAGS_AND_ATTRIBUTES::default(),
HANDLE::default(),
)
.unwrap()
}
unsafe fn open_rayon_files_read(path: &str) -> Vec<HANDLE> {
(0..rayon::current_num_threads())
.map(|_| open_file_read(path))
.collect()
}
/// Pick random [`SECTOR_SIZE`] sector in a large file and reach [`CHUNK_SIZE`] chunks from it at
/// random
fn do_random_reads(file: HANDLE, sectors: usize) {
let random_sector = thread_rng().gen_range(0..sectors) as u64;
let sector_offset = random_sector * SECTOR_SIZE as u64;
for _ in 0..2_u32.pow(15) {
let offset_within_sector = thread_rng().gen_range(0..SECTOR_SIZE / CHUNK_SIZE) * CHUNK_SIZE;
let byte_offset = sector_offset as i64 + offset_within_sector as i64;
unsafe {
read_chunk(file, byte_offset);
}
}
}
/// Similar to [`do_random_reads`], but reads chunks concurrently using rayon, takes multiple files
/// because otherwise Windows is not scaling reads from multiple threads 🤷
fn do_random_reads_parallel(files: &[HANDLE], sectors: usize) {
let random_sector = thread_rng().gen_range(0..sectors) as u64;
let sector_offset = random_sector * SECTOR_SIZE as u64;
(0..2_u32.pow(15)).into_par_iter().for_each(|_| {
let offset_within_sector = thread_rng().gen_range(0..SECTOR_SIZE / CHUNK_SIZE) * CHUNK_SIZE;
let byte_offset = sector_offset as i64 + offset_within_sector as i64;
unsafe {
read_chunk(files[rayon::current_thread_index().unwrap()], byte_offset);
}
});
}
unsafe fn read_chunk(file: HANDLE, byte_offset: i64) {
let mut io_status = IO_STATUS_BLOCK::default();
let mut result = [0u8; CHUNK_SIZE];
let status = NtReadFile(
file,
HANDLE::default(),
None,
None,
&mut io_status,
result.as_mut_ptr() as *mut _,
CHUNK_SIZE as u32,
Some(&byte_offset),
None,
);
if status == STATUS_PENDING {
WaitForSingleObject(file, INFINITE);
}
if status != STATUS_SUCCESS {
panic!("Status: {status:?}");
}
}
@nazar-pc
Copy link
Author

Reported on Feedback Hub at https://aka.ms/AApjlev

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment