Split the optimize function into a dedicated file

This commit is contained in:
rmanach 2025-10-21 17:46:34 +02:00
parent f5772f2783
commit 3a1c994b1e
2 changed files with 94 additions and 27 deletions

View File

@ -2,10 +2,9 @@ import argparse
import logging
import os
import sys
import time
from concurrent.futures import ProcessPoolExecutor
from .files import Directory, FileGroup, FileImgMimetype, FileSizeRange
from .files import Directory, FileImgMimetype, FileSizeRange
from .optimizer import ImgOptimizer
DEFAULT_DEST_DIR = "data"
@ -109,32 +108,12 @@ def main():
nb_workers,
)
start = time.perf_counter()
optimizer = ImgOptimizer(dest_dir, args.workers)
result = optimizer.optimize(fg)
(optimized, percent, size) = result.stats()
with ProcessPoolExecutor(nb_workers) as p:
futures = [p.submit(f.opti, dest_dir) for f in fg.get_files()]
fg_opti = FileGroup()
optimized = 0
for f in futures:
if (res := f.result()) and res is not None:
match res:
case (orig, None):
logging.debug(f"no optimization for file: {orig}")
fg_opti.add(orig)
case (orig, opti):
optimized += 1
logging.debug(
f"optimization for file: {orig} -> {(1 - (opti.size / orig.size)) * 100:.2f}%" # noqa
)
fg_opti.add(opti)
logging.info(f"optimization finished in {time.perf_counter() - start:.2f}s")
percent = (1 - (fg_opti._size / fg._size)) * 100
size_gained = fg._size - fg_opti._size
logging.info(
f"total optimization ({optimized}/{len(fg)}): {percent:.2f}% -> {size_gained:.2f} Mb" # noqa
f"total optimization ({optimized}/{len(result.orig)}): {percent:.2f}% -> {size:.2f} Mb" # noqa
)

88
src/optimizer.py Normal file
View File

@ -0,0 +1,88 @@
import logging
import os
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
from typing import Optional
from .files import File, FileGroup, FileImgMimetype
@dataclass(slots=True, frozen=True)
class OptimizerResult:
orig: FileGroup
opti: FileGroup
optimized: int
def stats(self) -> tuple[int, float, float]:
percent = (1 - (self.opti._size / self.orig._size)) * 100
size = self.orig._size - self.opti._size
return (self.optimized, percent, size)
@dataclass(slots=True, frozen=True)
class ImgOptimizer:
dest_dir: str
nb_workers: int = 5
def _jpeg_optim(self, file: File) -> tuple["File", Optional["File"]] | None:
# remove ".." avoiding treat file in same dir
filepath = "/".join(file.path.split("/")[:-1])
if filepath.startswith(".."):
filepath = filepath.lstrip("..")
# replace all spaces in dir name for readability
dest_dir = os.path.join(self.dest_dir, filepath.lstrip("/")).replace(" ", "_")
os.makedirs(dest_dir, exist_ok=True)
# use "-S <i>k" to set maximum size in kilobytes
cmd = f"jpegoptim -s -p -q -S 1024k '{file.path}' -d {dest_dir}"
logging.debug("optimization launched for file: %s -> %s", self, cmd)
try:
_ = subprocess.run(cmd, shell=True, check=True)
except subprocess.CalledProcessError as e:
logging.error("error while running command: %s, err: %s", cmd, e.output)
return None
except Exception:
logging.error(
"unexpected error while running command: %s", cmd, exc_info=True
)
return None
try:
file_optim = File.from_directory(dest_dir, file.name)
except Exception as e:
logging.debug("unable to get file: %s after optimization: %s", file, e)
return file, None
return file, file_optim
def _optim(self, file: File) -> tuple["File", Optional["File"]] | None:
if file.mimetype == FileImgMimetype.JPEG.value:
return self._jpeg_optim(file)
return None
def optimize(self, file_group: FileGroup) -> OptimizerResult:
start = time.perf_counter()
with ProcessPoolExecutor(self.nb_workers) as p:
futures = [p.submit(self._optim, f) for f in file_group.get_files()]
file_group_optim = FileGroup()
optimized = 0
for f in futures:
if (res := f.result()) and res is not None:
match res:
case (orig, None):
logging.debug(f"no optimization for file: {orig}")
file_group_optim.add(orig)
case (orig, opti):
optimized += 1
logging.debug(
f"optimization for file: {orig} -> {(1 - (opti.size / orig.size)) * 100:.2f}%" # noqa
)
file_group_optim.add(opti)
logging.info(f"optimization finished in {time.perf_counter() - start:.2f}s")
return OptimizerResult(file_group, file_group_optim, optimized)