diff --git a/.gitignore b/.gitignore index 298cfea..b429544 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,8 @@ .ruff_cache venv +venv.dist data +dist *.log \ No newline at end of file diff --git a/Makefile b/Makefile index 3fdc0a4..0c675c5 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,13 @@ ROOT_DIR := $(dir $(realpath $(lastword $(MAKEFILE_LIST)))) PYTHON := $(ROOT_DIR)venv/bin/python +PYTHONCLI := $(ROOT_DIR)venv.dist/bin/python + +SRC_NAME = optimg .PHONY: venv venv: @python3 -m venv venv - pip install -r requirements-dev.txt + $(PYTHON) -m pip install -r requirements-dev.txt lint: $(PYTHON) -m ruff check --fix @@ -15,4 +18,17 @@ format: check-type: $(PYTHON) -m mypy . -check: format lint check-type \ No newline at end of file +check: format lint check-type + +build: check + @rm -rf dist/* + $(PYTHON) -m hatch -v build -t wheel + +install: build + @python3 -m venv venv.dist + $(PYTHONCLI) -m pip install dist/$(SRC_NAME)-*.whl --force-reinstall + +documentation: + $(PYTHON) -m pdoc --html -o docs src/ --force + @find docs/src/* -type f -exec sed -i 's/src\./$(SRC_NAME)\./g' {} \; -exec sed -i 's/srcsrc<\/code>/$(SRC_NAME)<\/code>/g' docs/src/index.html \ No newline at end of file diff --git a/imgopti.py b/imgopti.py deleted file mode 100644 index 0fe7b41..0000000 --- a/imgopti.py +++ /dev/null @@ -1,287 +0,0 @@ -import logging -import mimetypes -import os -import subprocess -import sys -import time -from concurrent.futures import ProcessPoolExecutor -from dataclasses import dataclass, field -from datetime import datetime as dt -from enum import Enum, auto -from typing import Optional - -SRC_PATH = "../users/lea/pictures" -DEFAULT_MIMETYPE = "unknown" -DEFAULT_DEST_DIR = "data" -DEFAULT_NB_WORKERS = 10 - -JPEG_MIMETYPE = "image/jpeg" -PNG_MIMETYPE = "image/png" - - -class FileSizeRange(Enum): - TINY = auto() - MEDIUM = auto() - LARGE = auto() - FAT = auto() - - @classmethod - def from_size(cls, size: float) -> "FileSizeRange": - if size < 1: - return 
cls.TINY - - if size >= 1 and size < 2: - return cls.MEDIUM - - if size >= 2 and size < 5: - return cls.LARGE - - return cls.FAT - - def __str__(self): - match self: - case FileSizeRange.TINY: - return "tiny" - case FileSizeRange.MEDIUM: - return "medium" - case FileSizeRange.LARGE: - return "large" - case FileSizeRange.FAT: - return "fat" - - -@dataclass(slots=True, frozen=True) -class File: - directory: str - name: str - path: str - mime_type: str - size: float - size_range: FileSizeRange - modified: dt - - @classmethod - def from_directory(cls, directory: str, name: str) -> "File": - path = os.path.join(directory, name) - - mtype, _ = mimetypes.guess_type(path) - mime_type = mtype or DEFAULT_MIMETYPE - - size = os.path.getsize(path) / 1_048_576 - - return File( - directory, - name, - path, - mime_type, - size, - FileSizeRange.from_size(size), - dt.fromtimestamp(os.path.getmtime(path)), - ) - - def __repr__(self): - return f"" # noqa - - def _jpeg_opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None: - # remove ".." 
avoiding treat file in same dir - filepath = "/".join(self.path.split("/")[:-1]) - if filepath.startswith(".."): - filepath = filepath.lstrip("../") - - # replace all spaces in dir name - dest_dir = os.path.join(base_dest_dir, filepath).replace(" ", "_") - os.makedirs(dest_dir, exist_ok=True) - - cmd = f"jpegoptim -s -p -q '{self.path}' -d {dest_dir}" - logging.debug("optimization launched for file: %s -> %s", self, cmd) - try: - _ = subprocess.run(cmd, shell=True, check=True) - except subprocess.CalledProcessError as e: - logging.error("error while running command: %s, err: %s", cmd, e.output) - return self, None - except Exception: - logging.error( - "unexpected error while running command: %s", cmd, exc_info=True - ) - return None - - try: - f_opti = File.from_directory(dest_dir, self.name) - except Exception as e: - logging.debug("unable to get file: %s after optimization: %s", self, e) - return self, None - - return self, f_opti - - def opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None: - if self.mime_type == JPEG_MIMETYPE: - return self._jpeg_opti(base_dest_dir) - return None - - -@dataclass(slots=True) -class FileGroup: - mime_type: str - file_range: FileSizeRange - files: dict[str, File] = field(default_factory=dict) - size: float = 0 - _nb_files: int = 0 - - def __repr__(self): - return f"" # noqa - - def __len__(self): - return self._nb_files - - def add(self, file: File): - if self.files.get(file.path) is None: - self.files[file.path] = file - self._nb_files += 1 - self.size += file.size - - def get_size(self) -> float: - return self.size - - @staticmethod - def format_size(size: float) -> str: - if size < 1000: - return f"{size:.2f} Mb" - return f"{size / 1024:.2f} Gb" - - def get_size_formatted(self) -> str: - return FileGroup.format_size(self.size) - - def get_files(self) -> list[File]: - return list(self.files.values()) - - -@dataclass(slots=True, frozen=True) -class Dir: - path: str - nb_files: int - details: dict[str, 
dict[FileSizeRange, FileGroup]] - - def show(self): - data = [f"directory ({self.path}) details:"] - - for mime_type, group in self.details.items(): - nb_files = 0 - size = 0 - to_display = [f"* {mime_type}"] - - for file_range in group.keys(): - file_group = self.details[mime_type][file_range] - to_display.append( - f"\t{file_range:<8}{len(file_group):<8}{file_group.get_size_formatted()}" - ) - nb_files += len(self.details[mime_type][file_range]) - size += file_group.size - - to_display[0] += f" ({FileGroup.format_size(size)})" - - data.append("\n".join(to_display)) - - print("\n".join(data)) - - @classmethod - def from_path(cls, path: str) -> "Dir": - if not os.path.isdir(path): - raise Exception(f"Dir path: {path} must be a directory") - - nb_files = 0 - details: dict[str, dict[FileSizeRange, FileGroup]] = {} - for dirpath, _, filenames in os.walk(path): - for file in filenames: - file_path = os.path.join(dirpath, file) - try: - f = File.from_directory(dirpath, file) - except OSError as e: - logging.error("error accessing %s, err: %s", file_path, e) - continue - - if details.get(f.mime_type) is None: - details[f.mime_type] = {} - - if details[f.mime_type].get(f.size_range) is None: - details[f.mime_type][f.size_range] = FileGroup( - f.mime_type, f.size_range - ) - - details[f.mime_type][f.size_range].add(f) - nb_files += 1 - - return Dir(path, nb_files, details) - - def get_file_group( - self, mimetype: str, file_size: FileSizeRange - ) -> FileGroup | None: - if (mt := self.details.get(mimetype)) is not None: - return mt.get(file_size) - return None - - def get_files(self) -> list[File]: - files = [] - for details in self.details.values(): - for file_group in details.values(): - files.extend(file_group.get_files()) - return files - - -if __name__ == "__main__": - stdout_handler = logging.StreamHandler(stream=sys.stdout) - logging.basicConfig( - format="[%(levelname)s] - %(asctime)s - %(message)s", - level=logging.INFO, - handlers=(stdout_handler,), - ) - - d = 
Dir.from_path(SRC_PATH) - d.show() - - os.makedirs(DEFAULT_DEST_DIR, exist_ok=True) - - mtype = JPEG_MIMETYPE - frange = FileSizeRange.FAT - nb_workers = DEFAULT_NB_WORKERS - - fg = d.get_file_group(mtype, frange) - if fg is None: - logging.error( - "no files found for mimetype: %s and file size range: %s", mtype, frange - ) - exit(1) - - logging.info( - "launching optimization (%d) for %s and range %s on %d workers...", - len(fg), - mtype, - frange, - nb_workers, - ) - start = time.perf_counter() - - with ProcessPoolExecutor(nb_workers) as p: - futures = [p.submit(f.opti, DEFAULT_DEST_DIR) for f in fg.get_files()] - - fg_opti = FileGroup(mtype, frange) - optimized = 0 - for f in futures: - if (res := f.result()) and res is not None: - match res: - case (orig, None): - logging.debug(f"no optimization for file: {orig}") - fg_opti.add(orig) - case (orig, opti): - optimized += 1 - logging.debug( - f"optimization for file: {orig} -> {(1 - (opti.size / orig.size)) * 100:.2f}%" # noqa - ) - fg_opti.add(opti) - - logging.info(f"optimization finished in {time.perf_counter() - start:.2f}s") - - percent = (1 - (fg_opti.size / fg.size)) * 100 - size_gained = fg.size - fg_opti.size - logging.info( - f"total optimization ({optimized}/{len(fg)}): {percent:.2f}% -> {size_gained:.2f} Mb" # noqa - ) diff --git a/pyproject.toml b/pyproject.toml index d6761ac..45a2452 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,8 +1,27 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + [project] -name = "imgopti" +name = "optimg" dynamic = ["version"] -authors = [] -requires-python = ">= 3.10" +description = "Optimize JPEG and PNG files from a directory." 
+dependencies = []
+
+[project.scripts]
+optimg = "optimg.__main__:main"
+
+[tool.hatch.version]
+path = "src/__init__.py"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src"]
+
+[tool.hatch.build.targets.sdist]
+only-include = ["src"]
+
+[tool.hatch.build.targets.wheel.sources]
+"src" = "optimg"
 
 [tool.ruff.lint]
 select = ["E", "F", "I"]
@@ -23,4 +42,4 @@ max-complexity = 10
 exclude = [
     "venv",
 ]
-ignore_missing_imports = true
+ignore_missing_imports = true
\ No newline at end of file
diff --git a/requirements-dev.txt b/requirements-dev.txt
index a13e5f6..e24ffc0 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,2 +1,5 @@
 mypy==1.18.2
-ruff==0.14.1
\ No newline at end of file
+ruff==0.14.1
+hatch==1.12.0
+pdoc3==0.11.6
+twine==6.2.0
\ No newline at end of file
diff --git a/src/__init__.py b/src/__init__.py
new file mode 100644
index 0000000..1cf6267
--- /dev/null
+++ b/src/__init__.py
@@ -0,0 +1 @@
+VERSION = "0.1.0"
diff --git a/src/__main__.py b/src/__main__.py
new file mode 100644
index 0000000..6d0a797
--- /dev/null
+++ b/src/__main__.py
@@ -0,0 +1,188 @@
+"""Command-line entry point of optimg."""
+
+import argparse
+import logging
+import os
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor
+
+from .files import Directory, FileGroup, FileImgMimetype, FileSizeRange
+
+DEFAULT_DEST_DIR = "data"
+
+
+def _positive_int(value: str) -> int:
+    """Parse an argparse value that must be an integer greater than zero."""
+    number = int(value)
+    if number < 1:
+        raise argparse.ArgumentTypeError(f"must be >= 1, got: {value}")
+    return number
+
+
+def _gain_percent(before: float, after: float) -> float:
+    """Return the size reduction from before to after as a percentage."""
+    # An empty source would otherwise divide by zero.
+    if before <= 0:
+        return 0.0
+    return (1 - (after / before)) * 100
+
+
+def _build_parser() -> argparse.ArgumentParser:
+    """Build the command-line parser of the optimg program."""
+    parser = argparse.ArgumentParser(
+        "optimg", description="Optimize JPEG and PNG files from a directory."
+    )
+    parser.add_argument("src", type=str, help="Directory path to scan.")
+    parser.add_argument(
+        "--mimetype",
+        type=str,
+        choices=[member.name for member in FileImgMimetype],
+        default=None,
+        help="Filter by mimetype. Default is all.",
+    )
+    parser.add_argument(
+        "--size",
+        type=str,
+        choices=[member.name for member in FileSizeRange],
+        default=None,
+        help="Filter by file size. Default is all.",
+    )
+    parser.add_argument(
+        "--dest",
+        type=str,
+        help="Base destination directory of optimized files.",
+    )
+    parser.add_argument(
+        "--workers",
+        type=_positive_int,
+        default=4,
+        help="Number of workers used to optimize files.",
+    )
+    parser.add_argument(
+        "--show",
+        action="store_true",
+        default=False,
+        help="Details the directory by mimetypes and quit.",
+    )
+    parser.add_argument(
+        "--debug",
+        action="store_true",
+        default=False,
+        help="Set log level to debug. Default is info.",
+    )
+    return parser
+
+
+def main():
+    """Scan a directory and optimize the files matching the CLI filters.
+
+    Exits with status 1 when the source directory cannot be scanned or the
+    filters cannot be parsed, and with status 0 after ``--show`` or when no
+    file matches the filters.
+    """
+    stdout_handler = logging.StreamHandler(stream=sys.stdout)
+    logging.basicConfig(
+        format="[%(levelname)s] - %(asctime)s - %(message)s",
+        level=logging.INFO,
+        handlers=(stdout_handler,),
+    )
+
+    args = _build_parser().parse_args()
+
+    if args.debug:
+        logging.root.setLevel(logging.DEBUG)
+        logging.debug("set debug mode on")
+
+    try:
+        directory = Directory.from_path(args.src)
+    except Exception as e:
+        logging.error(e)
+        sys.exit(1)
+
+    if args.show:
+        directory.show()
+        sys.exit(0)
+
+    try:
+        mimetype = (
+            FileImgMimetype.from_str(args.mimetype)
+            if args.mimetype is not None
+            else None
+        )
+        size_range = (
+            FileSizeRange.from_str(args.size) if args.size is not None else None
+        )
+    except Exception as e:
+        logging.error(f"unexpected error occurred while parsing arguments: {e}")
+        sys.exit(1)
+
+    nb_workers = args.workers
+    dest_dir = args.dest or DEFAULT_DEST_DIR
+
+    fg = directory.get_file_group(mimetype, size_range)
+    if len(fg) == 0:
+        logging.info(
+            "no files found for mimetype: %s and file size range: %s",
+            mimetype,
+            size_range,
+        )
+        sys.exit(0)
+
+    # Only create the destination once there is something to write into it.
+    os.makedirs(dest_dir, exist_ok=True)
+
+    logging.info(
+        "launching optimization (%d) for type: %s and size range: %s on %d workers...",
+        len(fg),
+        mimetype.value if mimetype is not None else "all",
+        size_range.value if size_range is not None else "all",
+        nb_workers,
+    )
+
+    start = time.perf_counter()
+
+    # Originals that went through the optimizer; files skipped by File.opti
+    # (it returns None for them) must not count in the final ratio.
+    fg_orig = FileGroup()
+    fg_opti = FileGroup()
+    optimized = 0
+
+    with ProcessPoolExecutor(nb_workers) as p:
+        futures = [p.submit(f.opti, dest_dir) for f in fg.get_files()]
+
+        for future in futures:
+            try:
+                res = future.result()
+            except Exception:
+                # One failing worker must not abort the whole batch.
+                logging.error("unexpected error in worker", exc_info=True)
+                continue
+
+            if res is None:
+                continue
+
+            orig, opti = res
+            fg_orig.add(orig)
+            if opti is None:
+                logging.debug(f"no optimization for file: {orig}")
+                fg_opti.add(orig)
+                continue
+
+            optimized += 1
+            logging.debug(
+                f"optimization for file: {orig} -> {_gain_percent(orig.size, opti.size):.2f}%"  # noqa
+            )
+            fg_opti.add(opti)
+
+    logging.info(f"optimization finished in {time.perf_counter() - start:.2f}s")
+
+    percent = _gain_percent(fg_orig.get_size(), fg_opti.get_size())
+    size_gained = fg_orig.get_size() - fg_opti.get_size()
+    logging.info(
+        f"total optimization ({optimized}/{len(fg)}): {percent:.2f}% -> {size_gained:.2f} Mb"  # noqa
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/files.py b/src/files.py
new file mode 100644
index 0000000..a3607a7
--- /dev/null
+++ b/src/files.py
@@ -0,0 +1,300 @@
+"""File model, grouping helpers and JPEG optimization of optimg."""
+
+import logging
+import mimetypes
+import os
+import subprocess
+from dataclasses import dataclass, field
+from datetime import datetime as dt
+from enum import Enum
+from typing import Optional
+
+DEFAULT_MIMETYPE = "unknown"
+
+
+class FileImgMimetype(Enum):
+    """Image mimetypes that can be selected from the command line."""
+
+    JPEG = "image/jpeg"
+    PNG = "image/png"
+
+    @classmethod
+    def from_str(cls, value: str) -> "FileImgMimetype":
+        """Return the member whose name matches value, ignoring case.
+
+        Raises:
+            ValueError: if no member is named like value.
+        """
+        try:
+            return cls[value.upper()]
+        except KeyError:
+            raise ValueError(f"unable to parse file img mimetype: {value}") from None
+
+
+class FileSizeRange(Enum):
+    """Size buckets used to group files (sizes are in mebibytes)."""
+
+    TINY = "TINY"
+    MEDIUM = "MEDIUM"
+    LARGE = "LARGE"
+    FAT = "FAT"
+
+    @classmethod
+    def from_str(cls, value: str) -> "FileSizeRange":
+        """Return the member whose name matches value, ignoring case.
+
+        Raises:
+            ValueError: if no member is named like value.
+        """
+        try:
+            return cls[value.upper()]
+        except KeyError:
+            raise ValueError(f"unable to parse file size range: {value}") from None
+
+    @classmethod
+    def from_size(cls, size: float) -> "FileSizeRange":
+        """Return the bucket of a size: <1 TINY, <2 MEDIUM, <5 LARGE, else FAT."""
+        if size < 1:
+            return cls.TINY
+        if size < 2:
+            return cls.MEDIUM
+        if size < 5:
+            return cls.LARGE
+        return cls.FAT
+
+
+@dataclass(slots=True, frozen=True)
+class File:
+    """A file on disk with the metadata needed to group and optimize it."""
+
+    directory: str
+    name: str
+    path: str
+    mimetype: str
+    # size in mebibytes
+    size: float
+    modified: dt
+
+    @classmethod
+    def from_directory(cls, directory: str, name: str) -> "File":
+        """Build a File from the entry name found in directory.
+
+        Raises:
+            OSError: if the file size or modification time cannot be read.
+        """
+        path = os.path.join(directory, name)
+
+        guessed, _ = mimetypes.guess_type(path)
+        mimetype = guessed or DEFAULT_MIMETYPE
+
+        size = os.path.getsize(path) / 1_048_576
+
+        return cls(
+            directory,
+            name,
+            path,
+            mimetype,
+            size,
+            dt.fromtimestamp(os.path.getmtime(path)),
+        )
+
+    def __repr__(self):
+        return f"<File path={self.path} mimetype={self.mimetype} size={self.size:.2f}>"  # noqa
+
+    def _dest_dir(self, base_dest_dir: str) -> str:
+        """Return the directory under base_dest_dir mirroring the file's own.
+
+        Drive, root, "." and ".." components are dropped so the result always
+        stays below base_dest_dir.
+        """
+        _, source_dir = os.path.splitdrive(os.path.dirname(self.path))
+        parts = [
+            part
+            for part in os.path.normpath(source_dir).split(os.sep)
+            if part not in ("", os.curdir, os.pardir)
+        ]
+        # replace all spaces in dir name for readability
+        return os.path.join(base_dest_dir, *parts).replace(" ", "_")
+
+    def _jpeg_opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None:
+        """Run jpegoptim on the file, writing the result under base_dest_dir.
+
+        Returns:
+            None if jpegoptim failed, (self, None) if no optimized file was
+            found in the destination, (self, optimized) otherwise.
+        """
+        dest_dir = self._dest_dir(base_dest_dir)
+        os.makedirs(dest_dir, exist_ok=True)
+
+        # Argument list without a shell: file names containing quotes or
+        # shell metacharacters are passed to jpegoptim untouched.
+        cmd = ["jpegoptim", "-s", "-p", "-q", "-d", dest_dir, self.path]
+        logging.debug("optimization launched for file: %s -> %s", self, cmd)
+        try:
+            subprocess.run(
+                cmd, check=True, capture_output=True, text=True, errors="replace"
+            )
+        except subprocess.CalledProcessError as e:
+            logging.error("error while running command: %s, err: %s", cmd, e.stderr)
+            return None
+        except Exception:
+            logging.error(
+                "unexpected error while running command: %s", cmd, exc_info=True
+            )
+            return None
+
+        try:
+            f_opti = File.from_directory(dest_dir, self.name)
+        except Exception as e:
+            logging.debug("unable to get file: %s after optimization: %s", self, e)
+            return self, None
+
+        return self, f_opti
+
+    def opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None:
+        """Optimize the file when its mimetype is supported (JPEG only).
+
+        Returns None for any other mimetype, see _jpeg_opti otherwise.
+        """
+        if self.mimetype == FileImgMimetype.JPEG.value:
+            return self._jpeg_opti(base_dest_dir)
+        return None
+
+
+@dataclass(slots=True)
+class FileGroup:
+    """A set of files, unique by path, with their cumulated size."""
+
+    _files: dict[str, File] = field(default_factory=dict)
+    _size: float = 0
+    _nb_files: int = 0
+
+    def __repr__(self):
+        return f"<FileGroup nb_files={self._nb_files} size={self.get_size_formatted()}>"  # noqa
+
+    def __len__(self):
+        return self._nb_files
+
+    def add(self, file: File):
+        """Add file to the group; a path already present is ignored."""
+        if file.path in self._files:
+            return
+        self._files[file.path] = file
+        self._nb_files += 1
+        self._size += file.size
+
+    def get_size(self) -> float:
+        """Return the cumulated size of the files."""
+        return self._size
+
+    def join(self, right: "FileGroup"):
+        """Add every file of right that is not already in the group."""
+        for file in right._files.values():
+            self.add(file)
+
+    @staticmethod
+    def format_size(size: float) -> str:
+        """Format size as "Mb" below 1000, as "Gb" (size / 1024) otherwise."""
+        if size < 1000:
+            return f"{size:.2f} Mb"
+        return f"{size / 1024:.2f} Gb"
+
+    def get_size_formatted(self) -> str:
+        """Return the cumulated size formatted by format_size."""
+        return FileGroup.format_size(self._size)
+
+    def get_files(self) -> list[File]:
+        """Return the files of the group as a new list."""
+        return list(self._files.values())
+
+
+@dataclass(slots=True, frozen=True)
+class Directory:
+    """Recursive inventory of a directory, grouped by mimetype and size."""
+
+    path: str
+    nb_files: int
+    # mimetype -> size range -> files
+    details: dict[str, dict[FileSizeRange, FileGroup]]
+
+    def __len__(self):
+        return self.nb_files
+
+    def show(self):
+        """Print the number of files and size of each mimetype and size range."""
+        data = [f"directory ({self.path}) details:"]
+
+        for mimetype, by_range in self.details.items():
+            size = 0.0
+            to_display = [f"* {mimetype}"]
+
+            for file_range, file_group in by_range.items():
+                to_display.append(
+                    f"\t{file_range.value:<8}{len(file_group):<8}{file_group.get_size_formatted()}"
+                )
+                size += file_group.get_size()
+
+            to_display[0] += f" ({FileGroup.format_size(size)})"
+
+            data.append("\n".join(to_display))
+
+        print("\n".join(data))
+
+    @classmethod
+    def from_path(cls, path: str) -> "Directory":
+        """Walk path recursively and inventory every readable file.
+
+        Files that cannot be accessed are logged and skipped.
+
+        Raises:
+            NotADirectoryError: if path is not a directory.
+        """
+        if not os.path.isdir(path):
+            raise NotADirectoryError(f"Directory path: {path} must be a directory")
+
+        nb_files = 0
+        details: dict[str, dict[FileSizeRange, FileGroup]] = {}
+        for dirpath, _, filenames in os.walk(path):
+            for file in filenames:
+                try:
+                    f = File.from_directory(dirpath, file)
+                except OSError as e:
+                    logging.error(
+                        "error accessing %s, err: %s", os.path.join(dirpath, file), e
+                    )
+                    continue
+
+                by_range = details.setdefault(f.mimetype, {})
+                size_range = FileSizeRange.from_size(f.size)
+                if size_range not in by_range:
+                    by_range[size_range] = FileGroup()
+
+                by_range[size_range].add(f)
+                nb_files += 1
+
+        return cls(path, nb_files, details)
+
+    def get_file_group(
+        self,
+        mimetype: FileImgMimetype | None = None,
+        size_range: FileSizeRange | None = None,
+    ) -> FileGroup:
+        """Return the files matching both filters; None means no filter."""
+        if mimetype is None:
+            candidates = list(self.details.values())
+        else:
+            candidates = [self.details.get(mimetype.value, {})]
+
+        file_group = FileGroup()
+        for by_range in candidates:
+            if size_range is None:
+                groups = list(by_range.values())
+            else:
+                groups = [by_range[size_range]] if size_range in by_range else []
+            for group in groups:
+                file_group.join(group)
+        return file_group
+
+    def get_all(self) -> FileGroup:
+        """Return every file of the directory in a single group."""
+        return self.get_file_group()