Created
February 28, 2025 18:24
-
-
Save h-mayorquin/0b99b241556f89580dc84ae0a7d0d1cf to your computer and use it in GitHub Desktop.
Stub Thor OME-TIFF
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def stub_ome_dataset(file_path, num_timepoints=3, output_dir=None):
    """
    Create a stubbed version of an OME-TIFF dataset based on a file path.

    The stub contains a copy of ``experiment.xml`` (if present next to
    ``file_path``) with its timepoint counts rewritten, plus a copy of every
    TIFF file that the OME-XML of ``file_path`` lists for the first
    ``num_timepoints`` time indices. The OME-XML stored in the
    ImageDescription tag of each copied file is trimmed to match.
    Progress is reported with ``print``.

    Parameters:
    -----------
    file_path : str or Path
        Path to any TIFF file in the dataset
    num_timepoints : int, optional
        Number of timepoints to keep in the stub, default is 3
    output_dir : str or Path, optional
        Directory to save the stubbed dataset. If None, creates a directory
        with "_stub" suffix in the same location as the source

    Returns:
    --------
    Path
        Path to the generated stub directory

    Raises:
    -------
    shutil.SameFileError
        If the destination directory resolves to the source directory.
    """
    import re
    import shutil
    from pathlib import Path

    import tifffile

    file_path = Path(file_path)
    source_dir = file_path.parent

    if output_dir is None:
        # Path('.') and Path('..') have no usable name to append "_stub" to,
        # so fall back to the resolved directory in those cases only.
        anchor = source_dir if source_dir.name not in ("", "..") else source_dir.resolve()
        dest_dir = anchor.with_name(anchor.name + "_stub")
    else:
        dest_dir = Path(output_dir)

    # Stubbing in place would overwrite the source experiment.xml before the
    # TIFF copy fails, so refuse up front with the error copy2 would raise.
    if dest_dir.resolve() == source_dir.resolve():
        raise shutil.SameFileError(
            f"Destination directory {dest_dir} is the source directory"
        )

    dest_dir.mkdir(exist_ok=True, parents=True)
    print(f"Source directory: {source_dir}")
    print(f"Destination directory: {dest_dir}")

    # ---------------------------------------------
    # PART 1: Copy and modify experiment.xml
    # ---------------------------------------------
    source_exp_file = source_dir / "experiment.xml"
    dest_exp_file = dest_dir / "experiment.xml"
    if source_exp_file.exists():
        exp_content = source_exp_file.read_text(encoding="utf-8")
        exp_content = re.sub(
            r'<Timelapse timepoints="(\d+)"',
            f'<Timelapse timepoints="{num_timepoints}"',
            exp_content,
        )
        # Capture the text around the number so only the frames value is
        # replaced; a plain str.replace on the whole match would also rewrite
        # any other attribute that happens to contain the same digits.
        exp_content = re.sub(
            r'(<Streaming [^>]*?\bframes=")\d+(")',
            lambda m: f"{m.group(1)}{num_timepoints}{m.group(2)}",
            exp_content,
        )
        dest_exp_file.write_text(exp_content, encoding="utf-8")
        print(f"Copied and modified experiment.xml to {dest_exp_file}")
    else:
        print(f"Warning: experiment.xml not found at {source_exp_file}")

    # ---------------------------------------------
    # PART 2: Extract filenames from OME-XML
    # ---------------------------------------------
    with tifffile.TiffFile(file_path) as tif:
        ome_xml = tif.ome_metadata
    if not ome_xml:
        # Without OME-XML there is nothing to enumerate; continue with an
        # empty document so the function still returns the stub directory.
        print(f"Warning: no OME-XML metadata found in {file_path}")
        ome_xml = ""
    print(f"Original OME-XML length: {len(ome_xml)} bytes")

    # (time index, file name) for every TiffData element that names a file.
    files_info = [
        (int(m.group(1)), m.group(2))
        for m in re.finditer(
            r'<TiffData FirstT="(\d+)".*?<UUID FileName="([^"]+)"',
            ome_xml,
            re.DOTALL,
        )
    ]
    files_info.sort(key=lambda item: item[0])

    # Several TiffData entries may point at the same file; copy and rewrite
    # each file only once while keeping time order.
    files_to_copy = list(
        dict.fromkeys(
            filename for t_index, filename in files_info if t_index < num_timepoints
        )
    )
    print(f"Found {len(files_info)} files in OME-XML")
    print(f"Will copy {len(files_to_copy)} files: {files_to_copy}")

    # ---------------------------------------------
    # PART 3: Copy files and modify OME-XML
    # ---------------------------------------------
    copied_files = []
    for filename in files_to_copy:
        source_file = source_dir / filename
        dest_file = dest_dir / filename
        if source_file.exists():
            shutil.copy2(source_file, dest_file)
            copied_files.append((filename, dest_file))
            print(f"Copied {filename} to {dest_dir}")
        else:
            print(f"Warning: Source file {source_file} not found!")

    # Never report more timepoints than the source metadata declared.
    modified_xml = re.sub(
        r'SizeT="(\d+)"',
        lambda m: f'SizeT="{min(int(m.group(1)), num_timepoints)}"',
        ome_xml,
    )

    # (FirstT, start, end, text) for every TiffData element.
    tiff_data_entries = [
        (int(m.group(1)), m.start(), m.end(), m.group(0))
        for m in re.finditer(
            r'<TiffData FirstT="(\d+)".*?</TiffData>', modified_xml, re.DOTALL
        )
    ]
    tiff_data_entries.sort(key=lambda entry: entry[0])
    kept_texts = [entry[3] for entry in tiff_data_entries if entry[0] < num_timepoints]
    if kept_texts:
        # Replace the whole span covered by TiffData elements with just the
        # kept ones, leaving the XML before and after that span untouched.
        span_start = min(entry[1] for entry in tiff_data_entries)
        span_end = max(entry[2] for entry in tiff_data_entries)
        modified_xml = (
            modified_xml[:span_start] + "\n ".join(kept_texts) + modified_xml[span_end:]
        )
    print(f"Modified OME-XML length: {len(modified_xml)} bytes")

    # Encode once; passing bytes keeps non-ASCII characters in the OME-XML
    # intact instead of relying on how a str would be encoded.
    xml_bytes = modified_xml.encode("utf-8")

    # Only files that were actually copied can be updated.
    for filename, dest_file in copied_files:
        with tifffile.TiffFile(dest_file) as tif:
            has_description = tif.pages[0].tags.get("ImageDescription") is not None
        if not has_description:
            print(f"Warning: No ImageDescription tag found in {filename}")
            continue
        # Let tifffile rewrite the tag. The previous hand-rolled rewrite
        # dropped the bytes before the first IFD and shifted everything after
        # it without updating any strip/tile or tag offsets, and it assumed a
        # little-endian classic TIFF.
        tifffile.tiffcomment(dest_file, xml_bytes)
        print(f"Updated OME-XML in {filename}")

    print(f"\nStubbed dataset created at {dest_dir}")
    print(f"Kept {len(files_to_copy)} time points (first {num_timepoints})")
    return dest_dir
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment