scaffold code for lib + cli

This commit is contained in:
rmanach 2025-10-21 16:39:04 +02:00
parent c2084a5166
commit f5772f2783
8 changed files with 445 additions and 294 deletions

2
.gitignore vendored
View File

@ -2,6 +2,8 @@
.ruff_cache
venv
venv.dist
data
dist
*.log

View File

@ -1,10 +1,13 @@
ROOT_DIR := $(dir $(realpath $(lastword $(MAKEFILE_LIST))))
PYTHON := $(ROOT_DIR)venv/bin/python
PYTHONCLI := $(ROOT_DIR)venv.dist/bin/python
SRC_NAME = optimg
.PHONY: venv
venv:
@python3 -m venv venv
pip install -r requirements-dev.txt
$(PYTHON) -m pip install -r requirements-dev.txt
lint:
$(PYTHON) -m ruff check --fix
@ -15,4 +18,17 @@ format:
check-type:
$(PYTHON) -m mypy .
check: format lint check-type
check: format lint check-type
# Build the wheel: runs the `check` target first, empties dist/, then asks
# hatch to build the wheel target only.
build: check
@rm -rf dist/*
$(PYTHON) -m hatch -v build -t wheel
# Install the freshly built wheel into a dedicated `venv.dist` virtualenv
# (PYTHONCLI points at its interpreter), forcing a reinstall of the package.
install: build
@python3 -m venv venv.dist
$(PYTHONCLI) -m pip install dist/$(SRC_NAME)-*.whl --force-reinstall
# Generate the HTML documentation with pdoc into docs/, then rewrite the
# `src` package name to $(SRC_NAME) in the generated files, since the sources
# live in `src/` but are shipped under the $(SRC_NAME) name.
documentation:
$(PYTHON) -m pdoc --html -o docs src/ --force
@find docs/src/* -type f -exec sed -i 's/src\./$(SRC_NAME)\./g' {} \; -exec sed -i 's/src</$(SRC_NAME)</g' {} \;
@sed -i 's/<code>src<\/code>/<code>$(SRC_NAME)<\/code>/g' docs/src/index.html

View File

@ -1,287 +0,0 @@
import logging
import mimetypes
import os
import subprocess
import sys
import time
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime as dt
from enum import Enum, auto
from typing import Optional
# Hard-coded directory scanned when this module is run as a script.
SRC_PATH = "../users/lea/pictures"
# Mimetype recorded for files whose type cannot be guessed from their name.
DEFAULT_MIMETYPE = "unknown"
# Base directory under which optimized copies are written.
DEFAULT_DEST_DIR = "data"
# Number of worker processes given to the optimization pool.
DEFAULT_NB_WORKERS = 10
# Mimetype strings compared against the result of mimetypes.guess_type.
JPEG_MIMETYPE = "image/jpeg"
PNG_MIMETYPE = "image/png"
class FileSizeRange(Enum):
    """Size bucket of a file.

    Thresholds are expressed in the same unit as the size handed to
    ``from_size``.
    """

    TINY = auto()
    MEDIUM = auto()
    LARGE = auto()
    FAT = auto()

    @classmethod
    def from_size(cls, size: float) -> "FileSizeRange":
        """Return the bucket *size* falls in.

        Buckets: ``TINY`` < 1, ``MEDIUM`` [1, 2), ``LARGE`` [2, 5),
        ``FAT`` >= 5.
        """
        # Each test only needs the upper bound: the lower one is implied by
        # the previous test having failed.
        if size < 1:
            return cls.TINY
        if size < 2:
            return cls.MEDIUM
        if size < 5:
            return cls.LARGE
        return cls.FAT

    def __str__(self) -> str:
        """Return the lowercase member name (``"tiny"``, ``"fat"``, ...)."""
        # Derived from the member name so a new member never falls through
        # and returns None, as the former hand-written switch would have.
        return self.name.lower()
@dataclass(slots=True, frozen=True)
class File:
directory: str
name: str
path: str
mime_type: str
size: float
size_range: FileSizeRange
modified: dt
@classmethod
def from_directory(cls, directory: str, name: str) -> "File":
path = os.path.join(directory, name)
mtype, _ = mimetypes.guess_type(path)
mime_type = mtype or DEFAULT_MIMETYPE
size = os.path.getsize(path) / 1_048_576
return File(
directory,
name,
path,
mime_type,
size,
FileSizeRange.from_size(size),
dt.fromtimestamp(os.path.getmtime(path)),
)
def __repr__(self):
return f"<FILE name={self.name} | dir={self.directory} | size={self.size:.2f} Mb | mtype={self.mime_type}>" # noqa
def _jpeg_opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None:
# remove ".." avoiding treat file in same dir
filepath = "/".join(self.path.split("/")[:-1])
if filepath.startswith(".."):
filepath = filepath.lstrip("../")
# replace all spaces in dir name
dest_dir = os.path.join(base_dest_dir, filepath).replace(" ", "_")
os.makedirs(dest_dir, exist_ok=True)
cmd = f"jpegoptim -s -p -q '{self.path}' -d {dest_dir}"
logging.debug("optimization launched for file: %s -> %s", self, cmd)
try:
_ = subprocess.run(cmd, shell=True, check=True)
except subprocess.CalledProcessError as e:
logging.error("error while running command: %s, err: %s", cmd, e.output)
return self, None
except Exception:
logging.error(
"unexpected error while running command: %s", cmd, exc_info=True
)
return None
try:
f_opti = File.from_directory(dest_dir, self.name)
except Exception as e:
logging.debug("unable to get file: %s after optimization: %s", self, e)
return self, None
return self, f_opti
def opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None:
if self.mime_type == JPEG_MIMETYPE:
return self._jpeg_opti(base_dest_dir)
return None
@dataclass(slots=True)
class FileGroup:
    """Files sharing one mimetype and one size range, with running totals.

    Files are indexed by path, so adding the same path twice is a no-op.
    """

    mime_type: str
    file_range: FileSizeRange
    files: dict[str, File] = field(default_factory=dict)
    size: float = 0
    _nb_files: int = 0

    def __repr__(self):
        return f"<FILEGROUP mime_type={self.mime_type} | range={self.file_range} | n={self._nb_files} | size={self.size:.2f} Mb>"  # noqa

    def __len__(self):
        return self._nb_files

    def add(self, file: File):
        """Register *file* unless its path is already part of the group."""
        already_known = self.files.get(file.path)
        if already_known is not None:
            return
        self.files[file.path] = file
        self.size += file.size
        self._nb_files += 1

    def get_size(self) -> float:
        """Return the cumulated size of the group."""
        return self.size

    @staticmethod
    def format_size(size: float) -> str:
        """Render *size* as ``Mb`` below 1000, as ``Gb`` (size / 1024) otherwise."""
        return f"{size:.2f} Mb" if size < 1000 else f"{size / 1024:.2f} Gb"

    def get_size_formatted(self) -> str:
        """Return the cumulated size rendered by ``format_size``."""
        return FileGroup.format_size(self.size)

    def get_files(self) -> list[File]:
        """Return the files of the group as a new list."""
        return [*self.files.values()]
@dataclass(slots=True, frozen=True)
class Dir:
    """Recursive inventory of a directory.

    ``details`` maps a mimetype to its file groups, themselves keyed by size
    range; ``nb_files`` is the number of files successfully inspected.
    """

    path: str
    nb_files: int
    details: dict[str, dict[FileSizeRange, FileGroup]]

    def show(self):
        """Print, per mimetype, the count and size of each size range."""
        data = [f"directory ({self.path}) details:"]
        for mime_type, groups in self.details.items():
            size = 0
            lines = []
            for file_range, file_group in groups.items():
                lines.append(
                    f"\t{file_range:<8}{len(file_group):<8}{file_group.get_size_formatted()}"
                )
                size += file_group.size
            # The header carries the total size of the mimetype.
            header = f"* {mime_type} ({FileGroup.format_size(size)})"
            data.append("\n".join([header, *lines]))
        print("\n".join(data))

    @classmethod
    def from_path(cls, path: str) -> "Dir":
        """Walk *path* recursively and group its files by mimetype and size range.

        Files that cannot be inspected are logged and skipped.

        Raises:
            NotADirectoryError: if *path* is not an existing directory.
        """
        if not os.path.isdir(path):
            raise NotADirectoryError(f"Dir path: {path} must be a directory")

        nb_files = 0
        details: dict[str, dict[FileSizeRange, FileGroup]] = {}
        for dirpath, _, filenames in os.walk(path):
            for filename in filenames:
                try:
                    f = File.from_directory(dirpath, filename)
                except OSError as e:
                    logging.error(
                        "error accessing %s, err: %s",
                        os.path.join(dirpath, filename),
                        e,
                    )
                    continue

                groups = details.setdefault(f.mime_type, {})
                group = groups.get(f.size_range)
                if group is None:
                    group = groups[f.size_range] = FileGroup(
                        f.mime_type, f.size_range
                    )
                group.add(f)
                nb_files += 1

        return cls(path, nb_files, details)

    def get_file_group(
        self, mimetype: str, file_size: FileSizeRange
    ) -> FileGroup | None:
        """Return the group for *mimetype* and *file_size*, or ``None`` if absent."""
        groups = self.details.get(mimetype)
        if groups is None:
            return None
        return groups.get(file_size)

    def get_files(self) -> list[File]:
        """Return every file of the directory, all groups flattened."""
        return [
            file
            for groups in self.details.values()
            for file_group in groups.values()
            for file in file_group.get_files()
        ]
# Script entry point: inventory SRC_PATH, then optimize its "fat" JPEG files
# in parallel and report the overall size gain.
if __name__ == "__main__":
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    logging.basicConfig(
        format="[%(levelname)s] - %(asctime)s - %(message)s",
        level=logging.INFO,
        handlers=(stdout_handler,),
    )

    d = Dir.from_path(SRC_PATH)
    d.show()

    os.makedirs(DEFAULT_DEST_DIR, exist_ok=True)

    mtype = JPEG_MIMETYPE
    frange = FileSizeRange.FAT
    nb_workers = DEFAULT_NB_WORKERS

    fg = d.get_file_group(mtype, frange)
    if fg is None:
        logging.error(
            "no files found for mimetype: %s and file size range: %s", mtype, frange
        )
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. with python -S).
        sys.exit(1)

    logging.info(
        "launching optimization (%d) for %s and range %s on %d workers...",
        len(fg),
        mtype,
        frange,
        nb_workers,
    )

    start = time.perf_counter()
    with ProcessPoolExecutor(nb_workers) as p:
        # Keep each source file next to its future: a worker returning None
        # does not say which file it was about.
        futures = [(f, p.submit(f.opti, DEFAULT_DEST_DIR)) for f in fg.get_files()]

    fg_opti = FileGroup(mtype, frange)
    optimized = 0
    for src, future in futures:
        res = future.result()
        opti = res[1] if res is not None else None
        if opti is None:
            # Count the untouched original so the "after" total stays
            # comparable with the "before" one.
            logging.debug("no optimization for file: %s", src)
            fg_opti.add(src)
            continue
        optimized += 1
        gain = (1 - (opti.size / src.size)) * 100 if src.size else 0.0
        logging.debug("optimization for file: %s -> %.2f%%", src, gain)
        fg_opti.add(opti)

    logging.info("optimization finished in %.2fs", time.perf_counter() - start)

    # Guard against an all-empty group (total size of 0).
    percent = (1 - (fg_opti.size / fg.size)) * 100 if fg.size else 0.0
    size_gained = fg.size - fg_opti.size
    logging.info(
        "total optimization (%d/%d): %.2f%% -> %.2f Mb",
        optimized,
        len(fg),
        percent,
        size_gained,
    )

View File

@ -1,8 +1,27 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "imgopti"
name = "optimg"
dynamic = ["version"]
authors = []
requires-python = ">= 3.10"
description = "Optimize JPEG and PNG files from a directory."
dependencies = []
[project.scripts]
optimg = "optimg.__main__:main"
[tool.hatch.version]
path = "src/__init__.py"
[tool.hatch.build.targets.wheel]
packages = ["src"]
[tool.hatch.build.targets.sdist]
only-include = ["src"]
[tool.hatch.build.targets.wheel.sources]
"src" = "optimg"
[tool.ruff.lint]
select = ["E", "F", "I"]
@ -23,4 +42,4 @@ max-complexity = 10
exclude = [
"venv",
]
ignore_missing_imports = true
ignore_missing_imports = true

View File

@ -1,2 +1,5 @@
mypy==1.18.2
ruff==0.14.1
ruff==0.14.1
hatch==1.12.0
pdoc3==0.11.6
twine==6.2.0

1
src/__init__.py Normal file
View File

@ -0,0 +1 @@
# Package version; pyproject.toml points hatch's dynamic version at this file.
VERSION = "0.1.0"

142
src/__main__.py Normal file
View File

@ -0,0 +1,142 @@
import argparse
import logging
import os
import sys
import time
from concurrent.futures import ProcessPoolExecutor
from .files import Directory, FileGroup, FileImgMimetype, FileSizeRange
# Base destination directory used when --dest is not given on the command line.
DEFAULT_DEST_DIR = "data"
def _build_parser() -> argparse.ArgumentParser:
    """Build the command line parser of the ``optimg`` CLI."""
    parser = argparse.ArgumentParser(
        "optimg", description="Optimize JPEG and PNG files from a directory."
    )
    parser.add_argument("src", type=str, help="Directory path to scan.")
    parser.add_argument(
        "--mimetype",
        type=str,
        choices=[member.name for member in FileImgMimetype],
        default=None,
        help="Filter by mimetype. Default is all.",
    )
    parser.add_argument(
        "--size",
        type=str,
        choices=[member.name for member in FileSizeRange],
        default=None,
        help="Filter by file size. Default is all.",
    )
    parser.add_argument(
        "--dest",
        type=str,
        help="Base destination directory of optimized files.",
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=4,
        help="Number of workers used to optimize files.",
    )
    parser.add_argument(
        "--show",
        action="store_true",
        default=False,
        help="Details the directory by mimetypes and quit.",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        default=False,
        help="Set log level to debug. Default is info.",
    )
    return parser


def _gain_percent(before: float, after: float) -> float:
    """Return the size reduction from *before* to *after*, in percent.

    Returns 0 when *before* is 0 instead of dividing by zero.
    """
    if not before:
        return 0.0
    return (1 - (after / before)) * 100


def main():
    """Entry point of the ``optimg`` command.

    Scans the source directory, then either prints its details (``--show``)
    or optimizes the selected files in a process pool and logs the overall
    size gain. Exits with status 1 when the directory cannot be scanned or
    the filters cannot be parsed, 0 when there is nothing to do.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    logging.basicConfig(
        format="[%(levelname)s] - %(asctime)s - %(message)s",
        level=logging.INFO,
        handlers=(stdout_handler,),
    )

    parser = _build_parser()
    args = parser.parse_args()

    # ProcessPoolExecutor rejects a non-positive worker count with a raw
    # ValueError; report it as a usage error instead.
    if args.workers < 1:
        parser.error("--workers must be a positive integer")

    if args.debug:
        logging.root.setLevel(logging.DEBUG)
        logging.debug("set debug mode on")

    try:
        directory = Directory.from_path(args.src)
    except Exception as e:
        logging.error(e)
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. with python -S).
        sys.exit(1)

    if args.show:
        directory.show()
        sys.exit(0)

    try:
        mimetype = (
            FileImgMimetype.from_str(args.mimetype)
            if args.mimetype is not None
            else None
        )
        size_range = (
            FileSizeRange.from_str(args.size) if args.size is not None else None
        )
    except Exception as e:
        logging.error(f"unexpected error occurred while parsing arguments: {e}")
        sys.exit(1)

    nb_workers = args.workers
    dest_dir = args.dest or DEFAULT_DEST_DIR
    os.makedirs(dest_dir, exist_ok=True)

    fg = directory.get_file_group(mimetype, size_range)
    if not len(fg):
        logging.info(
            "no files found for mimetype: %s and file size range: %s",
            mimetype,
            size_range,
        )
        sys.exit(0)

    logging.info(
        "launching optimization (%d) for type: %s and size range: %s on %d workers...",
        len(fg),
        mimetype.value if mimetype is not None else "all",
        size_range.value if size_range is not None else "all",
        nb_workers,
    )

    start = time.perf_counter()
    with ProcessPoolExecutor(nb_workers) as p:
        # Keep each source file next to its future: a worker returning None
        # (unsupported mimetype, failed command) does not say which file it
        # was about.
        futures = [(f, p.submit(f.opti, dest_dir)) for f in fg.get_files()]

    fg_opti = FileGroup()
    optimized = 0
    for src, future in futures:
        res = future.result()
        opti = res[1] if res is not None else None
        if opti is None:
            # Count the untouched original so skipped files are not
            # reported as pure gain in the final figures.
            logging.debug("no optimization for file: %s", src)
            fg_opti.add(src)
            continue
        optimized += 1
        logging.debug(
            "optimization for file: %s -> %.2f%%",
            src,
            _gain_percent(src.size, opti.size),
        )
        fg_opti.add(opti)

    logging.info("optimization finished in %.2fs", time.perf_counter() - start)

    size_before = fg.get_size()
    size_after = fg_opti.get_size()
    logging.info(
        "total optimization (%d/%d): %.2f%% -> %.2f Mb",
        optimized,
        len(fg),
        _gain_percent(size_before, size_after),
        size_before - size_after,
    )


if __name__ == "__main__":
    main()

255
src/files.py Normal file
View File

@ -0,0 +1,255 @@
import logging
import mimetypes
import os
import subprocess
from dataclasses import dataclass, field
from datetime import datetime as dt
from enum import Enum
from typing import Optional
# Mimetype recorded for files whose type cannot be guessed from their name.
DEFAULT_MIMETYPE = "unknown"
class FileImgMimetype(Enum):
    """Image mimetypes that can be selected for optimization.

    Member values are the mimetype strings themselves.
    """

    JPEG = "image/jpeg"
    PNG = "image/png"

    @classmethod
    def from_str(cls, value: str) -> "FileImgMimetype":
        """Return the member whose name matches *value*, case-insensitively.

        Raises:
            ValueError: if *value* is not the name of a member.
        """
        # Lookup by name covers every member, including ones added later.
        try:
            return cls[value.upper()]
        except KeyError:
            raise ValueError(
                f"unable to parse file img mimetype: {value}"
            ) from None
class FileSizeRange(Enum):
    """Size bucket of a file.

    Thresholds are expressed in the same unit as the size handed to
    ``from_size``.
    """

    TINY = "TINY"
    MEDIUM = "MEDIUM"
    LARGE = "LARGE"
    FAT = "FAT"

    @classmethod
    def from_str(cls, value: str) -> "FileSizeRange":
        """Return the member whose name matches *value*, case-insensitively.

        Raises:
            ValueError: if *value* is not the name of a member.
        """
        # Lookup by name covers every member, including ones added later.
        try:
            return cls[value.upper()]
        except KeyError:
            raise ValueError(f"unable to parse file size range: {value}") from None

    @classmethod
    def from_size(cls, size: float) -> "FileSizeRange":
        """Return the bucket *size* falls in.

        Buckets: ``TINY`` < 1, ``MEDIUM`` [1, 2), ``LARGE`` [2, 5),
        ``FAT`` >= 5.
        """
        # Each test only needs the upper bound: the lower one is implied by
        # the previous test having failed.
        if size < 1:
            return cls.TINY
        if size < 2:
            return cls.MEDIUM
        if size < 5:
            return cls.LARGE
        return cls.FAT
@dataclass(slots=True, frozen=True)
class File:
directory: str
name: str
path: str
mimetype: str
size: float
modified: dt
@classmethod
def from_directory(cls, directory: str, name: str) -> "File":
path = os.path.join(directory, name)
mimetype, _ = mimetypes.guess_type(path)
mimetype = mimetype or DEFAULT_MIMETYPE
size = os.path.getsize(path) / 1_048_576
return File(
directory,
name,
path,
mimetype,
size,
dt.fromtimestamp(os.path.getmtime(path)),
)
def __repr__(self):
return f"<FILE name={self.name} | dir={self.directory} | size={self.size:.2f} Mb | mimetype={self.mimetype}>" # noqa
def _jpeg_opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None:
# remove ".." avoiding treat file in same dir
filepath = "/".join(self.path.split("/")[:-1])
if filepath.startswith(".."):
filepath = filepath.lstrip("..")
# replace all spaces in dir name for readability
dest_dir = os.path.join(base_dest_dir, filepath.lstrip("/")).replace(" ", "_")
os.makedirs(dest_dir, exist_ok=True)
cmd = f"jpegoptim -s -p -q '{self.path}' -d {dest_dir}"
logging.debug("optimization launched for file: %s -> %s", self, cmd)
try:
_ = subprocess.run(cmd, shell=True, check=True)
except subprocess.CalledProcessError as e:
logging.error("error while running command: %s, err: %s", cmd, e.output)
return None
except Exception:
logging.error(
"unexpected error while running command: %s", cmd, exc_info=True
)
return None
try:
f_opti = File.from_directory(dest_dir, self.name)
except Exception as e:
logging.debug("unable to get file: %s after optimization: %s", self, e)
return self, None
return self, f_opti
def opti(self, base_dest_dir: str) -> tuple["File", Optional["File"]] | None:
if self.mimetype == FileImgMimetype.JPEG.value:
return self._jpeg_opti(base_dest_dir)
return None
@dataclass(slots=True)
class FileGroup:
_files: dict[str, File] = field(default_factory=dict)
_size: float = 0
_nb_files: int = 0
def __repr__(self):
return f"<FILEGROUP n={self._nb_files} | size={self._size:.2f} Mb>" # noqa
def __len__(self):
return self._nb_files
def add(self, file: File):
if self._files.get(file.path) is None:
self._files[file.path] = file
self._nb_files += 1
self._size += file.size
def get_size(self) -> float:
return self._size
def join(self, right: "FileGroup"):
for filepath, file in right._files.items():
if self._files.get(filepath) is None:
self._files[filepath] = file
self._size += file.size
self._nb_files += 1
@staticmethod
def format_size(size: float) -> str:
if size < 1000:
return f"{size:.2f} Mb"
return f"{size / 1024:.2f} Gb"
def get_size_formatted(self) -> str:
return FileGroup.format_size(self._size)
def get_files(self) -> list[File]:
return list(self._files.values())
@dataclass(slots=True, frozen=True)
class Directory:
    """Recursive inventory of a directory.

    ``details`` maps a mimetype string to its file groups, themselves keyed
    by size range; ``nb_files`` is the number of files successfully
    inspected.
    """

    path: str
    nb_files: int
    details: dict[str, dict[FileSizeRange, FileGroup]]

    def __len__(self):
        return self.nb_files

    def show(self):
        """Print, per mimetype, the count and size of each size range."""
        data = [f"directory ({self.path}) details:"]
        for mimetype, groups in self.details.items():
            size = 0
            lines = []
            for file_range, file_group in groups.items():
                lines.append(
                    f"\t{file_range.value:<8}{len(file_group):<8}{file_group.get_size_formatted()}"
                )
                size += file_group.get_size()
            # The header carries the total size of the mimetype.
            header = f"* {mimetype} ({FileGroup.format_size(size)})"
            data.append("\n".join([header, *lines]))
        print("\n".join(data))

    @classmethod
    def from_path(cls, path: str) -> "Directory":
        """Walk *path* recursively and group its files by mimetype and size range.

        Files that cannot be inspected are logged and skipped.

        Raises:
            NotADirectoryError: if *path* is not an existing directory.
        """
        if not os.path.isdir(path):
            raise NotADirectoryError(f"Directory path: {path} must be a directory")

        nb_files = 0
        details: dict[str, dict[FileSizeRange, FileGroup]] = {}
        for dirpath, _, filenames in os.walk(path):
            for filename in filenames:
                try:
                    f = File.from_directory(dirpath, filename)
                except OSError as e:
                    logging.error(
                        "error accessing %s, err: %s",
                        os.path.join(dirpath, filename),
                        e,
                    )
                    continue

                groups = details.setdefault(f.mimetype, {})
                size_range = FileSizeRange.from_size(f.size)
                group = groups.get(size_range)
                if group is None:
                    group = groups[size_range] = FileGroup()
                group.add(f)
                nb_files += 1

        return cls(path, nb_files, details)

    def get_file_group(
        self,
        mimetype: FileImgMimetype | None = None,
        size_range: FileSizeRange | None = None,
    ) -> FileGroup:
        """Return a new group with the files matching the given filters.

        A filter left to ``None`` matches everything; the result is empty
        when nothing matches.
        """
        # First narrow down the per-mimetype dictionaries to look into...
        if mimetype is None:
            candidates = list(self.details.values())
        else:
            candidates = [self.details.get(mimetype.value, {})]

        # ...then pick, in each of them, the groups of the wanted size range.
        file_group = FileGroup()
        for groups in candidates:
            if size_range is None:
                selected = list(groups.values())
            else:
                selected = [groups[size_range]] if size_range in groups else []
            for fg in selected:
                file_group.join(fg)
        return file_group

    def get_all(self) -> FileGroup:
        """Return a new group holding every file of the directory."""
        file_group = FileGroup()
        for groups in self.details.values():
            for fg in groups.values():
                file_group.join(fg)
        return file_group