commit c2084a5166a73849ff8c648bd38f2475b1c9a1da Author: rmanach Date: Tue Oct 21 14:20:44 2025 +0200 init repository diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..298cfea --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +.mypy_cache +.ruff_cache + +venv +data + +*.log \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3fdc0a4 --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ +ROOT_DIR := $(dir $(realpath $(lastword $(MAKEFILE_LIST)))) +PYTHON := $(ROOT_DIR)venv/bin/python + +.PHONY: venv +venv: + @python3 -m venv venv + pip install -r requirements-dev.txt + +lint: + $(PYTHON) -m ruff check --fix + +format: + $(PYTHON) -m ruff format + +check-type: + $(PYTHON) -m mypy . + +check: format lint check-type \ No newline at end of file diff --git a/imgopti.py b/imgopti.py new file mode 100644 index 0000000..0fe7b41 --- /dev/null +++ b/imgopti.py @@ -0,0 +1,287 @@ +import logging +import mimetypes +import os +import subprocess +import sys +import time +from concurrent.futures import ProcessPoolExecutor +from dataclasses import dataclass, field +from datetime import datetime as dt +from enum import Enum, auto +from typing import Optional + +SRC_PATH = "../users/lea/pictures" +DEFAULT_MIMETYPE = "unknown" +DEFAULT_DEST_DIR = "data" +DEFAULT_NB_WORKERS = 10 + +JPEG_MIMETYPE = "image/jpeg" +PNG_MIMETYPE = "image/png" + + +class FileSizeRange(Enum): + TINY = auto() + MEDIUM = auto() + LARGE = auto() + FAT = auto() + + @classmethod + def from_size(cls, size: float) -> "FileSizeRange": + if size < 1: + return cls.TINY + + if size >= 1 and size < 2: + return cls.MEDIUM + + if size >= 2 and size < 5: + return cls.LARGE + + return cls.FAT + + def __str__(self): + match self: + case FileSizeRange.TINY: + return "tiny" + case FileSizeRange.MEDIUM: + return "medium" + case FileSizeRange.LARGE: + return "large" + case FileSizeRange.FAT: + return "fat" + + +@dataclass(slots=True, frozen=True) +class File: + directory: str + name: str + path: str + mime_type: str + size: float + size_range: FileSizeRange + modified: dt + + @classmethod + def from_directory(cls, directory: str, name: str) -> "File": + path = os.path.join(directory, name) + + mtype, _ = mimetypes.guess_type(path) + mime_type = mtype or DEFAULT_MIMETYPE + + size = os.path.getsize(path) / 1_048_576 + + return File( + directory, + name, + path, + mime_type, + size, + FileSizeRange.from_size(size), + dt.fromtimestamp(os.path.getmtime(path)), + ) + + def __repr__(self): + return f"" # noqa + + def _jpeg_opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None: + # remove ".." avoiding treat file in same dir + filepath = "/".join(self.path.split("/")[:-1]) + if filepath.startswith(".."): + filepath = filepath.lstrip("../") + + # replace all spaces in dir name + dest_dir = os.path.join(base_dest_dir, filepath).replace(" ", "_") + os.makedirs(dest_dir, exist_ok=True) + + cmd = f"jpegoptim -s -p -q '{self.path}' -d {dest_dir}" + logging.debug("optimization launched for file: %s -> %s", self, cmd) + try: + _ = subprocess.run(cmd, shell=True, check=True) + except subprocess.CalledProcessError as e: + logging.error("error while running command: %s, err: %s", cmd, e.output) + return self, None + except Exception: + logging.error( + "unexpected error while running command: %s", cmd, exc_info=True + ) + return None + + try: + f_opti = File.from_directory(dest_dir, self.name) + except Exception as e: + logging.debug("unable to get file: %s after optimization: %s", self, e) + return self, None + + return self, f_opti + + def opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None: + if self.mime_type == JPEG_MIMETYPE: + return self._jpeg_opti(base_dest_dir) + return None + + +@dataclass(slots=True) +class FileGroup: + mime_type: str + file_range: FileSizeRange + files: dict[str, File] = field(default_factory=dict) + size: float = 0 + _nb_files: int = 0 + + def __repr__(self): + return f"" # noqa + + def __len__(self): + return self._nb_files + + def add(self, file: File): + if self.files.get(file.path) is None: + self.files[file.path] = file + self._nb_files += 1 + self.size += file.size + + def get_size(self) -> float: + return self.size + + @staticmethod + def format_size(size: float) -> str: + if size < 1000: + return f"{size:.2f} Mb" + return f"{size / 1024:.2f} Gb" + + def get_size_formatted(self) -> str: + return FileGroup.format_size(self.size) + + def get_files(self) -> list[File]: + return list(self.files.values()) + + +@dataclass(slots=True, frozen=True) +class Dir: + path: str + nb_files: int + details: dict[str, dict[FileSizeRange, FileGroup]] + + def show(self): + data = [f"directory ({self.path}) details:"] + + for mime_type, group in self.details.items(): + nb_files = 0 + size = 0 + to_display = [f"* {mime_type}"] + + for file_range in group.keys(): + file_group = self.details[mime_type][file_range] + to_display.append( + f"\t{file_range:<8}{len(file_group):<8}{file_group.get_size_formatted()}" + ) + nb_files += len(self.details[mime_type][file_range]) + size += file_group.size + + to_display[0] += f" ({FileGroup.format_size(size)})" + + data.append("\n".join(to_display)) + + print("\n".join(data)) + + @classmethod + def from_path(cls, path: str) -> "Dir": + if not os.path.isdir(path): + raise Exception(f"Dir path: {path} must be a directory") + + nb_files = 0 + details: dict[str, dict[FileSizeRange, FileGroup]] = {} + for dirpath, _, filenames in os.walk(path): + for file in filenames: + file_path = os.path.join(dirpath, file) + try: + f = File.from_directory(dirpath, file) + except OSError as e: + logging.error("error accessing %s, err: %s", file_path, e) + continue + + if details.get(f.mime_type) is None: + details[f.mime_type] = {} + + if details[f.mime_type].get(f.size_range) is None: + details[f.mime_type][f.size_range] = FileGroup( + f.mime_type, f.size_range + ) + + details[f.mime_type][f.size_range].add(f) + nb_files += 1 + + return Dir(path, nb_files, details) + + def get_file_group( + self, mimetype: str, file_size: FileSizeRange + ) -> FileGroup | None: + if (mt := self.details.get(mimetype)) is not None: + return mt.get(file_size) + return None + + def get_files(self) -> list[File]: + files = [] + for details in self.details.values(): + for file_group in details.values(): + files.extend(file_group.get_files()) + return files + + +if __name__ == "__main__": + stdout_handler = logging.StreamHandler(stream=sys.stdout) + logging.basicConfig( + format="[%(levelname)s] - %(asctime)s - %(message)s", + level=logging.INFO, + handlers=(stdout_handler,), + ) + + d = Dir.from_path(SRC_PATH) + d.show() + + os.makedirs(DEFAULT_DEST_DIR, exist_ok=True) + + mtype = JPEG_MIMETYPE + frange = FileSizeRange.FAT + nb_workers = DEFAULT_NB_WORKERS + + fg = d.get_file_group(mtype, frange) + if fg is None: + logging.error( + "no files found for mimetype: %s and file size range: %s", mtype, frange + ) + exit(1) + + logging.info( + "launching optimization (%d) for %s and range %s on %d workers...", + len(fg), + mtype, + frange, + nb_workers, + ) + start = time.perf_counter() + + with ProcessPoolExecutor(nb_workers) as p: + futures = [p.submit(f.opti, DEFAULT_DEST_DIR) for f in fg.get_files()] + + fg_opti = FileGroup(mtype, frange) + optimized = 0 + for f in futures: + if (res := f.result()) and res is not None: + match res: + case (orig, None): + logging.debug(f"no optimization for file: {orig}") + fg_opti.add(orig) + case (orig, opti): + optimized += 1 + logging.debug( + f"optimization for file: {orig} -> {(1 - (opti.size / orig.size)) * 100:.2f}%" # noqa + ) + fg_opti.add(opti) + + logging.info(f"optimization finished in {time.perf_counter() - start:.2f}s") + + percent = (1 - (fg_opti.size / fg.size)) * 100 + size_gained = fg.size - fg_opti.size + logging.info( + f"total optimization ({optimized}/{len(fg)}): {percent:.2f}% -> {size_gained:.2f} Mb" # noqa + ) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..d6761ac --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "imgopti" +dynamic = ["version"] +authors = [] +requires-python = ">= 3.10" + +[tool.ruff.lint] +select = ["E", "F", "I"] +ignore = [] + +[tool.ruff] +exclude = [ + "venv", +] + +line-length = 88 +target-version = "py311" + +[tool.ruff.lint.mccabe] +max-complexity = 10 + +[tool.mypy] +exclude = [ + "venv", +] +ignore_missing_imports = true diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..a13e5f6 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,2 @@ +mypy==1.18.2 +ruff==0.14.1 \ No newline at end of file