handle optim errors
This commit is contained in:
parent
0ba3e7a9ff
commit
f79668195b
2
.gitignore
vendored
2
.gitignore
vendored
@ -9,3 +9,5 @@ dist
|
||||
docs
|
||||
|
||||
*.log
|
||||
*.err*
|
||||
*.swp
|
||||
|
||||
@ -126,6 +126,7 @@ def main():
|
||||
logging.info(
|
||||
f"total optimization ({optimized}/{len(result.orig)}): {percent:.2f}% -> {size:.2f} Mb" # noqa
|
||||
)
|
||||
result.check_errors()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@ -4,7 +4,7 @@ import signal
|
||||
import subprocess
|
||||
import time
|
||||
from concurrent.futures import ProcessPoolExecutor, as_completed
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
from tqdm import tqdm
|
||||
@ -15,7 +15,9 @@ __all__ = ["ImgOptimizer", "OptimizerResult"]
|
||||
|
||||
|
||||
# TODO(rmanach): add argument to set the size or leave it empty for loseless optim
|
||||
def _jpeg_optim(dest_dir: str, file: File) -> tuple["File", Optional["File"]] | None:
|
||||
def _jpeg_optim(
|
||||
dest_dir: str, file: File
|
||||
) -> tuple["File", Optional["File"], str | None]:
|
||||
"""
|
||||
Optimize the `file` with `jpegoptim` and put the result in
|
||||
`dest_dir` directory keeping file path.
|
||||
@ -35,22 +37,26 @@ def _jpeg_optim(dest_dir: str, file: File) -> tuple["File", Optional["File"]] |
|
||||
try:
|
||||
_ = subprocess.run(cmd, shell=True, check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
logging.error("error while running command: %s, err: %s", cmd, e.stderr or "?")
|
||||
return None
|
||||
logging.debug("error while running command: %s, err: %s", cmd, e.stderr or "?")
|
||||
return file, None, f"command: {cmd} failed, err: {e.stderr}"
|
||||
except Exception:
|
||||
logging.error("unexpected error while running command: %s", cmd, exc_info=True)
|
||||
return None
|
||||
logging.debug("unexpected error while running command: %s", cmd, exc_info=True)
|
||||
return (
|
||||
file,
|
||||
None,
|
||||
f"command: {cmd} failed unexpectedly, set debug mode for more details",
|
||||
)
|
||||
|
||||
try:
|
||||
file_optim = File.from_directory(dest_dir, file.name)
|
||||
except Exception as e:
|
||||
logging.debug("unable to get file: %s after optimization: %s", file, e)
|
||||
return file, None
|
||||
return file, None, None
|
||||
|
||||
return file, file_optim
|
||||
return file, file_optim, None
|
||||
|
||||
|
||||
def _optim(dest_dir: str, file: File) -> tuple["File", Optional["File"]] | None:
|
||||
def _optim(dest_dir: str, file: File) -> tuple["File", Optional["File"], str | None]:
|
||||
"""
|
||||
Entry point of `file` optimization selection the handler.
|
||||
NOTE: Must be launched in separated process.
|
||||
@ -59,7 +65,27 @@ def _optim(dest_dir: str, file: File) -> tuple["File", Optional["File"]] | None:
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
if file.mimetype == FileImgMimetype.JPEG.value:
|
||||
return _jpeg_optim(dest_dir, file)
|
||||
return None
|
||||
return file, None, f"mimetype: {file.mimetype} not supported for optimization"
|
||||
|
||||
|
||||
@dataclass(slots=True, frozen=True)
|
||||
class OptimizerErrors:
|
||||
_data: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
def save(self):
|
||||
if not len(self._data):
|
||||
return
|
||||
|
||||
with open("optimg.err.csv", "w") as f:
|
||||
f.write("filepath;err\n")
|
||||
for filepath, error in self._data.items():
|
||||
f.write(f"{filepath};{error}\n")
|
||||
|
||||
def has_errors(self) -> bool:
|
||||
return len(self._data) > 0
|
||||
|
||||
def add(self, filepath: str, error: str):
|
||||
self._data[filepath] = error
|
||||
|
||||
|
||||
@dataclass(slots=True, frozen=True)
|
||||
@ -73,6 +99,7 @@ class OptimizerResult:
|
||||
orig: FileGroup
|
||||
opti: FileGroup
|
||||
optimized: int
|
||||
err: OptimizerErrors
|
||||
|
||||
def stats(self) -> tuple[int, float, float]:
|
||||
"""
|
||||
@ -85,6 +112,13 @@ class OptimizerResult:
|
||||
size = self.orig._size - self.opti._size
|
||||
return (self.optimized, percent, size)
|
||||
|
||||
def check_errors(self):
|
||||
if self.err.has_errors():
|
||||
logging.error(
|
||||
"some errors detected during optimization, check 'optimg.err.csv' for details" # noqa
|
||||
)
|
||||
self.err.save()
|
||||
|
||||
|
||||
@dataclass(slots=True, frozen=True)
|
||||
class ImgOptimizer:
|
||||
@ -94,8 +128,8 @@ class ImgOptimizer:
|
||||
|
||||
Example:
|
||||
```python
|
||||
optimizer = ImgOptimizer("mypath")
|
||||
optimizer.optimize()
|
||||
optimizer = ImgOptimizer()
|
||||
res = optimizer.optimize(file_group)
|
||||
```
|
||||
"""
|
||||
|
||||
@ -134,15 +168,19 @@ class ImgOptimizer:
|
||||
|
||||
file_group_optim = FileGroup()
|
||||
optimized = 0
|
||||
errors = OptimizerErrors()
|
||||
for f in tqdm(
|
||||
as_completed(futures), total=len(futures), desc="Optimizing..."
|
||||
):
|
||||
if (res := f.result()) and res is not None:
|
||||
match res:
|
||||
case (orig, None):
|
||||
match f.result():
|
||||
case (orig, None, err):
|
||||
logging.debug(f"optim error for file: {orig}, err: {err}")
|
||||
file_group_optim.add(orig)
|
||||
errors.add(orig.path, err)
|
||||
case (orig, None, None):
|
||||
logging.debug(f"no optimization for file: {orig}")
|
||||
file_group_optim.add(orig)
|
||||
case (orig, opti):
|
||||
case (orig, opti, None):
|
||||
optimized += 1
|
||||
logging.debug(
|
||||
f"optimization for file: {orig} -> {(1 - (opti.size / orig.size)) * 100:.2f}%" # noqa
|
||||
@ -150,4 +188,4 @@ class ImgOptimizer:
|
||||
file_group_optim.add(opti)
|
||||
|
||||
logging.info(f"optimization finished in {time.perf_counter() - start:.2f}s")
|
||||
return OptimizerResult(file_group, file_group_optim, optimized)
|
||||
return OptimizerResult(file_group, file_group_optim, optimized, errors)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user