def zip_dir(src_dir, dst_zip, *, skip_suffixes=None, dry=False):
    """Archive the contents of *src_dir* into a new zip file at *dst_zip*.

    The archive is first built inside a temporary directory and only copied
    to *dst_zip* once it has been written completely. Entry names are stored
    relative to *src_dir*. Directories without any children are stored as
    explicit ``name/`` entries so they survive a round trip.

    Args:
        src_dir: Directory to archive (``str`` or path-like).
        dst_zip: Path of the zip file to create; it must not exist yet.
        skip_suffixes: Suffixes (including the leading dot, e.g. ``".log"``)
            of files to leave out. A file is skipped when *any* entry of its
            ``Path.suffixes`` matches, so ``"a.log.1"`` is skipped by
            ``".log"``. A single string is treated as one suffix.
        dry: When true, only log what would be archived; *dst_zip* is not
            created.

    Raises:
        FileNotFoundError: *src_dir* does not exist.
        NotADirectoryError: *src_dir* is not a directory.
        FileExistsError: *dst_zip* already exists.
    """
    import logging
    import shutil
    from os import walk
    from pathlib import Path
    from tempfile import TemporaryDirectory
    from zipfile import ZipFile, ZipInfo

    _log = logging.getLogger(zip_dir.__name__)
    # Attach the NullHandler only once: adding a new one on every call would
    # grow the logger's handler list without bound.
    if not any(isinstance(h, logging.NullHandler) for h in _log.handlers):
        _log.addHandler(logging.NullHandler())
    _sep = 50 * "-"

    # Normalise to a set: a bare string would otherwise be substring-matched
    # and a one-shot iterable would be exhausted by the first membership test.
    if isinstance(skip_suffixes, str):
        skip_suffixes = (skip_suffixes,)
    skip_suffixes = frozenset(skip_suffixes or ())

    src_dir, dst_zip = Path(src_dir), Path(dst_zip)
    _log.info("zipping dir: '%s' to: '%s'", str(src_dir), str(dst_zip))

    # Fail fast, before any archiving work is done.
    if not src_dir.exists():
        raise FileNotFoundError(str(src_dir))
    if not src_dir.is_dir():
        raise NotADirectoryError(str(src_dir))
    if dst_zip.exists():
        raise FileExistsError(str(dst_zip))

    with TemporaryDirectory() as tmp_dir:
        tmp_zip_path = Path(tmp_dir).joinpath(dst_zip.name)
        with ZipFile(str(tmp_zip_path), mode="w") as zip_out:
            for root, dirs, files in walk(src_dir):
                root = Path(root)

                for folder in dirs:
                    folder = root.joinpath(folder)
                    # Only empty folders need an explicit entry; non-empty
                    # ones are implied by the files stored beneath them.
                    if next(folder.iterdir(), None) is not None:
                        continue
                    _log.debug(_sep)
                    folder_name = f"{str(folder.relative_to(src_dir))}/"
                    _log.debug("empty dir: '%s'", folder_name)
                    if not dry:
                        zip_out.writestr(ZipInfo(folder_name), "")

                for file in files:
                    file = root.joinpath(file)
                    _log.debug(_sep)
                    _log.debug("adding: '%s'", str(file))
                    # First suffix of the file that is on the skip list.
                    should_skip = next(
                        (s for s in file.suffixes if s in skip_suffixes), None
                    )
                    if should_skip:
                        _log.debug("skipped [%s]: %s", should_skip, str(file))
                        continue
                    arcname = str(file.relative_to(src_dir))
                    _log.debug("arcname: '%s'", arcname)
                    if not dry:
                        zip_out.write(str(file), arcname=arcname)

        if not dry:
            # Stream the finished archive instead of loading it into memory.
            # Mode "xb" raises FileExistsError if the destination appeared
            # since the check above, rather than silently overwriting it.
            with tmp_zip_path.open("rb") as zip_in, dst_zip.open("xb") as out:
                shutil.copyfileobj(zip_in, out)
        # The temporary archive is removed together with tmp_dir on exit.
if __name__ == '__main__':
    # Manual smoke run: show every log record on the console and walk a
    # sample directory in dry mode, so no archive is written.
    import logging

    logging.basicConfig(
        format="%(asctime)s | %(levelname)8s | %(module)25s:%(lineno)-5s | %(message)s",
        level=logging.DEBUG,
    )
    zip_dir(
        "/tmp/opera_profile",
        "opera_profile.zip",
        dry=True,
        skip_suffixes=[".log"],
    )