|
#!/usr/bin/env python |
|
import argparse |
|
import gzip |
|
import numpy as np |
|
import matplotlib.pyplot as plt |
|
import os |
|
|
|
|
|
def compress_block(block: bytes) -> float:
    """Return the gzip compression ratio of ``block``.

    The ratio is ``len(block) / len(gzip.compress(block))``, so values above
    1 mean the block shrank when compressed.

    Args:
        block: Raw bytes to compress.

    Returns:
        The uncompressed-to-compressed size ratio, or ``float("inf")`` if the
        compressed output were ever empty.
    """
    packed_size = len(gzip.compress(block))
    if packed_size == 0:
        # Prevent division by zero
        return float("inf")
    return len(block) / packed_size
|
|
|
|
|
def is_gzipped(file_path: str) -> bool:
    """Report whether the file at ``file_path`` starts with the gzip magic bytes.

    Only the first two bytes are read; the rest of the file is not inspected.

    Args:
        file_path: Path of the file to probe.

    Returns:
        True if the file begins with ``1f 8b``, otherwise False (including
        for files shorter than two bytes).
    """
    gzip_magic = b"\x1f\x8b"
    with open(file_path, "rb") as handle:
        header = handle.read(len(gzip_magic))
    return header == gzip_magic
|
|
|
|
|
def calculate_compression_ratios(file_path: str, block_size: int) -> np.ndarray:
    """Compute the compression ratio of every ``block_size``-byte chunk of a file.

    When ``is_gzipped`` reports the input as gzipped it is opened with
    ``gzip.open``, so the chunks are taken from the decompressed stream;
    otherwise the file is read as-is. The final chunk may be shorter than
    ``block_size``.

    Args:
        file_path: Path of the file to analyze.
        block_size: Number of bytes requested per read.

    Returns:
        A 1D array holding one ``compress_block`` ratio per chunk, in file
        order.
    """
    opener = gzip.open if is_gzipped(file_path) else open

    with opener(file_path, "rb") as stream:
        # iter() with a sentinel keeps calling read() until it returns b"".
        chunks = iter(lambda: stream.read(block_size), b"")
        per_block = [compress_block(chunk) for chunk in chunks]

    # Convert the list of ratios to a 1D array
    return np.array(per_block)
|
|
|
|
|
def log_transform_ratios(ratios_array: np.ndarray) -> np.ndarray:
    """Return the natural log of the ratios after shifting them to be >= 1.

    Each value is shifted by ``1 + 1e-5 - min(ratios_array)`` before the log
    is taken, so the smallest input maps to ``log(1 + 1e-5)`` and every
    argument passed to ``np.log`` is strictly positive.

    Args:
        ratios_array: 1D array (or array-like) of compression ratios.

    Returns:
        A float array of the same length with the log-transformed values.
        An empty input yields an empty array instead of raising.
    """
    ratios = np.asarray(ratios_array, dtype=float)
    if ratios.size == 0:
        # There is no minimum to shift by; nothing to transform.
        return ratios.copy()

    # Add a small constant to avoid log(0)
    small_constant = 1e-5
    # Same evaluation order as before, so results stay bit-identical;
    # ndarray.min() replaces the element-by-element builtin min().
    adjusted_ratios = ratios + small_constant - ratios.min() + 1
    return np.log(adjusted_ratios)
|
|
|
|
|
def z_score_standardize_ratios(ratios_array: np.ndarray) -> np.ndarray:
    """Standardize the ratios to zero mean and unit (population) std deviation.

    Args:
        ratios_array: 1D array (or array-like) of compression ratios.

    Returns:
        A float array of z-scores with the same length as the input. When
        every value is identical (including a single-element input) the
        result is all zeros rather than NaN; an empty input yields an empty
        array.
    """
    ratios = np.asarray(ratios_array, dtype=float)
    if ratios.size == 0:
        # np.mean/np.std of an empty array only produce NaN plus warnings.
        return ratios.copy()

    centered = ratios - np.mean(ratios)
    spread = np.std(ratios)

    # A constant series has no variation to scale by. Dividing would give
    # 0/0 = NaN (or rounding noise if the mean is not exactly representable),
    # so report "no deviation" explicitly.
    if spread == 0 or ratios.min() == ratios.max():
        return np.zeros_like(centered)

    return centered / spread
|
|
|
|
|
def generate_heatmap(
    ratios_array: np.ndarray, output_image: str, cmap: str, transform: str = None
) -> None:
    """Draw the ratios as a single-row heatmap and save it to ``output_image``.

    Args:
        ratios_array: Values to plot; reshaped to one row with one column
            per entry.
        output_image: Path handed to ``plt.savefig``.
        cmap: Matplotlib colormap name passed to ``plt.imshow``.
        transform: ``"log"`` or ``"zscore"`` to label the y-axis accordingly;
            any other value uses the plain "Compression Ratio" label. This
            only affects the label; the data is plotted as given.
    """
    labels_by_transform = {
        "log": "Log of Compression Ratio",
        "zscore": "Z-Score of Compression Ratio",
    }
    ylabel = labels_by_transform.get(transform, "Compression Ratio")

    plt.figure(figsize=(10, 5))
    single_row = ratios_array.reshape(1, -1)
    plt.imshow(single_row, cmap=cmap, aspect="auto")
    plt.colorbar()
    plt.title("Heatmap of Compression Ratios")
    plt.xlabel("Block Number")
    plt.ylabel(ylabel)
    plt.savefig(output_image)
    # Close the figure that was just saved.
    plt.close()
