Compare commits

...

1 Commits

Author SHA1 Message Date
f79668195b handle optim errors 2025-10-23 17:00:01 +02:00
3 changed files with 66 additions and 25 deletions

4
.gitignore vendored
View File

@ -8,4 +8,6 @@ data
dist
docs
*.log
*.log
*.err*
*.swp

View File

@ -126,6 +126,7 @@ def main():
logging.info(
f"total optimization ({optimized}/{len(result.orig)}): {percent:.2f}% -> {size:.2f} Mb" # noqa
)
result.check_errors()
if __name__ == "__main__":

View File

@ -4,7 +4,7 @@ import signal
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Optional
from tqdm import tqdm
@ -15,7 +15,9 @@ __all__ = ["ImgOptimizer", "OptimizerResult"]
# TODO(rmanach): add argument to set the size or leave it empty for loseless optim
def _jpeg_optim(dest_dir: str, file: File) -> tuple["File", Optional["File"]] | None:
def _jpeg_optim(
dest_dir: str, file: File
) -> tuple["File", Optional["File"], str | None]:
"""
Optimize the `file` with `jpegoptim` and put the result in
`dest_dir` directory keeping file path.
@ -35,22 +37,26 @@ def _jpeg_optim(dest_dir: str, file: File) -> tuple["File", Optional["File"]] |
try:
_ = subprocess.run(cmd, shell=True, check=True)
except subprocess.CalledProcessError as e:
logging.error("error while running command: %s, err: %s", cmd, e.stderr or "?")
return None
logging.debug("error while running command: %s, err: %s", cmd, e.stderr or "?")
return file, None, f"command: {cmd} failed, err: {e.stderr}"
except Exception:
logging.error("unexpected error while running command: %s", cmd, exc_info=True)
return None
logging.debug("unexpected error while running command: %s", cmd, exc_info=True)
return (
file,
None,
f"command: {cmd} failed unexpectedly, set debug mode for more details",
)
try:
file_optim = File.from_directory(dest_dir, file.name)
except Exception as e:
logging.debug("unable to get file: %s after optimization: %s", file, e)
return file, None
return file, None, None
return file, file_optim
return file, file_optim, None
def _optim(dest_dir: str, file: File) -> tuple["File", Optional["File"]] | None:
def _optim(dest_dir: str, file: File) -> tuple["File", Optional["File"], str | None]:
"""
Entry point of `file` optimization selection the handler.
NOTE: Must be launched in separated process.
@ -59,7 +65,27 @@ def _optim(dest_dir: str, file: File) -> tuple["File", Optional["File"]] | None:
signal.signal(signal.SIGINT, signal.SIG_IGN)
if file.mimetype == FileImgMimetype.JPEG.value:
return _jpeg_optim(dest_dir, file)
return None
return file, None, f"mimetype: {file.mimetype} not supported for optimization"
@dataclass(slots=True, frozen=True)
class OptimizerErrors:
_data: dict[str, str] = field(default_factory=dict)
def save(self):
if not len(self._data):
return
with open("optimg.err.csv", "w") as f:
f.write("filepath;err\n")
for filepath, error in self._data.items():
f.write(f"{filepath};{error}\n")
def has_errors(self) -> bool:
return len(self._data) > 0
def add(self, filepath: str, error: str):
self._data[filepath] = error
@dataclass(slots=True, frozen=True)
@ -73,6 +99,7 @@ class OptimizerResult:
orig: FileGroup
opti: FileGroup
optimized: int
err: OptimizerErrors
def stats(self) -> tuple[int, float, float]:
"""
@ -85,6 +112,13 @@ class OptimizerResult:
size = self.orig._size - self.opti._size
return (self.optimized, percent, size)
def check_errors(self):
if self.err.has_errors():
logging.error(
"some errors detected during optimization, check 'optimg.err.csv' for details" # noqa
)
self.err.save()
@dataclass(slots=True, frozen=True)
class ImgOptimizer:
@ -94,8 +128,8 @@ class ImgOptimizer:
Example:
```python
optimizer = ImgOptimizer("mypath")
optimizer.optimize()
optimizer = ImgOptimizer()
res = optimizer.optimize(file_group)
```
"""
@ -134,20 +168,24 @@ class ImgOptimizer:
file_group_optim = FileGroup()
optimized = 0
errors = OptimizerErrors()
for f in tqdm(
as_completed(futures), total=len(futures), desc="Optimizing..."
):
if (res := f.result()) and res is not None:
match res:
case (orig, None):
logging.debug(f"no optimization for file: {orig}")
file_group_optim.add(orig)
case (orig, opti):
optimized += 1
logging.debug(
f"optimization for file: {orig} -> {(1 - (opti.size / orig.size)) * 100:.2f}%" # noqa
)
file_group_optim.add(opti)
match f.result():
case (orig, None, err):
logging.debug(f"optim error for file: {orig}, err: {err}")
file_group_optim.add(orig)
errors.add(orig.path, err)
case (orig, None, None):
logging.debug(f"no optimization for file: {orig}")
file_group_optim.add(orig)
case (orig, opti, None):
optimized += 1
logging.debug(
f"optimization for file: {orig} -> {(1 - (opti.size / orig.size)) * 100:.2f}%" # noqa
)
file_group_optim.add(opti)
logging.info(f"optimization finished in {time.perf_counter() - start:.2f}s")
return OptimizerResult(file_group, file_group_optim, optimized)
return OptimizerResult(file_group, file_group_optim, optimized, errors)