fix signal interrupt handler + add doc

This commit is contained in:
rmanach 2025-10-22 10:44:33 +02:00
parent 3a1c994b1e
commit db484c5216
4 changed files with 158 additions and 75 deletions

View File

@ -1 +1,5 @@
from .optimizer import ImgOptimizer, OptimizerResult
__all__ = ["ImgOptimizer", "OptimizerResult"]
VERSION = "0.1.0" VERSION = "0.1.0"

View File

@ -108,8 +108,19 @@ def main():
nb_workers, nb_workers,
) )
optimizer = ImgOptimizer(dest_dir, args.workers) optimizer = ImgOptimizer.init(dest_dir, args.workers)
result = optimizer.optimize(fg)
try:
result = optimizer.optimize(fg)
except KeyboardInterrupt:
logging.info("optimizer stopped gracefully")
exit(0)
except Exception as e:
logging.fatal(
f"unexpected error occurred while optimizing, err: {e}", exc_info=True
)
exit(1)
(optimized, percent, size) = result.stats() (optimized, percent, size) = result.stats()
logging.info( logging.info(

View File

@ -1,11 +1,11 @@
import logging import logging
import mimetypes import mimetypes
import os import os
import subprocess
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime as dt from datetime import datetime as dt
from enum import Enum from enum import Enum
from typing import Optional
__all__ = ["FileImgMimetype", "FileSizeRange", "File", "FileGroup", "Directory"]
DEFAULT_MIMETYPE = "unknown" DEFAULT_MIMETYPE = "unknown"
@ -25,6 +25,15 @@ class FileImgMimetype(Enum):
class FileSizeRange(Enum): class FileSizeRange(Enum):
"""
Categorize files by their size in megabytes.
* TINY: [0,1[ Mb
* MEDIUM: [1,2[ Mb
* LARGE: [2,5[ Mb
* FAT: [5,inf[ Mb
"""
TINY = "TINY" TINY = "TINY"
MEDIUM = "MEDIUM" MEDIUM = "MEDIUM"
LARGE = "LARGE" LARGE = "LARGE"
@ -59,6 +68,15 @@ class FileSizeRange(Enum):
@dataclass(slots=True, frozen=True) @dataclass(slots=True, frozen=True)
class File: class File:
"""
Handle file main attributes.
Example:
```python
file = File.from_directory("dir-path", "my-file-name.png")
```
"""
directory: str directory: str
name: str name: str
path: str path: str
@ -87,45 +105,14 @@ class File:
def __repr__(self): def __repr__(self):
return f"<FILE name={self.name} | dir={self.directory} | size={self.size:.2f} Mb | mimetype={self.mimetype}>" # noqa return f"<FILE name={self.name} | dir={self.directory} | size={self.size:.2f} Mb | mimetype={self.mimetype}>" # noqa
def _jpeg_opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None:
# remove ".." avoiding treat file in same dir
filepath = "/".join(self.path.split("/")[:-1])
if filepath.startswith(".."):
filepath = filepath.lstrip("..")
# replace all spaces in dir name for readability
dest_dir = os.path.join(base_dest_dir, filepath.lstrip("/")).replace(" ", "_")
os.makedirs(dest_dir, exist_ok=True)
cmd = f"jpegoptim -s -p -q '{self.path}' -d {dest_dir}"
logging.debug("optimization launched for file: %s -> %s", self, cmd)
try:
_ = subprocess.run(cmd, shell=True, check=True)
except subprocess.CalledProcessError as e:
logging.error("error while running command: %s, err: %s", cmd, e.output)
return None
except Exception:
logging.error(
"unexpected error while running command: %s", cmd, exc_info=True
)
return None
try:
f_opti = File.from_directory(dest_dir, self.name)
except Exception as e:
logging.debug("unable to get file: %s after optimization: %s", self, e)
return self, None
return self, f_opti
def opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None:
    """
    Optimize this file into `base_dest_dir` according to its mimetype.

    Only JPEG files are handled; any other mimetype returns `None`.
    """
    is_jpeg = self.mimetype == FileImgMimetype.JPEG.value
    return self._jpeg_opti(base_dest_dir) if is_jpeg else None
@dataclass(slots=True) @dataclass(slots=True)
class FileGroup: class FileGroup:
"""
Group a bunch of `File`. That's all.
Only useful to quickly provide the number of files and their total size in Mb.
"""
_files: dict[str, File] = field(default_factory=dict) _files: dict[str, File] = field(default_factory=dict)
_size: float = 0 _size: float = 0
_nb_files: int = 0 _nb_files: int = 0
@ -146,6 +133,7 @@ class FileGroup:
return self._size return self._size
def join(self, right: "FileGroup"): def join(self, right: "FileGroup"):
"""Include the whole `FileGroup` to its own."""
for filepath, file in right._files.items(): for filepath, file in right._files.items():
if self._files.get(filepath) is None: if self._files.get(filepath) is None:
self._files[filepath] = file self._files[filepath] = file
@ -167,6 +155,22 @@ class FileGroup:
@dataclass(slots=True, frozen=True) @dataclass(slots=True, frozen=True)
class Directory: class Directory:
"""
Represents a directory path grouping files by mimetype and size range.
Example:
```python
directory = Directory.from_path("my-path")
fg = directory.get_file_group() # collect all files
# collect all tiny files of the directory
fg_tiny = directory.get_file_group(size_range=FileSizeRange.TINY)
# collect all JPEG files
fg_jpeg = directory.get_file_group(mimetype=FileImgMimetype.JPEG)
```
"""
path: str path: str
nb_files: int nb_files: int
details: dict[str, dict[FileSizeRange, FileGroup]] details: dict[str, dict[FileSizeRange, FileGroup]]
@ -175,6 +179,9 @@ class Directory:
return self.nb_files return self.nb_files
def show(self): def show(self):
"""
Display the whole directory files grouped by mimetype and size range.
"""
data = [f"directory ({self.path}) details:"] data = [f"directory ({self.path}) details:"]
for mimetype, group in self.details.items(): for mimetype, group in self.details.items():

View File

@ -1,5 +1,6 @@
import logging import logging
import os import os
import signal
import subprocess import subprocess
import time import time
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor
@ -8,14 +9,76 @@ from typing import Optional
from .files import File, FileGroup, FileImgMimetype from .files import File, FileGroup, FileImgMimetype
__all__ = ["ImgOptimizer", "OptimizerResult"]
# TODO(rmanach): add argument to set the size or leave it empty for lossless optim
def _jpeg_optim(dest_dir: str, file: File) -> tuple["File", Optional["File"]] | None:
"""
Optimize the `file` with `jpegoptim` and put the result in
`dest_dir` directory keeping file path.
"""
# remove ".." avoiding treat file in same dir
filepath = "/".join(file.path.split("/")[:-1])
if filepath.startswith(".."):
filepath = filepath.lstrip("..")
# replace all spaces in dir name for readability
dest_dir = os.path.join(dest_dir, filepath.lstrip("/")).replace(" ", "_")
os.makedirs(dest_dir, exist_ok=True)
# use "-S <i>k" to set maximum size in kilobytes
cmd = f"jpegoptim -s -p -q -S 1024k '{file.path}' -d {dest_dir}"
logging.debug("optimization launched for file: %s -> %s", file, cmd)
try:
_ = subprocess.run(cmd, shell=True, check=True)
except subprocess.CalledProcessError as e:
logging.error("error while running command: %s, err: %s", cmd, e.output)
return None
except Exception:
logging.error("unexpected error while running command: %s", cmd, exc_info=True)
return None
try:
file_optim = File.from_directory(dest_dir, file.name)
except Exception as e:
logging.debug("unable to get file: %s after optimization: %s", file, e)
return file, None
return file, file_optim
def _optim(dest_dir: str, file: File) -> tuple["File", Optional["File"]] | None:
    """
    Entry point of `file` optimization: selects the handler matching
    the file mimetype.

    Only JPEG files are handled; any other mimetype returns `None`.

    NOTE: Must be launched in a separate process.
    """
    # SIGINT is ignored in the process running this function; the interrupt
    # is meant to be caught on the process pool executor side instead
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    if file.mimetype != FileImgMimetype.JPEG.value:
        return None
    return _jpeg_optim(dest_dir, file)
@dataclass(slots=True, frozen=True) @dataclass(slots=True, frozen=True)
class OptimizerResult: class OptimizerResult:
"""
Optimization result.
Handle the original `FileGroup` and
the optimized `FileGroup`.
"""
orig: FileGroup orig: FileGroup
opti: FileGroup opti: FileGroup
optimized: int optimized: int
def stats(self) -> tuple[int, float, float]: def stats(self) -> tuple[int, float, float]:
"""
Returns the basics statistics of the optimization.
Returns:
tuple: (number of file optimized, percent of size gained, size gained in Mb)
"""
percent = (1 - (self.opti._size / self.orig._size)) * 100 percent = (1 - (self.opti._size / self.orig._size)) * 100
size = self.orig._size - self.opti._size size = self.orig._size - self.opti._size
return (self.optimized, percent, size) return (self.optimized, percent, size)
@ -23,51 +86,49 @@ class OptimizerResult:
@dataclass(slots=True, frozen=True) @dataclass(slots=True, frozen=True)
class ImgOptimizer: class ImgOptimizer:
"""
Wraps the optimization of JPEG and PNG files
using `jpegoptim` and `optipng` on process pool.
Example:
```python
optimizer = ImgOptimizer("mypath")
optimizer.optimize()
```
"""
dest_dir: str dest_dir: str
nb_workers: int = 5 _pool: ProcessPoolExecutor
_orig_sigint_handler = signal.getsignal(signal.SIGINT)
def _jpeg_optim(self, file: File) -> tuple["File", Optional["File"]] | None: @classmethod
# remove ".." avoiding treat file in same dir def init(cls, dest_dir: str, nb_workers: int = 5) -> "ImgOptimizer":
filepath = "/".join(file.path.split("/")[:-1]) return ImgOptimizer(dest_dir, ProcessPoolExecutor(nb_workers))
if filepath.startswith(".."):
filepath = filepath.lstrip("..")
# replace all spaces in dir name for readability def stop(self):
dest_dir = os.path.join(self.dest_dir, filepath.lstrip("/")).replace(" ", "_") logging.warning("stopping optimizer...")
os.makedirs(dest_dir, exist_ok=True) self._pool.shutdown(wait=True, cancel_futures=True)
# use "-S <i>k" to set maximum size in kilobytes def _sigint_handler(self, signum, frame):
cmd = f"jpegoptim -s -p -q -S 1024k '{file.path}' -d {dest_dir}" logging.warning("interrupt signal received, stoppping optimizer...")
logging.debug("optimization launched for file: %s -> %s", self, cmd) signal.signal(signal.SIGINT, self._orig_sigint_handler)
try: try:
_ = subprocess.run(cmd, shell=True, check=True) self.stop()
except subprocess.CalledProcessError as e:
logging.error("error while running command: %s, err: %s", cmd, e.output)
return None
except Exception:
logging.error(
"unexpected error while running command: %s", cmd, exc_info=True
)
return None
try:
file_optim = File.from_directory(dest_dir, file.name)
except Exception as e: except Exception as e:
logging.debug("unable to get file: %s after optimization: %s", file, e) logging.debug(
return file, None "error occurred while stopping optimizer: %s", e, exc_info=True
)
return file, file_optim pass
raise KeyboardInterrupt
def _optim(self, file: File) -> tuple["File", Optional["File"]] | None:
if file.mimetype == FileImgMimetype.JPEG.value:
return self._jpeg_optim(file)
return None
def optimize(self, file_group: FileGroup) -> OptimizerResult: def optimize(self, file_group: FileGroup) -> OptimizerResult:
signal.signal(signal.SIGINT, self._sigint_handler)
start = time.perf_counter() start = time.perf_counter()
with ProcessPoolExecutor(self.nb_workers) as p: with self._pool as p:
futures = [p.submit(self._optim, f) for f in file_group.get_files()] futures = [
p.submit(_optim, self.dest_dir, f) for f in file_group.get_files()
]
file_group_optim = FileGroup() file_group_optim = FileGroup()
optimized = 0 optimized = 0