Source code for jetraw_tools.compression_tool

import os
import logging
import numpy as np
import tifffile
import locale
import multiprocessing
from typing import Optional

# Local package imports
from .dpcore import load_parameters
from .utils import prepare_images, add_extension, create_compress_folder
from .tiff_writer import imwrite, metadata_writer
from .image_reader import ImageReader
from .logger import logger


[docs] class CompressionTool: """ A tool for compressing and decompressing images using the JetRaw algorithm. Handles batch processing of images with metadata preservation options. :param calibration_file: Path to the JetRaw calibration file :type calibration_file: str, optional :param identifier: Camera identifier for the compression settings :type identifier: str, optional :param ncores: Number of CPU cores to use (0 for auto-detection) :type ncores: int, optional :param omit_processed: Skip files that have already been processed :type omit_processed: bool, optional :param verbose: Enable detailed logging output :type verbose: bool, optional :raises FileNotFoundError: If the specified calibration file doesn't exist """
[docs] def __init__( self, calibration_file: str = None, identifier: str = "", ncores=0, omit_processed: bool = True, verbose: bool = False, metadata_format: str = "ome", ): """:no-index:""" # Check if calibration file exists if calibration_file is not None and not os.path.exists(calibration_file): logger.error( f"\033[91m\033[1mCalibration file not found:\033[0m\033[91m {calibration_file}\033[0m" ) raise FileNotFoundError( f"Calibration file does not exist: {calibration_file}" ) self.calibration_file = calibration_file self.identifier = identifier self.ncores = ncores self.omit_processed = omit_processed self.verbose = verbose self.metadata_format = metadata_format if verbose: logger.setLevel(logging.DEBUG)
[docs] def list_files(self, folder_path: str, image_extension: str) -> list: """ List all files in a folder with a specific extension. :param folder_path: The path to the folder. :param image_extension: The image file extension. :return: A list of image files. """ if os.path.isfile(folder_path) and folder_path.endswith(image_extension): image_files = [folder_path] else: image_files = [ f for f in os.listdir(folder_path) if f.endswith(image_extension) ] if len(image_files) == 0: logger.info(f"No file found in the folder with extension {image_extension}") return image_files
[docs] def remove_files(self, output_tiff_filename: str, input_filename: str) -> None: """ Remove original file after successful compression. Only removes if the compressed file exists and is at least 5% of the original size. :param output_tiff_filename: The output TIFF filename. :param input_filename: The input filename. """ # Verify that the new file exist with size > 5%original before removal if os.path.exists(output_tiff_filename): original_size = os.path.getsize(input_filename) compressed_size = os.path.getsize(output_tiff_filename) if compressed_size > 0.05 * original_size: os.remove(input_filename)
[docs] def compress_image( self, img_map: np.ndarray, target_file: str, metadata: dict, ome_bool: bool = True, metadata_json: bool = True, ) -> bool: """ Compress an image using JetRaw algorithm. :param img_map: NumPy array containing the image data :param target_file: Output path for the compressed file :param metadata: Dictionary containing image metadata :param ome_bool: Save metadata in OME format :param metadata_json: Additionally save metadata as JSON :return: True if compression was successful """ # Prepare input image locale.setlocale(locale.LC_ALL, locale.getlocale()) img_map = np.ascontiguousarray(img_map, dtype=img_map.dtype) load_parameters(self.calibration_file) prepare_images(img_map, identifier=self.identifier) # Compress input image to JetRaw compressed TIFF format imwrite(target_file, img_map, description="") if metadata: if not ome_bool: imageJ_metadata = True else: imageJ_metadata = False metadata_writer( target_file, metadata=metadata, ome_bool=ome_bool, imagej=imageJ_metadata, as_json=metadata_json, ) logger.debug(f"Successfully compressed image to: {target_file}") return True
[docs] def decompress_image( self, img_map: np.ndarray, target_file: str, metadata: dict, ome_bool: bool = True, metadata_json: bool = False, ) -> bool: """ Decompress a JetRaw image to standard TIFF. :param img_map: NumPy array containing the image data :param target_file: Output path for the decompressed file :param metadata: Dictionary containing image metadata :param ome_bool: Save metadata in OME format :param metadata_json: Additionally save metadata as JSON :return: True if decompression was successful """ with tifffile.TiffWriter(target_file) as tif: tif.write(img_map) if metadata: if not ome_bool: imageJ_metadata = True else: imageJ_metadata = False metadata_writer( target_file, metadata=metadata, ome_bool=ome_bool, imagej=imageJ_metadata, as_json=metadata_json, ) return True
[docs] def process_image( self, folder_path: str, output_folder: str, image_file: str, mode: str, image_extension: str, process_metadata: bool, ome_bool: bool, metadata_json: bool, remove_source: bool, progress_info: tuple, ) -> int: """ Process a single image for compression or decompression. Worker function used by the parallel processing pool. :param folder_path: The path to the folder containing the image. :param output_folder: The path to the folder where the processed image will be saved. :param image_file: The name of the image file to process. :param mode: The mode, either "compress" or "decompress". :param image_extension: The image file extension. :param process_metadata: Whether to process metadata. :param ome_bool: Whether to use OME metadata. :param metadata_json: Whether to write metadata as JSON. :param remove_source: Whether to remove the source files after processing. :param progress_info: The total number of files to process. :return: None """ if self.verbose: logger.info( f"Processing {image_file}... (File {progress_info[0]} of {progress_info[1]})" ) # Input/output files input_filename = os.path.join(folder_path, image_file) output_filename = os.path.join(output_folder, image_file) output_filename = add_extension( output_filename, image_extension, mode=mode, ome=ome_bool ) failed_files = 0 try: # Read image (and metadata only if requested) image_reader = ImageReader( input_filename, image_extension, metadata_format=self.metadata_format, read_metadata=process_metadata, ) img_map, metadata = image_reader.read_image() if metadata is None: metadata = {} if mode == "compress": self.compress_image( img_map, output_filename, metadata, ome_bool=ome_bool, metadata_json=metadata_json, ) elif mode == "decompress": self.decompress_image( img_map, output_filename, metadata, ome_bool=ome_bool, metadata_json=False, ) else: error_msg = f"Mode {mode} is not supported. Please use 'compress' or 'decompress'." logger.error(error_msg) raise ValueError(error_msg) if remove_source: self.remove_files(output_filename, input_filename) except Exception as e: failed_files += 1 logger.error(f"Error processing {image_file}: {e}") return failed_files
[docs] def process_folder( self, folder_path: str, mode: str = "compress", image_extension: str = ".tiff", process_metadata: bool = True, ome_bool: Optional[bool] = None, metadata_json: bool = True, remove_source: bool = False, target_folder: str = None, # New parameter for target folder ) -> bool: """ Process a folder of images using parallel processing. :param folder_path: The path to the folder. :param mode: The mode, either "compress" or "decompress". :param image_extension: The image file extension. :param process_metadata: Whether to process metadata. :param ome_bool: Whether to write OME metadata. If None (default), derived from the instance's metadata_format ('ome' -> True, 'imagej' -> False). :param metadata_json: Whether to write metadata as JSON. :param remove_source: Whether to remove the source files. :param target_folder: Optional target folder for processed images. """ # Derive ome_bool from metadata_format when not explicitly passed if ome_bool is None: ome_bool = self.metadata_format == "ome" # Create or use the output folder (with check if it exists) if target_folder: output_folder = os.path.abspath(target_folder) if not os.path.exists(output_folder): os.makedirs(output_folder) else: if mode == "decompress": suffix = "_decompressed" else: suffix = "_compressed" if os.path.isdir(folder_path): output_folder = create_compress_folder(folder_path, suffix=suffix) else: output_folder = folder_path logger.debug(f"Using output directory: {output_folder}") image_files = self.list_files(folder_path, image_extension) removed_count = 0 if self.omit_processed: processed_files = set() # Only list directory contents if output_folder is actually a directory if os.path.isdir(output_folder): for file in os.listdir(output_folder): base_name, _ = os.path.splitext(file) processed_files.add(base_name) original_count = len(image_files) image_files = [ file for file in image_files if os.path.splitext(file)[0] not in processed_files ] removed_count = original_count - len(image_files) total_files = len(image_files) if self.verbose: logger.info(f"Total files to process: {total_files}") logger.info(f"Files already processed: {removed_count}") # Create a pool of worker processes if self.ncores > 0: pool = multiprocessing.Pool(processes=self.ncores) else: num_processes = multiprocessing.cpu_count() pool = multiprocessing.Pool(processes=num_processes) # Prepare arguments for the worker function worker_args = [ ( folder_path, output_folder, image_file, mode, image_extension, process_metadata, ome_bool, metadata_json, remove_source, (index + 1, total_files), ) for index, image_file in enumerate(image_files) ] # Run the worker function in parallel results = pool.starmap(self.process_image, worker_args) # Close the pool and wait for all tasks to complete pool.close() pool.join() if self.verbose: logger.info(f"Processed {len(image_files)} images") failed = sum(results) success_files = len(image_files) - failed logger.info( f"{success_files} files processed correctly and {failed} images failed to process" ) return True