|
|
|
|
|
def save_ratios_tsv(
    ratios_array: np.ndarray, output_filename: str, block_size: int
) -> None:
    """Write the ratios to ``<output_filename>_compression_ratios.tsv.gz``.

    The file is created in the current working directory. It starts with a
    ``#block_size=<n>`` header line, followed by one ``index<TAB>ratio`` line
    per entry. The full path of the written file is printed afterwards.

    Args:
        ratios_array: Ratios to write, one per row.
        output_filename: Prefix for the generated file name.
        block_size: Block size recorded in the header line.
    """
    target_name = output_filename + "_compression_ratios.tsv.gz"
    tsv_file_name = os.path.join(os.getcwd(), target_name)

    # Save the compression ratios as a gzip compressed TSV file
    with gzip.open(tsv_file_name, "wt") as out:
        out.write(f"#block_size={block_size}\n")
        out.writelines(
            f"{index}\t{value}\n" for index, value in enumerate(ratios_array)
        )

    print(f"Compression ratios saved as: {tsv_file_name}")
|
|
|
|
|
def _positive_int(text: str) -> int:
    """argparse ``type=`` callback that accepts only integers >= 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(
            f"must be a positive integer, got {value}"
        )
    return value


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the heatmap script."""
    parser = argparse.ArgumentParser(
        description="Generate a heatmap and TSV of compression ratios for a file."
    )
    parser.add_argument("file", type=str, help="File to compress and analyze")
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        help="Output image filename (PNG format)",
        default=None,
    )
    parser.add_argument(
        "-b",
        "--block_size",
        # A block size of 0 reads nothing and a negative one reads the whole
        # file as one block, so both are rejected up front.
        type=_positive_int,
        default=1024,
        help="Block size for compression (default: 1024)",
    )
    parser.add_argument(
        "-w",
        "--window_size",
        type=_positive_int,
        default=1,
        help="Window size to average compression ratios (default: 1)",
    )
    parser.add_argument(
        "-c",
        "--cmap",
        type=str,
        default="rainbow",
        help="Matplotlib colormap for the heatmap (default: rainbow)",
    )
    parser.add_argument(
        "-t",
        "--transform",
        type=str,
        default=None,
        # Unknown names used to be silently ignored; reject them instead.
        choices=("log", "zscore"),
        help='Apply a transformation to the ratios (options: "log", "zscore")',
    )
    return parser


def main(argv=None) -> None:
    """Run the command-line tool.

    Computes per-block compression ratios for the input file, saves the raw
    ratios as a gzipped TSV, optionally transforms and smooths them, and
    writes the heatmap image.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Raises:
        SystemExit: With status 2 when the arguments are invalid, the input
            yields no blocks, or the smoothing window is larger than the
            number of blocks.
    """
    parser = _build_arg_parser()
    args = parser.parse_args(argv)

    file_path = args.file
    block_size = args.block_size
    window_size = args.window_size
    cmap = args.cmap

    base_filename = os.path.splitext(os.path.basename(file_path))[0]

    if args.output:
        output_image = args.output
    else:
        # NOTE(review): a second extension is stripped here but not for the
        # TSV name below (e.g. "x.fa.gz" -> "x_..." vs "x.fa_..."). Kept
        # as-is so existing output names do not change.
        output_image = (
            f"{os.path.splitext(base_filename)[0]}_compression_ratio_heatmap.png"
        )

    ratios_array = calculate_compression_ratios(file_path, block_size)

    # Validate before writing anything so a bad run leaves no partial output.
    if ratios_array.size == 0:
        parser.error(f"no data could be read from {file_path!r}; nothing to plot")
    if window_size > ratios_array.size:
        # np.convolve(mode="valid") swaps its operands when the window is the
        # longer one, which would return values that are not a moving average.
        parser.error(
            f"--window_size ({window_size}) exceeds the number of blocks "
            f"({ratios_array.size})"
        )

    output_tsv_filename = (
        base_filename
        if args.output is None
        else os.path.splitext(os.path.basename(args.output))[0]
    )

    # The TSV always holds the raw ratios: it is written before any
    # transform or smoothing is applied.
    save_ratios_tsv(ratios_array, output_tsv_filename, block_size)

    if args.transform == "log":
        ratios_array = log_transform_ratios(ratios_array)
    elif args.transform == "zscore":
        ratios_array = z_score_standardize_ratios(ratios_array)

    if window_size > 1:
        # Moving average over window_size consecutive blocks.
        ratios_array = np.convolve(
            ratios_array, np.ones(window_size) / window_size, mode="valid"
        )

    generate_heatmap(ratios_array, output_image, cmap, args.transform)
    print(f"Heatmap saved as: {output_image}")


if __name__ == "__main__":
    main()
# NOTE: It might make more sense for the default block size to be 32000, to
# match gzip's default block size. For gzip, a block size above 32000 probably
# does not make sense, since blocks are compressed independently and this is
# not configurable in the Python stdlib gzip module.