import logging import mimetypes import os import subprocess import sys import time from concurrent.futures import ProcessPoolExecutor from dataclasses import dataclass, field from datetime import datetime as dt from enum import Enum, auto from typing import Optional SRC_PATH = "../users/lea/pictures" DEFAULT_MIMETYPE = "unknown" DEFAULT_DEST_DIR = "data" DEFAULT_NB_WORKERS = 10 JPEG_MIMETYPE = "image/jpeg" PNG_MIMETYPE = "image/png" class FileSizeRange(Enum): TINY = auto() MEDIUM = auto() LARGE = auto() FAT = auto() @classmethod def from_size(cls, size: float) -> "FileSizeRange": if size < 1: return cls.TINY if size >= 1 and size < 2: return cls.MEDIUM if size >= 2 and size < 5: return cls.LARGE return cls.FAT def __str__(self): match self: case FileSizeRange.TINY: return "tiny" case FileSizeRange.MEDIUM: return "medium" case FileSizeRange.LARGE: return "large" case FileSizeRange.FAT: return "fat" @dataclass(slots=True, frozen=True) class File: directory: str name: str path: str mime_type: str size: float size_range: FileSizeRange modified: dt @classmethod def from_directory(cls, directory: str, name: str) -> "File": path = os.path.join(directory, name) mtype, _ = mimetypes.guess_type(path) mime_type = mtype or DEFAULT_MIMETYPE size = os.path.getsize(path) / 1_048_576 return File( directory, name, path, mime_type, size, FileSizeRange.from_size(size), dt.fromtimestamp(os.path.getmtime(path)), ) def __repr__(self): return f"" # noqa def _jpeg_opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None: # remove ".." avoiding treat file in same dir filepath = "/".join(self.path.split("/")[:-1]) if filepath.startswith(".."): filepath = filepath.lstrip("../") # replace all spaces in dir name dest_dir = os.path.join(base_dest_dir, filepath).replace(" ", "_") os.makedirs(dest_dir, exist_ok=True) cmd = f"jpegoptim -s -p -q '{self.path}' -d {dest_dir}" logging.debug("optimization launched for file: %s -> %s", self, cmd) try: _ = subprocess.run(cmd, shell=True, check=True) except subprocess.CalledProcessError as e: logging.error("error while running command: %s, err: %s", cmd, e.output) return self, None except Exception: logging.error( "unexpected error while running command: %s", cmd, exc_info=True ) return None try: f_opti = File.from_directory(dest_dir, self.name) except Exception as e: logging.debug("unable to get file: %s after optimization: %s", self, e) return self, None return self, f_opti def opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None: if self.mime_type == JPEG_MIMETYPE: return self._jpeg_opti(base_dest_dir) return None @dataclass(slots=True) class FileGroup: mime_type: str file_range: FileSizeRange files: dict[str, File] = field(default_factory=dict) size: float = 0 _nb_files: int = 0 def __repr__(self): return f"" # noqa def __len__(self): return self._nb_files def add(self, file: File): if self.files.get(file.path) is None: self.files[file.path] = file self._nb_files += 1 self.size += file.size def get_size(self) -> float: return self.size @staticmethod def format_size(size: float) -> str: if size < 1000: return f"{size:.2f} Mb" return f"{size / 1024:.2f} Gb" def get_size_formatted(self) -> str: return FileGroup.format_size(self.size) def get_files(self) -> list[File]: return list(self.files.values()) @dataclass(slots=True, frozen=True) class Dir: path: str nb_files: int details: dict[str, dict[FileSizeRange, FileGroup]] def show(self): data = [f"directory ({self.path}) details:"] for mime_type, group in self.details.items(): nb_files = 0 size = 0 to_display = [f"* {mime_type}"] for file_range in group.keys(): file_group = self.details[mime_type][file_range] to_display.append( f"\t{file_range:<8}{len(file_group):<8}{file_group.get_size_formatted()}" ) nb_files += len(self.details[mime_type][file_range]) size += file_group.size to_display[0] += f" ({FileGroup.format_size(size)})" data.append("\n".join(to_display)) print("\n".join(data)) @classmethod def from_path(cls, path: str) -> "Dir": if not os.path.isdir(path): raise Exception(f"Dir path: {path} must be a directory") nb_files = 0 details: dict[str, dict[FileSizeRange, FileGroup]] = {} for dirpath, _, filenames in os.walk(path): for file in filenames: file_path = os.path.join(dirpath, file) try: f = File.from_directory(dirpath, file) except OSError as e: logging.error("error accessing %s, err: %s", file_path, e) continue if details.get(f.mime_type) is None: details[f.mime_type] = {} if details[f.mime_type].get(f.size_range) is None: details[f.mime_type][f.size_range] = FileGroup( f.mime_type, f.size_range ) details[f.mime_type][f.size_range].add(f) nb_files += 1 return Dir(path, nb_files, details) def get_file_group( self, mimetype: str, file_size: FileSizeRange ) -> FileGroup | None: if (mt := self.details.get(mimetype)) is not None: return mt.get(file_size) return None def get_files(self) -> list[File]: files = [] for details in self.details.values(): for file_group in details.values(): files.extend(file_group.get_files()) return files if __name__ == "__main__": stdout_handler = logging.StreamHandler(stream=sys.stdout) logging.basicConfig( format="[%(levelname)s] - %(asctime)s - %(message)s", level=logging.INFO, handlers=(stdout_handler,), ) d = Dir.from_path(SRC_PATH) d.show() os.makedirs(DEFAULT_DEST_DIR, exist_ok=True) mtype = JPEG_MIMETYPE frange = FileSizeRange.FAT nb_workers = DEFAULT_NB_WORKERS fg = d.get_file_group(mtype, frange) if fg is None: logging.error( "no files found for mimetype: %s and file size range: %s", mtype, frange ) exit(1) logging.info( "launching optimization (%d) for %s and range %s on %d workers...", len(fg), mtype, frange, nb_workers, ) start = time.perf_counter() with ProcessPoolExecutor(nb_workers) as p: futures = [p.submit(f.opti, DEFAULT_DEST_DIR) for f in fg.get_files()] fg_opti = FileGroup(mtype, frange) optimized = 0 for f in futures: if (res := f.result()) and res is not None: match res: case (orig, None): logging.debug(f"no optimization for file: {orig}") fg_opti.add(orig) case (orig, opti): optimized += 1 logging.debug( f"optimization for file: {orig} -> {(1 - (opti.size / orig.size)) * 100:.2f}%" # noqa ) fg_opti.add(opti) logging.info(f"optimization finished in {time.perf_counter() - start:.2f}s") percent = (1 - (fg_opti.size / fg.size)) * 100 size_gained = fg.size - fg_opti.size logging.info( f"total optimization ({optimized}/{len(fg)}): {percent:.2f}% -> {size_gained:.2f} Mb" # noqa )