Created
March 15, 2024 16:23
-
-
Save nazar-pc/db418aeb3e23d6b6499d1ae9d4211290 to your computer and use it in GitHub Desktop.
Windows unbuffered I/O serialization bug
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! ```cargo | |
//! [dependencies] | |
//! rand = "0.8.5" | |
//! rayon = "1.9.0" | |
//! windows = { version = "0.54.0", features = ["Wdk_Storage_FileSystem", "Win32_Storage_FileSystem", "Win32_System_IO", "Win32_Security", "Win32_System_Threading"] } | |
//! ``` | |
use rand::prelude::*; | |
use rayon::prelude::*; | |
use std::fs::File; | |
use std::io::{Seek, SeekFrom}; | |
use std::time::Instant; | |
use std::{env, mem, ptr}; | |
use windows::core::PCWSTR; | |
use windows::Wdk::Storage::FileSystem::NtReadFile; | |
use windows::Win32::Foundation::{ | |
GENERIC_READ, GENERIC_WRITE, HANDLE, STATUS_PENDING, STATUS_SUCCESS, | |
}; | |
use windows::Win32::Storage::FileSystem::{ | |
CreateFileW, FileEndOfFileInfo, SetFileInformationByHandle, FILE_END_OF_FILE_INFO, | |
FILE_FLAGS_AND_ATTRIBUTES, FILE_FLAG_NO_BUFFERING, FILE_FLAG_WRITE_THROUGH, FILE_SHARE_DELETE, | |
FILE_SHARE_READ, FILE_SHARE_WRITE, OPEN_EXISTING, | |
}; | |
use windows::Win32::System::Threading::{WaitForSingleObject, INFINITE}; | |
use windows::Win32::System::IO::IO_STATUS_BLOCK; | |
const SECTOR_SIZE: usize = 1024 * 1024 * 1024; | |
const CHUNK_SIZE: usize = 4096; | |
fn main() { | |
let input_file_path = env::args().nth(1).unwrap(); | |
// Just to measure size | |
let file_size = File::open(&input_file_path) | |
.unwrap() | |
.seek(SeekFrom::End(0)) | |
.unwrap(); | |
let sectors = (file_size / SECTOR_SIZE as u64) as usize; | |
println!("sectors: {sectors}"); | |
// Reference benchmark results | |
{ | |
let file = unsafe { open_file_read(&input_file_path) }; | |
let files = unsafe { open_rayon_files_read(&input_file_path) }; | |
let start = Instant::now(); | |
do_random_reads(file, sectors); | |
println!("Random reads sequential: {:?}", start.elapsed()); | |
let start = Instant::now(); | |
do_random_reads_parallel(&files, sectors); | |
println!("Random reads parallel: {:?}", start.elapsed()); | |
} | |
unsafe { | |
let write_file = open_file_write(&input_file_path); | |
let info = FILE_END_OF_FILE_INFO { | |
EndOfFile: file_size as i64, | |
}; | |
SetFileInformationByHandle( | |
write_file, | |
FileEndOfFileInfo, | |
ptr::addr_of!(info).cast(), | |
mem::size_of::<FILE_END_OF_FILE_INFO>().try_into().unwrap(), | |
) | |
.unwrap(); | |
} | |
// Second benchmark results to confirm that parallel file reads are being reduced to | |
// single-threaded performance | |
{ | |
// It is important to reopen the files for some reason to reproduce this issue | |
let file = unsafe { open_file_read(&input_file_path) }; | |
let files = unsafe { open_rayon_files_read(&input_file_path) }; | |
let start = Instant::now(); | |
do_random_reads(file, sectors); | |
println!( | |
"Sequential after SetFileInformationByHandle: {:?}", | |
start.elapsed() | |
); | |
let start = Instant::now(); | |
do_random_reads_parallel(&files, sectors); | |
println!( | |
"Parallel after SetFileInformationByHandle: {:?}", | |
start.elapsed() | |
); | |
} | |
} | |
/// Open file for reads with buffering disabled | |
unsafe fn open_file_read(path: &str) -> HANDLE { | |
let mut path = path.encode_utf16().collect::<Vec<_>>(); | |
path.push(0); | |
CreateFileW( | |
PCWSTR::from_raw(path.as_ptr()), | |
GENERIC_READ.0, | |
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, | |
None, | |
OPEN_EXISTING, | |
FILE_FLAG_WRITE_THROUGH | FILE_FLAG_NO_BUFFERING, | |
HANDLE::default(), | |
) | |
.unwrap() | |
} | |
/// Open file for writes with default flags (buffering enabled) | |
unsafe fn open_file_write(path: &str) -> HANDLE { | |
let mut path = path.encode_utf16().collect::<Vec<_>>(); | |
path.push(0); | |
CreateFileW( | |
PCWSTR::from_raw(path.as_ptr()), | |
GENERIC_WRITE.0, | |
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, | |
None, | |
OPEN_EXISTING, | |
FILE_FLAGS_AND_ATTRIBUTES::default(), | |
HANDLE::default(), | |
) | |
.unwrap() | |
} | |
unsafe fn open_rayon_files_read(path: &str) -> Vec<HANDLE> { | |
(0..rayon::current_num_threads()) | |
.map(|_| open_file_read(path)) | |
.collect() | |
} | |
/// Pick random [`SECTOR_SIZE`] sector in a large file and reach [`CHUNK_SIZE`] chunks from it at | |
/// random | |
fn do_random_reads(file: HANDLE, sectors: usize) { | |
let random_sector = thread_rng().gen_range(0..sectors) as u64; | |
let sector_offset = random_sector * SECTOR_SIZE as u64; | |
for _ in 0..2_u32.pow(15) { | |
let offset_within_sector = thread_rng().gen_range(0..SECTOR_SIZE / CHUNK_SIZE) * CHUNK_SIZE; | |
let byte_offset = sector_offset as i64 + offset_within_sector as i64; | |
unsafe { | |
read_chunk(file, byte_offset); | |
} | |
} | |
} | |
/// Similar to [`do_random_reads`], but reads chunks concurrently using rayon, takes multiple files | |
/// because otherwise Windows is not scaling reads from multiple threads 🤷 | |
fn do_random_reads_parallel(files: &[HANDLE], sectors: usize) { | |
let random_sector = thread_rng().gen_range(0..sectors) as u64; | |
let sector_offset = random_sector * SECTOR_SIZE as u64; | |
(0..2_u32.pow(15)).into_par_iter().for_each(|_| { | |
let offset_within_sector = thread_rng().gen_range(0..SECTOR_SIZE / CHUNK_SIZE) * CHUNK_SIZE; | |
let byte_offset = sector_offset as i64 + offset_within_sector as i64; | |
unsafe { | |
read_chunk(files[rayon::current_thread_index().unwrap()], byte_offset); | |
} | |
}); | |
} | |
unsafe fn read_chunk(file: HANDLE, byte_offset: i64) { | |
let mut io_status = IO_STATUS_BLOCK::default(); | |
let mut result = [0u8; CHUNK_SIZE]; | |
let status = NtReadFile( | |
file, | |
HANDLE::default(), | |
None, | |
None, | |
&mut io_status, | |
result.as_mut_ptr() as *mut _, | |
CHUNK_SIZE as u32, | |
Some(&byte_offset), | |
None, | |
); | |
if status == STATUS_PENDING { | |
WaitForSingleObject(file, INFINITE); | |
} | |
if status != STATUS_SUCCESS { | |
panic!("Status: {status:?}"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Reported on Feedback Hub at https://aka.ms/AApjlev