diff --git a/src/__init__.py b/src/__init__.py index 1cf6267..5c3eca0 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -1 +1,5 @@ +from .optimizer import ImgOptimizer, OptimizerResult + +__all__ = ["ImgOptimizer", "OptimizerResult"] + VERSION = "0.1.0" diff --git a/src/__main__.py b/src/__main__.py index 14e4052..dab2660 100644 --- a/src/__main__.py +++ b/src/__main__.py @@ -108,8 +108,19 @@ def main(): nb_workers, ) - optimizer = ImgOptimizer(dest_dir, args.workers) - result = optimizer.optimize(fg) + optimizer = ImgOptimizer.init(dest_dir, args.workers) + + try: + result = optimizer.optimize(fg) + except KeyboardInterrupt: + logging.info("optimizer stopped gracefully") + exit(0) + except Exception as e: + logging.fatal( + f"unexpected error occurred while optimizing, err: {e}", exc_info=True + ) + exit(1) + (optimized, percent, size) = result.stats() logging.info( diff --git a/src/files.py b/src/files.py index a3607a7..e7ab77a 100644 --- a/src/files.py +++ b/src/files.py @@ -1,11 +1,11 @@ import logging import mimetypes import os -import subprocess from dataclasses import dataclass, field from datetime import datetime as dt from enum import Enum -from typing import Optional + +__all__ = ["FileImgMimetype", "FileSizeRange", "File", "FileGroup", "Directory"] DEFAULT_MIMETYPE = "unknown" @@ -25,6 +25,15 @@ class FileImgMimetype(Enum): class FileSizeRange(Enum): + """ + Categorized files by their size in megabytes. + + * TINY: [0,1[ Mb + * MEDIUM: [1,2[ Mb + * LARGE: [2,5[ Mb + * FAT: [5,inf[ Mb + """ + TINY = "TINY" MEDIUM = "MEDIUM" LARGE = "LARGE" @@ -59,6 +68,15 @@ class FileSizeRange(Enum): @dataclass(slots=True, frozen=True) class File: + """ + Handle file main attributes. + + Example: + ```python + file = File.from_directory("dir-path", "my-file-name.png") + ``` + """ + directory: str name: str path: str @@ -87,45 +105,14 @@ class File: def __repr__(self): return f"" # noqa - def _jpeg_opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None: - # remove ".." avoiding treat file in same dir - filepath = "/".join(self.path.split("/")[:-1]) - if filepath.startswith(".."): - filepath = filepath.lstrip("..") - - # replace all spaces in dir name for readability - dest_dir = os.path.join(base_dest_dir, filepath.lstrip("/")).replace(" ", "_") - os.makedirs(dest_dir, exist_ok=True) - - cmd = f"jpegoptim -s -p -q '{self.path}' -d {dest_dir}" - logging.debug("optimization launched for file: %s -> %s", self, cmd) - try: - _ = subprocess.run(cmd, shell=True, check=True) - except subprocess.CalledProcessError as e: - logging.error("error while running command: %s, err: %s", cmd, e.output) - return None - except Exception: - logging.error( - "unexpected error while running command: %s", cmd, exc_info=True - ) - return None - - try: - f_opti = File.from_directory(dest_dir, self.name) - except Exception as e: - logging.debug("unable to get file: %s after optimization: %s", self, e) - return self, None - - return self, f_opti - - def opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None: - if self.mimetype == FileImgMimetype.JPEG.value: - return self._jpeg_opti(base_dest_dir) - return None - @dataclass(slots=True) class FileGroup: + """ + Group a bunch of `File`. That's all. + Only useful to provide number of file and the whole size in Mb quickly. + """ + _files: dict[str, File] = field(default_factory=dict) _size: float = 0 _nb_files: int = 0 @@ -146,6 +133,7 @@ class FileGroup: return self._size def join(self, right: "FileGroup"): + """Include the whole `FileGroup` to its own.""" for filepath, file in right._files.items(): if self._files.get(filepath) is None: self._files[filepath] = file @@ -167,6 +155,22 @@ class FileGroup: @dataclass(slots=True, frozen=True) class Directory: + """ + Represents a directory path grouping files by mimetype and size range. + + Example: + ```python + directory = Directory.from_path("my-path") + fg = directory.get_file_group() # collect all files + + # collect all tiny files of the directory + fg_tiny = directory.get_file_group(size_range=FileSizeRange.TINY) + + # collect all JPEG files + fg_jpeg = directory.get_file_group(mimetype=FileImgMimetype.JPEG) + ``` + """ + path: str nb_files: int details: dict[str, dict[FileSizeRange, FileGroup]] @@ -175,6 +179,9 @@ class Directory: return self.nb_files def show(self): + """ + Display the whole directory files grouped by mimetype and size range. + """ data = [f"directory ({self.path}) details:"] for mimetype, group in self.details.items(): diff --git a/src/optimizer.py b/src/optimizer.py index d13eaec..f8143e0 100644 --- a/src/optimizer.py +++ b/src/optimizer.py @@ -1,5 +1,6 @@ import logging import os +import signal import subprocess import time from concurrent.futures import ProcessPoolExecutor @@ -8,14 +9,76 @@ from typing import Optional from .files import File, FileGroup, FileImgMimetype +__all__ = ["ImgOptimizer", "OptimizerResult"] + + +# TODO(rmanach): add argument to set the size or leave it empty for loseless optim +def _jpeg_optim(dest_dir: str, file: File) -> tuple["File", Optional["File"]] | None: + """ + Optimize the `file` with `jpegoptim` and put the result in + `dest_dir` directory keeping file path. + """ + # remove ".." avoiding treat file in same dir + filepath = "/".join(file.path.split("/")[:-1]) + if filepath.startswith(".."): + filepath = filepath.lstrip("..") + + # replace all spaces in dir name for readability + dest_dir = os.path.join(dest_dir, filepath.lstrip("/")).replace(" ", "_") + os.makedirs(dest_dir, exist_ok=True) + + # use "-S k" to set maximum size in kilobytes + cmd = f"jpegoptim -s -p -q -S 1024k '{file.path}' -d {dest_dir}" + logging.debug("optimization launched for file: %s -> %s", file, cmd) + try: + _ = subprocess.run(cmd, shell=True, check=True) + except subprocess.CalledProcessError as e: + logging.error("error while running command: %s, err: %s", cmd, e.output) + return None + except Exception: + logging.error("unexpected error while running command: %s", cmd, exc_info=True) + return None + + try: + file_optim = File.from_directory(dest_dir, file.name) + except Exception as e: + logging.debug("unable to get file: %s after optimization: %s", file, e) + return file, None + + return file, file_optim + + +def _optim(dest_dir: str, file: File) -> tuple["File", Optional["File"]] | None: + """ + Entry point of `file` optimization selection the handler. + NOTE: Must be launched in separated process. + """ + # ignore interrupt signal, catch by multiprocess executor + signal.signal(signal.SIGINT, signal.SIG_IGN) + if file.mimetype == FileImgMimetype.JPEG.value: + return _jpeg_optim(dest_dir, file) + return None + @dataclass(slots=True, frozen=True) class OptimizerResult: + """ + Optimization result. + Handle the original `FileGroup` and + the optimized `FileGroup`. + """ + orig: FileGroup opti: FileGroup optimized: int def stats(self) -> tuple[int, float, float]: + """ + Returns the basics statistics of the optimization. + + Returns: + tuple: (number of file optimized, percent of size gained, size gained in Mb) + """ percent = (1 - (self.opti._size / self.orig._size)) * 100 size = self.orig._size - self.opti._size return (self.optimized, percent, size) @@ -23,51 +86,49 @@ class OptimizerResult: @dataclass(slots=True, frozen=True) class ImgOptimizer: + """ + Wraps the optimization of JPEG and PNG files + using `jpegoptim` and `optipng` on process pool. + + Example: + ```python + optimizer = ImgOptimizer("mypath") + optimizer.optimize() + ``` + """ + dest_dir: str - nb_workers: int = 5 + _pool: ProcessPoolExecutor + _orig_sigint_handler = signal.getsignal(signal.SIGINT) - def _jpeg_optim(self, file: File) -> tuple["File", Optional["File"]] | None: - # remove ".." avoiding treat file in same dir - filepath = "/".join(file.path.split("/")[:-1]) - if filepath.startswith(".."): - filepath = filepath.lstrip("..") + @classmethod + def init(cls, dest_dir: str, nb_workers: int = 5) -> "ImgOptimizer": + return ImgOptimizer(dest_dir, ProcessPoolExecutor(nb_workers)) - # replace all spaces in dir name for readability - dest_dir = os.path.join(self.dest_dir, filepath.lstrip("/")).replace(" ", "_") - os.makedirs(dest_dir, exist_ok=True) + def stop(self): + logging.warning("stopping optimizer...") + self._pool.shutdown(wait=True, cancel_futures=True) - # use "-S k" to set maximum size in kilobytes - cmd = f"jpegoptim -s -p -q -S 1024k '{file.path}' -d {dest_dir}" - logging.debug("optimization launched for file: %s -> %s", self, cmd) + def _sigint_handler(self, signum, frame): + logging.warning("interrupt signal received, stoppping optimizer...") + signal.signal(signal.SIGINT, self._orig_sigint_handler) try: - _ = subprocess.run(cmd, shell=True, check=True) - except subprocess.CalledProcessError as e: - logging.error("error while running command: %s, err: %s", cmd, e.output) - return None - except Exception: - logging.error( - "unexpected error while running command: %s", cmd, exc_info=True - ) - return None - - try: - file_optim = File.from_directory(dest_dir, file.name) + self.stop() except Exception as e: - logging.debug("unable to get file: %s after optimization: %s", file, e) - return file, None - - return file, file_optim - - def _optim(self, file: File) -> tuple["File", Optional["File"]] | None: - if file.mimetype == FileImgMimetype.JPEG.value: - return self._jpeg_optim(file) - return None + logging.debug( + "error occurred while stopping optimizer: %s", e, exc_info=True + ) + pass + raise KeyboardInterrupt def optimize(self, file_group: FileGroup) -> OptimizerResult: + signal.signal(signal.SIGINT, self._sigint_handler) start = time.perf_counter() - with ProcessPoolExecutor(self.nb_workers) as p: - futures = [p.submit(self._optim, f) for f in file_group.get_files()] + with self._pool as p: + futures = [ + p.submit(_optim, self.dest_dir, f) for f in file_group.get_files() + ] file_group_optim = FileGroup() optimized = 0