Benjamin Kay benkay86

@benkay86
benkay86 / slurm_sync.py
Last active September 26, 2025 17:11
Synchronizing concurrent slurm jobs using advisory file locks
"""Synchronize slurm jobs using advisory file locks (flock).
This module provides functionality for synchronizing concurrent work across
multiple jobs running on multiple nodes in a slurm cluster. Synchronization is
achieved using advisory file locks, or flock. This module is particularly useful
for easily-parallelizable jobs that divide a large problem into chunks that can
be processed independently.
To use an flock like a mutex:
>>> from slurm_sync import LockFile
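The preview above is cut off before it shows how the lock is used. As a rough sketch of the general idea only, here is how an advisory lock can serve as a mutex using the standard library's `fcntl.flock`. The helpers `flock_mutex` and `try_lock` are hypothetical names made up for this illustration and are not the `slurm_sync` API. Whether such locks are honored across nodes depends on the shared filesystem.

```python
import fcntl
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def flock_mutex(path):
    """Hold an exclusive advisory lock on `path` for the duration of the block.

    Hypothetical helper for illustration; not part of slurm_sync.
    """
    # Open (creating if needed) the lock file; its contents are irrelevant.
    fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o644)
    try:
        # Blocks until no other open file description holds the lock.
        fcntl.flock(fd, fcntl.LOCK_EX)
        yield fd
    finally:
        # Closing the descriptor also releases the lock; unlock explicitly anyway.
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)


def try_lock(path):
    """Return True if an exclusive lock on `path` can be taken without waiting."""
    fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o644)
    try:
        # LOCK_NB makes flock raise instead of blocking when the lock is held.
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        return False
    else:
        fcntl.flock(fd, fcntl.LOCK_UN)
        return True
    finally:
        os.close(fd)


if __name__ == "__main__":
    lock_path = os.path.join(tempfile.mkdtemp(), "demo.lock")
    with flock_mutex(lock_path):
        # Critical section: only one holder at a time gets here.
        print("locked; second attempt succeeds?", try_lock(lock_path))
    print("released; second attempt succeeds?", try_lock(lock_path))
```

The lock is advisory: it only coordinates processes that also call `flock` on the same file, and nothing stops an uncooperative process from touching the protected resource.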
benkay86 / asyncio_queue_example.py
Created March 27, 2025 18:28
Fine-Grained Control of Concurrent IO using Python's asyncio.Queue
# Python's asynchronous input-output framework speeds up IO-bound operations
# by allowing your python program to make progress on multiple IO-bound tasks
# concurrently. In this example, we load many neuroimaging data files and
# then perform a compute-intensive operation on each of them. Parallelizing the
# compute-intensive operation is beyond the scope of this tutorial. However,
# we can still get a big speedup by reading multiple files concurrently. While
# the program waits for the operating system to make progress on reading one
# file, it can work on computational transformation of another file.
#
# Loading a file asynchronously is straightforward. Here, we define a helper
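The preview stops before the helper is defined. The following is a minimal sketch of the pattern the comments describe, under stated assumptions: `read_bytes`, `loader`, and `process_all` are hypothetical names, plain byte reads stand in for loading neuroimaging files, and `len(data)` is a placeholder for the compute-intensive step. It assumes Python 3.9 or later for `asyncio.to_thread`, and error handling is omitted.

```python
import asyncio
import os
import tempfile


def read_bytes(path):
    """Blocking read; stands in for loading one data file."""
    with open(path, "rb") as f:
        return f.read()


async def loader(paths, queue):
    """Claim paths from a shared iterator, read each off-thread, and enqueue it."""
    for path in paths:
        data = await asyncio.to_thread(read_bytes, path)
        # put() waits while the queue is full, which bounds memory use.
        await queue.put((path, data))


async def process_all(path_list, n_loaders=4, max_queued=8):
    """Read files concurrently while processing loaded results one at a time."""
    queue = asyncio.Queue(maxsize=max_queued)
    paths = iter(path_list)  # shared iterator, so each path is claimed once
    loaders = [asyncio.create_task(loader(paths, queue)) for _ in range(n_loaders)]
    results = {}
    for _ in range(len(path_list)):
        path, data = await queue.get()
        results[path] = len(data)  # placeholder for the compute-intensive step
    await asyncio.gather(*loaders)
    return results


if __name__ == "__main__":
    # Create a few small files to read back.
    tmp_dir = tempfile.mkdtemp()
    demo_paths = []
    for i in range(5):
        p = os.path.join(tmp_dir, f"file{i}.bin")
        with open(p, "wb") as f:
            f.write(b"x" * (i + 1))
        demo_paths.append(p)
    print(asyncio.run(process_all(demo_paths)))
```

The two knobs give the fine-grained control: `n_loaders` caps how many reads are in flight, and `max_queued` caps how many loaded files wait in memory for processing.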
benkay86 / volume_space_conversion.md
Created March 14, 2024 02:31
convert a cifti dense label (dlabel) file to a different volume space

Volume Space Conversion

The Problem

A collaborator shares some fMRI data with you in a file called other.dtseries.nii. You try to parcellate the data and get this error:

wb_command -cifti-parcellate \
  other.dtseries.nii \
  parcellation.dlabel.nii \
benkay86 / pyo3_memory.rs
Last active August 16, 2021 17:47
A closer look at managing Python's memory with PyO3.
//! The "gotchas" of unexpectedly long lifetimes
//! when managing Python's memory with PyO3.
//! See <https://pyo3.rs>.
use pyo3::types::PyString;
use pyo3::{Py, PyResult, Python};
fn main() -> PyResult<()> {
    // Initialize the Python interpreter.
    pyo3::prepare_freethreaded_python();
benkay86 / borrow.rs
Created February 22, 2021 18:17
Receiving a borrowed type as either borrowed or owned in Rust
// In Rust, if you are going to access some data without mutating it you
// typically express this with a borrow `&`, for example:
//
// ```
// fn readonly_access(s: &str) {
//     println!("{}", s);
// }
// ```
//
// Sometimes when designing an API you have decided not to mutate some data but
benkay86 / glob_iter.rs
Last active March 18, 2021 19:42
Rust iterators and streams over globs with a uniform error type.
//! Tools to enhance [`glob::glob()`]. Provides a [`GlobIterError`] type that
//! enumerates over [`glob::PatternError`] and [`glob::GlobError`] with a
//! corresponding [`GlobPathIter`] as a drop-in replacement for
//! [`glob::Paths`]. Also provides a [`GlobPatternIter`] for iterating over
//! paths matching multiple glob patterns.
//!
//! Note that the underlying glob crate does not support asynchronous pattern
//! matching. This module provides [`UnboundedGlobPathStream`]
//! and [`UnboundedGlobPatternStream`], which use [`super::sidestream`] to do
//! blocking operations on a separate thread.
benkay86 / sidestream.rs
Last active February 11, 2021 16:43
Sidestream collects items from a wrapped stream (or iterator) into a queue on a separate thread.
//! Module for creating sidestreams. A sidestream is a stream over another
//! stream or an iterator. The items from the enclosed stream or iterator are
//! collected into a queue on a separate thread. An optional count parameter is
//! incremented as each item is queued. The items are asynchronously dequeued
//! by the enclosing sidestream. Each item is ready to be dequeued as soon as
//! it is yielded by the enclosed stream or iterator; the sidestream does *not*
//! wait for the collection thread to join. This pattern is useful when you
//! need to know the total number of items in a stream, e.g. to display progress
//! when processing an iterator over a list of files. It is also useful for
//! converting iterators into asynchronous streams.
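The pattern described above can be sketched outside Rust as well. The following is a simplified, synchronous Python analogue and not the module's API: the class name `SideStream` and its `count` attribute are assumptions chosen for this illustration, and a blocking `queue.Queue` stands in for the asynchronous stream.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the wrapped iterable


class SideStream:
    """Iterate over `iterable` while a background thread fills a queue.

    Hypothetical Python analogue for illustration; not the Rust API.
    """

    def __init__(self, iterable):
        self.count = 0  # number of items queued so far
        self._queue = queue.Queue()
        self._thread = threading.Thread(
            target=self._collect, args=(iterable,), daemon=True
        )
        self._thread.start()

    def _collect(self, iterable):
        for item in iterable:
            # Only this thread writes count; bump it before the item is visible.
            self.count += 1
            self._queue.put(item)
        self._queue.put(_DONE)

    def __iter__(self):
        return self

    def __next__(self):
        # Each item is available as soon as it is queued; the consumer does
        # not wait for the collector thread to finish.
        item = self._queue.get()
        if item is _DONE:
            self._queue.put(_DONE)  # keep later next() calls from blocking
            raise StopIteration
        return item

    def join(self):
        """Wait until the wrapped iterable has been fully collected."""
        self._thread.join()


if __name__ == "__main__":
    stream = SideStream(range(5))
    for item in stream:
        # stream.count may already be ahead of the items consumed so far.
        print(item, "queued so far:", stream.count)
```

Because the collector usually runs ahead of the consumer, `count` approaches the total before processing finishes, which is what makes it usable for a progress display.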
benkay86 / csv_output_writer.rs
Last active February 7, 2021 02:15
Write an output file in comma-separated value (CSV) format asynchronously using Rust and tokio.
//! Module for CSVOutputWriter, which writes a single unit of data to an output
//! sink using the comma-separated value (CSV) format.
use tokio::io::{AsyncWrite, AsyncWriteExt};
use std::ops::Deref;
use tokio::sync::Mutex;
/// Type alias for a type that implements AsyncWrite and is synchronized by a
/// mutex. For example:
///
benkay86 / structopt_error.rs
Created January 29, 2021 21:04
Replacement for structopt::clap::Error to override from_args() and avoid calls to std::process::exit().
//! Custom error type to replace [`structopt::clap::Error`].
//! By default [`structopt::StructOpt::from_args()`] calls `exit()` on failure.
//! [`std::process::exit()`] does not unwind the stack, so destructors do not run.
//! This is bad: https://www.youtube.com/watch?v=zQC8T71Y8e4
//! Use [`OptsError`] like this to override `from_args()` and avoid calls to
//! `std::process::exit()`:
//!
//! ```ignore
//! #[derive(StructOpt)]
//! #[structopt(...)]
benkay86 / indicatif_progress_stream.rs
Last active April 9, 2025 19:29
Attach an indicatif progress bar to a stream, similar to indicatif::ProgressIterator.
//! [Indicatif](https://github.com/mitsuhiko/indicatif) is a wonderful progress
//! bar for terminal applications. It already works in a multi-threaded
//! environment. The [`indicatif::ProgressIterator::progress_with()`]
//! combinator conveniently attaches a progress bar to an existing iterator.
//!
//! This module provides [`ProgressStream::progress_with()`] for attaching a
//! progress bar to a [`futures::stream::Stream`].
//!
//! Note that the unit tests in this module use [tokio](https://tokio.rs) for
//! a testing runtime, but the rest of the code does not depend on tokio. You