Skip to content

Instantly share code, notes, and snippets.

@h-mayorquin
Created February 28, 2025 18:24
Show Gist options
  • Save h-mayorquin/0b99b241556f89580dc84ae0a7d0d1cf to your computer and use it in GitHub Desktop.
Save h-mayorquin/0b99b241556f89580dc84ae0a7d0d1cf to your computer and use it in GitHub Desktop.
Stub Thor OME-TIFF
def stub_ome_dataset(file_path, num_timepoints=3, output_dir=None):
    """
    Create a stubbed version of an OME-TIFF dataset based on a file path.

    The stub contains a copy of ``experiment.xml`` (if present) with its
    time-point counts reduced, plus the TIFF files referenced by the first
    ``num_timepoints`` time points of the OME-XML. The OME-XML stored in the
    ImageDescription tag of every copied file is trimmed to match.

    Parameters:
    -----------
    file_path : str or Path
        Path to any TIFF file in the dataset
    num_timepoints : int, optional
        Number of timepoints to keep in the stub, default is 3
    output_dir : str or Path, optional
        Directory to save the stubbed dataset. If None, creates a directory
        with "_stub" suffix in the same location as the source

    Returns:
    --------
    Path
        Path to the generated stub directory

    Raises:
    -------
    ValueError
        If ``num_timepoints`` is smaller than 1, if the destination is the
        source directory itself, or if ``file_path`` carries no OME-XML.
    FileNotFoundError
        If ``file_path`` does not point to an existing file.
    """
    import re
    import shutil
    from pathlib import Path

    # Validate cheap preconditions before touching the file system.
    if num_timepoints < 1:
        raise ValueError(f"num_timepoints must be at least 1, got {num_timepoints}")

    file_path = Path(file_path)
    if not file_path.is_file():
        raise FileNotFoundError(f"TIFF file not found: {file_path}")

    # Third-party dependency; imported only once the arguments are known
    # to be valid.
    import tifffile

    source_dir = file_path.parent

    if output_dir is None:
        # Path('.') and Path('..') have no usable final component, so resolve
        # them to a real directory name before appending the suffix.
        anchor = source_dir if source_dir.name not in ("", "..") else source_dir.resolve()
        dest_dir = anchor.with_name(anchor.name + "_stub")
    else:
        dest_dir = Path(output_dir)

    # Writing the stub into the source directory would overwrite the
    # original experiment.xml with the truncated version.
    if dest_dir.resolve() == source_dir.resolve():
        raise ValueError("The stub directory must differ from the source directory")

    # Read the OME-XML up front so a non-OME input fails before any output
    # is created.
    with tifffile.TiffFile(file_path) as tif:
        ome_xml = tif.ome_metadata
    if not ome_xml:
        raise ValueError(f"{file_path} does not contain OME-XML metadata")

    dest_dir.mkdir(exist_ok=True, parents=True)
    print(f"Source directory: {source_dir}")
    print(f"Destination directory: {dest_dir}")

    def clamp(original_count):
        """Return the stub count as text, never exceeding the original count."""
        return str(min(int(original_count), num_timepoints))

    def replace_count(match):
        """Rebuild a (prefix, digits, suffix) match with the clamped count."""
        return match.group(1) + clamp(match.group(2)) + match.group(3)

    #---------------------------------------------
    # PART 1: Copy and modify experiment.xml
    #---------------------------------------------
    source_exp_file = source_dir / "experiment.xml"
    dest_exp_file = dest_dir / "experiment.xml"
    if source_exp_file.exists():
        with open(source_exp_file, 'r', encoding='utf-8') as f:
            exp_content = f.read()
        exp_content = re.sub(r'(<Timelapse timepoints=")(\d+)(")',
                             replace_count,
                             exp_content)
        # Only the digits of the frames attribute are replaced; the rest of
        # the <Streaming ...> tag is preserved verbatim via the prefix group.
        exp_content = re.sub(r'(<Streaming [^>]*?frames=")(\d+)(")',
                             replace_count,
                             exp_content)
        with open(dest_exp_file, 'w', encoding='utf-8') as f:
            f.write(exp_content)
        print(f"Copied and modified experiment.xml to {dest_exp_file}")
    else:
        print(f"Warning: experiment.xml not found at {source_exp_file}")

    #---------------------------------------------
    # PART 2: Extract filenames from OME-XML
    #---------------------------------------------
    print(f"Original OME-XML length: {len(ome_xml)} bytes")
    file_pattern = r'<TiffData FirstT="(\d+)".*?<UUID FileName="([^"]+)"'
    files_info = [
        (int(match.group(1)), match.group(2))
        for match in re.finditer(file_pattern, ome_xml, re.DOTALL)
    ]
    files_info.sort(key=lambda item: item[0])
    # dict.fromkeys removes repeated file names while keeping time order, so
    # a file referenced by several TiffData entries is handled only once.
    files_to_copy = list(dict.fromkeys(
        filename for t_index, filename in files_info if t_index < num_timepoints
    ))
    print(f"Found {len(files_info)} files in OME-XML")
    print(f"Will copy {len(files_to_copy)} files: {files_to_copy}")

    #---------------------------------------------
    # PART 3: Copy files and modify OME-XML
    #---------------------------------------------
    # Remember which files were really copied; missing sources must not be
    # opened again when the metadata is rewritten below.
    copied_files = []
    for filename in files_to_copy:
        source_file = source_dir / filename
        dest_file = dest_dir / filename
        if source_file.exists():
            shutil.copy2(source_file, dest_file)
            copied_files.append((filename, dest_file))
            print(f"Copied {filename} to {dest_dir}")
        else:
            print(f"Warning: Source file {source_file} not found!")

    # Update SizeT, never reporting more time points than the source had.
    ome_xml = re.sub(r'(SizeT=")(\d+)(")', replace_count, ome_xml)

    # Collect every TiffData element with its position in the document.
    entry_pattern = r'<TiffData FirstT="(\d+)".*?</TiffData>'
    tiff_data_entries = [
        (int(match.group(1)), match.start(), match.end(), match.group(0))
        for match in re.finditer(entry_pattern, ome_xml, re.DOTALL)
    ]
    tiff_data_entries.sort(key=lambda entry: entry[0])
    entries_to_keep = [entry for entry in tiff_data_entries if entry[0] < num_timepoints]

    if entries_to_keep:
        # Replace the whole span covered by the TiffData elements with only
        # the retained ones, leaving the surrounding XML untouched.
        first_entry_start = min(entry[1] for entry in tiff_data_entries)
        last_entry_end = max(entry[2] for entry in tiff_data_entries)
        kept_text = '\n '.join(entry[3] for entry in entries_to_keep)
        modified_xml = ome_xml[:first_entry_start] + kept_text + ome_xml[last_entry_end:]
    else:
        modified_xml = ome_xml
    print(f"Modified OME-XML length: {len(modified_xml)} bytes")

    # Encode once; passing bytes keeps non-ASCII characters in the XML and
    # avoids tifffile's ASCII-only handling of str comments.
    xml_bytes = modified_xml.encode('utf-8')
    for filename, dest_file in copied_files:
        with tifffile.TiffFile(dest_file) as tif:
            # 270 is the TIFF tag code of ImageDescription.
            has_description = tif.pages[0].tags.get(270) is not None
        if has_description:
            # tiffcomment overwrites the tag value in place, so image data
            # offsets, byte order and BigTIFF layout stay valid.
            tifffile.tiffcomment(dest_file, xml_bytes)
            print(f"Updated OME-XML in {filename}")
        else:
            print(f"Warning: No ImageDescription tag found in {filename}")

    print(f"\nStubbed dataset created at {dest_dir}")
    print(f"Kept {len(files_to_copy)} time points (first {num_timepoints})")
    return dest_dir
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment