Last active
February 8, 2023 17:42
-
-
Save Dabsunter/970557d47df39316cf4dd7cb5701c9a5 to your computer and use it in GitHub Desktop.
Fix de @qadadri pour MicroMedIAn/PathAIA
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf8 | |
"""Useful functions for images.""" | |
import numpy | |
from skimage.io import imread | |
from skimage.transform import resize | |
from .paths import imfiles_in_folder | |
from .types import NDBoolMask, PathLike, NDImage, NDByteImage, Coord | |
from ..patches.compat import convert_coords | |
import itertools | |
from typing import Iterator, List, Tuple, Sequence, Optional, Union, Any | |
from nptyping import NDArray | |
def regular_grid(shape: Coord, interval: Coord, psize: Coord) -> Iterator[Coord]:
    """
    Walk a regular grid of patch positions over a window.

    The three arguments are first passed through ``convert_coords`` (helper
    defined elsewhere; presumably it normalizes them to ``Coord`` -- confirm).
    Consecutive positions are separated by ``interval + psize`` on each axis.

    Arguments:
        shape: (x, y) shape of the window to tile.
        interval: (x, y) steps between patch samples.
        psize: (w, h) size of the patches (in pixels).

    Yields:
        (x, y) positions on a regular grid, x varying fastest.
    """
    psize = convert_coords(psize)
    interval = convert_coords(interval)
    shape = convert_coords(shape)

    # Distance between the origins of two neighbouring patches.
    stride = interval + psize
    # Number of strides per axis, then the exclusive upper bound of each range.
    n_strides = (shape - psize) / stride + 1
    stop_x, stop_y = stride * n_strides

    xs = numpy.arange(start=0, stop=stop_x, step=stride[0], dtype=int)
    ys = numpy.arange(start=0, stop=stop_y, step=stride[1], dtype=int)

    for y in ys:
        for x in xs:
            yield Coord(x=x, y=y)
def get_coords_from_mask(
    mask: NDBoolMask, shape: Coord, interval: Coord, psize: Coord
) -> Iterator[Coord]:
    """
    Get patch positions from a tissue mask and the window dimensions.

    The mask is resized so that it has one cell per grid position, then every
    non-zero cell is mapped back to window coordinates by multiplying its
    (column, row) index by the grid stride ``interval + psize``.

    Arguments:
        mask: binary mask where tissue is marked as True.
        shape: (x, y) shape of the window to tile.
        interval: (x, y) steps between patch samples.
        psize: (w, h) size of the patches (in pixels).

    Yields:
        (x, y) positions of the grid cells that are non-zero in the resized mask.
    """
    psize = convert_coords(psize)
    interval = convert_coords(interval)
    shape = convert_coords(shape)

    stride = interval + psize
    # Number of grid cells along x and y.
    grid_w, grid_h = (shape - psize) / stride + 1

    # resize() takes (rows, cols), hence the (height, width) ordering.
    grid_mask = resize(mask, (grid_h, grid_w))

    for row, col in numpy.argwhere(grid_mask):
        yield stride * (col, row)
def unlabeled_regular_grid_list(shape: Coord, step: int, psize: int) -> List[Coord]:
    """
    Get a regular grid of positions on a window given its dimensions.

    Only positions whose patch lies entirely inside the window are returned:
    along an axis of length ``n`` the positions are ``0, step, 2 * step, ...``
    up to the largest value ``p`` with ``p + psize <= n``. An axis shorter than
    ``psize`` contributes no position, so the result is then empty.

    Args:
        shape: shape (i, j) of the window to tile.
        step: steps in pixels between patch samples.
        psize: size of the side of the patch (in pixels).

    Returns:
        Positions (i, j) on the regular grid, j varying fastest.
    """

    def _axis_positions(length):
        # Patch origins along one axis; empty when no patch fits.
        if length < psize:
            return numpy.arange(0, dtype=int)
        count = (length - psize) // step + 1
        # The stop bound is exclusive, so the last origin is (count - 1) * step.
        return numpy.arange(start=0, stop=count * step, step=step, dtype=int)

    line = _axis_positions(shape[0])
    col = _axis_positions(shape[1])
    return list(itertools.product(line, col))
def images_in_folder(
    folder: PathLike,
    authorized: Sequence[str] = (".png", ".jpg", ".jpeg", ".tif", ".tiff"),
    forbiden: Sequence[str] = ("thumbnail",),
    randomize: bool = False,
    datalim: Optional[int] = None,
    paths: bool = False,
) -> Iterator[Union[NDByteImage, Tuple[str, NDByteImage]]]:
    """
    Lazily load the images found in a folder.

    File selection is delegated to ``imfiles_in_folder``, which receives
    ``folder``, ``authorized``, ``forbiden``, ``randomize`` and ``datalim``
    unchanged. Each selected file is then read with ``imread``.

    Args:
        folder: absolute path to an image directory.
        authorized: authorized image file extensions.
        forbiden: non-authorized words in file names.
        randomize: whether to randomize output list of files.
        datalim: maximum number of file to extract in folder.
        paths: whether to return the file path along with the image data.

    Yields:
        The image array, or a ``(path, image)`` pair when ``paths`` is true.
    """
    selected = imfiles_in_folder(folder, authorized, forbiden, randomize, datalim)
    for imfile in selected:
        pixels = imread(imfile)
        yield (imfile, pixels) if paths else pixels
def sample_img(
    image: NDImage, psize: int, spl_per_image: int, mask: Optional[NDBoolMask] = None
):
    """
    Split an image into flattened square patches.

    Without a mask, patches are taken on a non-overlapping regular grid
    (step equal to ``psize``). With a mask, one patch is taken around every
    positive mask pixel that is far enough from the image border. In both
    cases the candidate positions are shuffled with ``numpy.random.shuffle``
    and at most ``spl_per_image`` of them are kept.

    Args:
        image: numpy image to sample from.
        psize: size in pixels of the side of a patch.
        spl_per_image: maximum number of patches to extract in image.
        mask: optional boolean array, we sample in true pixels if provided
            (assumed to have the same height and width as ``image``).

    Returns:
        List of patches, each flattened to a 1-D float array.
    """
    img = image.astype(float)
    di, dj = image.shape[0], image.shape[1]

    if mask is None:
        # The grid helper takes (shape, step, psize); a step of psize gives
        # non-overlapping patches.
        grid = unlabeled_regular_grid_list((di, dj), psize, psize)
        # Keep only positions whose patch is fully inside the image so that
        # every returned patch has the same length.
        positions = [
            (i, j) for i, j in grid if i + psize <= di and j + psize <= dj
        ]
    else:
        half_size = int(0.5 * psize)
        cropped_mask = numpy.zeros_like(mask)
        cropped_mask[mask > 0] = 1
        # Clear a border band so a patch around any remaining pixel stays
        # inside the image.
        cropped_mask[0 : half_size + 1, :] = 0
        cropped_mask[di - half_size - 1 :, :] = 0
        cropped_mask[:, 0 : half_size + 1] = 0
        cropped_mask[:, dj - half_size - 1 :] = 0
        y, x = numpy.where(cropped_mask > 0)
        # Shift from the patch centre to its top-left corner.
        positions = list(zip(y - half_size, x - half_size))

    numpy.random.shuffle(positions)
    positions = positions[:spl_per_image]
    return [img[i : i + psize, j : j + psize].reshape(-1) for i, j in positions]
def sample_img_sep_channels(
    image: NDByteImage, psize: int, spl_per_image: int, mask: Optional[NDBoolMask] = None
):
    """
    Split an image into flattened square patches, one list per channel.

    Positions are chosen once and shared by all channels. Without a mask they
    lie on a non-overlapping regular grid (step equal to ``psize``); with a
    mask, one patch is taken around every positive mask pixel that is far
    enough from the image border. Positions are shuffled with
    ``numpy.random.shuffle`` and at most ``spl_per_image`` of them are kept.

    Args:
        image: numpy image to sample from; its last axis is the channel axis.
        psize: size in pixels of the side of a patch.
        spl_per_image: maximum number of patches to extract in image.
        mask: optional boolean array, we sample in true pixels if provided
            (assumed to have the same height and width as ``image``).

    Returns:
        Tuple with one entry per channel, each a list of flattened float
        patches taken at the same positions.
    """
    img = image.astype(float)
    n_channels = image.shape[-1]
    di, dj = image.shape[0], image.shape[1]

    if mask is None:
        # The grid helper takes (shape, step, psize); a step of psize gives
        # non-overlapping patches.
        grid = unlabeled_regular_grid_list((di, dj), psize, psize)
        # Keep only positions whose patch is fully inside the image so that
        # every returned patch has the same length.
        positions = [
            (i, j) for i, j in grid if i + psize <= di and j + psize <= dj
        ]
    else:
        half_size = int(0.5 * psize)
        cropped_mask = numpy.zeros_like(mask)
        cropped_mask[mask > 0] = 1
        # Clear a border band so a patch around any remaining pixel stays
        # inside the image.
        cropped_mask[0 : half_size + 1, :] = 0
        cropped_mask[di - half_size - 1 :, :] = 0
        cropped_mask[:, 0 : half_size + 1] = 0
        cropped_mask[:, dj - half_size - 1 :] = 0
        y, x = numpy.where(cropped_mask > 0)
        # Shift from the patch centre to its top-left corner.
        positions = list(zip(y - half_size, x - half_size))

    numpy.random.shuffle(positions)
    positions = positions[:spl_per_image]

    return tuple(
        [img[i : i + psize, j : j + psize, c].reshape(-1) for i, j in positions]
        for c in range(n_channels)
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import ( | |
Any, | |
Callable, | |
Dict, | |
Sequence, | |
Union, | |
NamedTuple, | |
Optional, | |
List, | |
Tuple, | |
) | |
from nptyping import NDArray | |
import os | |
import numpy | |
from dataclasses import dataclass | |
from openslide import OpenSlide | |
from pathlib import Path | |
import warnings | |
from PIL import Image | |
try: | |
from cucim import CuImage | |
except ImportError: | |
pass | |
class _CoordBase(NamedTuple):
    # Plain (x, y) record that Coord extends with arithmetic.
    x: int
    y: int


class Coord(_CoordBase):
    """
    An (x, y) tuple representing integer coordinates.

    If only x is given then takes value (x, x); if that single argument is a
    tuple it is unpacked as (x, y). Both components are converted with
    ``int``.

    Arithmetic is component-wise, and the other operand is first converted to
    a ``Coord``, so scalars and 2-tuples are accepted on either side. Both
    ``/`` and ``//`` divide as floats and then truncate towards zero.
    """

    def __new__(cls, x: Union[_CoordBase, int], y: Optional[int] = None):
        if y is None:
            # Single argument: an (x, y) pair, or a scalar used for both axes.
            x, y = x if isinstance(x, tuple) else (x, x)
        return super().__new__(cls, int(x), int(y))

    def __add__(self, other):
        kind = self.__class__
        ox, oy = kind(other)
        return kind(int(ox + self.x), int(oy + self.y))

    def __radd__(self, other):
        # Addition is symmetric, so reuse __add__.
        return self + other

    def __neg__(self):
        return self.__class__(-self.x, -self.y)

    def __sub__(self, other):
        # a - b is computed as (-b) + a.
        negated = -self.__class__(other)
        return negated + self

    def __rsub__(self, other):
        # other - self is computed as (-self) + other.
        return -self + other

    def __mul__(self, other):
        kind = self.__class__
        ox, oy = kind(other)
        return kind(int(ox * self.x), int(oy * self.y))

    def __rmul__(self, other):
        # Multiplication is symmetric, so reuse __mul__.
        return self * other

    def __floordiv__(self, other):
        kind = self.__class__
        ox, oy = kind(other)
        # Float division followed by int(): truncates towards zero.
        return kind(int(self.x / ox), int(self.y / oy))

    def __truediv__(self, other):
        # True division deliberately behaves like floor division here.
        return self // other

    def __rfloordiv__(self, other):
        kind = self.__class__
        ox, oy = kind(other)
        return kind(int(ox / self.x), int(oy / self.y))

    def __rtruediv__(self, other):
        return other // self
@dataclass(frozen=True)
class Patch:
    """
    Immutable description of a patch in a slide.

    Instances can be flattened to a CSV row with ``to_csv_row`` and rebuilt
    with ``from_csv_row``. The row stores only the parent's id, so a rebuilt
    patch has no ``parent``.
    """

    id: str
    slidename: str
    position: Coord
    level: int
    size: Coord
    size_0: Coord
    parent: Optional["Patch"] = None

    @classmethod
    def get_fields(cls) -> List[str]:
        """Return the CSV column names, in the order used by ``to_csv_row``."""
        columns = ["id", "global_id"]
        columns += ["x", "y", "level"]
        columns += ["dx", "dy", "size_x", "size_y"]
        columns += ["parent"]
        return columns

    def to_csv_row(self) -> Dict[str, Union[str, int]]:
        """
        Flatten the patch to a dict keyed by the names in ``get_fields``.

        ``global_id`` is the slide name followed by the patch id; ``dx``/``dy``
        come from ``size_0`` and ``size_x``/``size_y`` from ``size``. A missing
        parent is written as the string "None".
        """
        row: Dict[str, Union[str, int]] = {}
        row["id"] = self.id
        row["global_id"] = self.slidename + self.id
        row["x"] = self.position[0]
        row["y"] = self.position[1]
        row["level"] = self.level
        row["dx"] = self.size_0[0]
        row["dy"] = self.size_0[1]
        row["size_x"] = self.size[0]
        row["size_y"] = self.size[1]
        row["parent"] = self.parent.id if self.parent is not None else "None"
        return row

    @classmethod
    def from_csv_row(cls, row: Dict[str, Union[str, int]], slidename: str = None):
        """
        Build a patch from a row produced by ``to_csv_row``.

        Args:
            row: mapping holding at least the id, x, y, level, dx, dy, size_x
                and size_y columns.
            slidename: slide name stored on the patch; the row's ``global_id``
                and ``parent`` columns are not read.
        """
        patch_id = row["id"]
        position = Coord(row["x"], row["y"])
        level = int(row["level"])
        size_0 = Coord(row["dx"], row["dy"])
        size = Coord(row["size_x"], row["size_y"])
        return cls(
            id=patch_id,
            slidename=slidename,
            position=position,
            level=level,
            size_0=size_0,
            size=size,
        )
# --- Filter specifications -------------------------------------------------
# A filter is a sequence of names and/or callables; a filter list is either a
# single name, a sequence of filters, or a mapping from an int key to filters.
Filter = Sequence[Union[str, Callable]]
FilterList = Union[str, Sequence[Filter], Dict[int, Sequence[Filter]]]

# --- Paths -----------------------------------------------------------------
PathLike = Union[str, os.PathLike]

# --- Images ----------------------------------------------------------------
# The shape slot is Any throughout, so these aliases only constrain the dtype.
NDByteImage = NDArray[Any, numpy.uint8]
NDFloat32Image = NDArray[Any, numpy.float32]
NDFloat64Image = NDArray[Any, numpy.float64]
NDFloatImage = Union[NDFloat32Image, NDFloat64Image]
NDImage = Union[NDByteImage, NDFloatImage]

# --- Gray-level images (same dtypes as above, named for readability) -------
NDByteGrayImage = NDArray[Any, numpy.uint8]
NDFloat32GrayImage = NDArray[Any, numpy.float32]
NDFloat64GrayImage = NDArray[Any, numpy.float64]
NDFloatGrayImage = Union[NDFloat32GrayImage, NDFloat64GrayImage]
NDGrayImage = Union[NDByteGrayImage, NDFloatGrayImage]

# --- Masks -----------------------------------------------------------------
# NOTE(review): the "Bool" masks are declared as uint8 -- confirm this is the
# intended dtype.
NDBoolMask = NDArray[Any, numpy.uint8]
NDBoolMaskBatch = NDArray[Any, numpy.uint8]
NDIntMask2d = NDArray[Any, numpy.uint8]
NDIntMask3d = NDArray[Any, numpy.uint8]
NDIntMask4d = NDArray[Any, numpy.uint8]

# --- Image batches ---------------------------------------------------------
NDByteImageBatch = NDArray[Any, numpy.uint8]
NDFloat32ImageBatch = NDArray[Any, numpy.float32]
NDFloat64ImageBatch = NDArray[Any, numpy.float64]
NDFloatImageBatch = Union[NDFloat32ImageBatch, NDFloat64ImageBatch]
NDImageBatch = Union[NDByteImageBatch, NDFloatImageBatch]

# --- Datasets --------------------------------------------------------------
# A reference dataset is a pair of lists; a split dataset maps a split key to
# such a pair.
RefDataSet = Tuple[List, List]
SplitDataSet = Dict[Union[int, str], RefDataSet]
DataSet = Union[RefDataSet, SplitDataSet]
class Slide:
    """
    Thin wrapper giving openslide and cucim slides a common interface.

    The wrapped object is stored in ``_slide`` and the name of the backend
    actually in use in ``backend``. Attributes not defined here are forwarded
    to the wrapped object; with the cucim backend, names missing on the
    wrapped object are additionally looked up in its ``resolutions`` mapping.
    """

    def __init__(self, path: PathLike, backend: str = "openslide"):
        """
        Open the slide at ``path``.

        Args:
            path: location of the slide file.
            backend: "openslide" selects openslide; any other value selects
                cucim, falling back to openslide (with a warning) when the
                file suffix is neither ".svs" nor ".tif".

        Raises:
            ImportError: if cucim is selected but could not be imported.
        """
        path = Path(path)
        if backend == "openslide":
            opener = OpenSlide
        elif path.suffix not in (".svs", ".tif"):
            warnings.warn(
                "Cucim backend only works for svs and tiff, switching to openslide."
            )
            opener = OpenSlide
            backend = "openslide"
        else:
            try:
                opener = CuImage
            except NameError:
                # The module-level cucim import is optional and fails silently,
                # so report the missing dependency explicitly here.
                raise ImportError(
                    "The cucim backend was requested but cucim is not installed."
                ) from None
        self._slide = opener(str(path))
        self.backend = backend

    @property
    def dimensions(self):
        """Slide dimensions as reported by the backend."""
        if self.backend == "openslide":
            return self._slide.dimensions
        return self._slide.size("XY")

    @property
    def _filename(self):
        """Path of the underlying file as recorded by the backend."""
        if self.backend == "openslide":
            return self._slide._filename
        return self._slide.metadata["cucim"]["path"]

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails. If the wrapper's own
        # attributes are missing (bare instance, copy/unpickle in progress),
        # touching them below would re-enter this method forever.
        if name in ("_slide", "backend"):
            raise AttributeError(name)
        try:
            return getattr(self._slide, name)
        except AttributeError:
            if self.backend == "cucim" and name in self._slide.resolutions:
                return self._slide.resolutions[name]
            raise

    def get_best_level_for_downsample(self, downsample: float):
        """
        Return the level to read for a requested downsample factor.

        With openslide the call is delegated. With cucim, the level just
        before the first one whose downsample exceeds ``downsample`` is
        returned, or the last level if none does.
        """
        if self.backend == "openslide":
            return self._slide.get_best_level_for_downsample(downsample)
        for i in range(1, self.level_count):
            if downsample < self.level_downsamples[i]:
                return i - 1
        return self.level_count - 1

    def read_region(self, location, level, size, **kwargs):
        """
        Read a region of the slide.

        Args:
            location: passed to the backend as the region location.
            level: level to read from.
            size: size of the region.
            **kwargs: extra options forwarded to cucim only; ignored with
                openslide.

        Returns:
            The openslide result as is, or the cucim region converted to an
            RGBA PIL image.
        """
        if self.backend == "openslide":
            return self._slide.read_region(location, level, size)
        region = self._slide.read_region(
            location=location, level=level, size=size, **kwargs
        )
        return Image.fromarray(numpy.asarray(region)).convert("RGBA")

    def get_thumbnail(self, size: Coord):
        """
        Return a thumbnail of the slide no larger than ``size``.

        With openslide the call is delegated. With cucim, the whole level best
        matching the required downsample is read, pasted on a white
        background and shrunk in place with PIL.
        """
        if self.backend == "openslide":
            return self._slide.get_thumbnail(size)
        # Largest per-axis ratio between the slide and the requested size.
        dsr = max(dim / thumb for dim, thumb in zip(self.dimensions, size))
        level = self.get_best_level_for_downsample(dsr)
        tile = self.read_region((0, 0), level, self.level_dimensions[level])
        # Apply on solid background
        bg_color = "#ffffff"
        thumb = Image.new("RGB", tile.size, bg_color)
        thumb.paste(tile, None, tile)
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
        thumb.thumbnail(size, Image.LANCZOS)
        return thumb
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment