Source code for kwcoco.util.util_archive

from __future__ import annotations

from os.path import dirname
from os.path import exists
from os.path import join
import ubelt as ub

import os
import tarfile
import zipfile
from os.path import relpath
from typing import Dict, Tuple, Literal, Iterator
from types import ModuleType

class Archive:
    """
    Abstraction over zipfile and tarfile

    TODO:
        see if we can use one of these other tools instead

    SeeAlso:
        https://github.com/RKrahl/archive-tools
        https://pypi.org/project/arlib/

    Example:
        >>> from kwcoco.util.util_archive import Archive
        >>> import ubelt as ub
        >>> dpath = ub.Path.appdir('kwcoco', 'tests', 'util', 'archive')
        >>> dpath.delete().ensuredir()
        >>> # Test write mode
        >>> mode = 'w'
        >>> arc_zip = Archive(str(dpath / 'demo.zip'), mode=mode)
        >>> arc_tar = Archive(str(dpath / 'demo.tar.gz'), mode=mode)
        >>> open(dpath / 'data_1only.txt', 'w').write('bazbzzz')
        >>> open(dpath / 'data_2only.txt', 'w').write('buzzz')
        >>> open(dpath / 'data_both.txt', 'w').write('foobar')
        >>> #
        >>> arc_zip.add(dpath / 'data_both.txt')
        >>> arc_zip.add(dpath / 'data_1only.txt')
        >>> #
        >>> arc_tar.add(dpath / 'data_both.txt')
        >>> arc_tar.add(dpath / 'data_2only.txt')
        >>> #
        >>> arc_zip.close()
        >>> arc_tar.close()
        >>> #
        >>> # Test read mode
        >>> arc_zip = Archive(str(dpath / 'demo.zip'), mode='r')
        >>> arc_tar = Archive(str(dpath / 'demo.tar.gz'), mode='r')
        >>> # Test names
        >>> name = 'data_both.txt'
        >>> assert name in arc_zip.names()
        >>> assert name in arc_tar.names()
        >>> # Test read
        >>> assert arc_zip.read(name, mode='r') == 'foobar'
        >>> assert arc_tar.read(name, mode='r') == 'foobar'
        >>> #
        >>> # Test extractall
        >>> extract_dpath = ub.ensuredir(str(dpath / 'extracted'))
        >>> extracted1 = arc_zip.extractall(extract_dpath)
        >>> extracted2 = arc_tar.extractall(extract_dpath)
        >>> for fpath in extracted2:
        >>>     print(open(fpath, 'r').read())
        >>> for fpath in extracted1:
        >>>     print(open(fpath, 'r').read())
    """

    # Maps the accepted backend names onto the stdlib modules implementing them.
    _available_backends: Dict[str, ModuleType] = {
        'tarfile': tarfile,
        'zipfile': zipfile,
    }

    def __init__(
        self,
        fpath: str | None = None,
        mode: Literal['r', 'w'] = 'r',
        backend: str | ModuleType | None = None,
        file: tarfile.TarFile | zipfile.ZipFile | None = None,
    ) -> None:
        """
        Args:
            fpath (str | None): path to open

            mode (str): either r or w

            backend (str | ModuleType | None):
                either tarfile, zipfile string or module. When omitted it is
                determined from ``fpath`` (if the archive is opened here) or
                from the type of ``file`` (if an open archive is given).

            file (tarfile.TarFile | zipfile.ZipFile | None):
                the open backend file if it already exists.
                If not set, then ``fpath`` is opened.

        Raises:
            ValueError: if ``backend`` is an unrecognized string, or if both
                ``file`` and ``fpath`` are None.
        """
        self.fpath = fpath
        self.mode = mode
        if isinstance(backend, str):
            backend = self._available_backends.get(backend)
            if backend is None:
                raise ValueError("unknown backend")
        if file is None:
            if fpath is None:
                raise ValueError("file and fpath are None")
            file, backend = self._open(fpath, mode, backend)
        elif backend is None:
            # An already-open archive was given without naming its backend;
            # without this the other methods cannot dispatch correctly.
            backend = self._infer_backend(file)
        self.file = file
        self.backend = backend

    @staticmethod
    def _infer_backend(file) -> ModuleType | None:
        """
        Return the backend module matching an open archive object, or None
        when the object is neither a ``TarFile`` nor a ``ZipFile``.
        """
        if isinstance(file, tarfile.TarFile):
            return tarfile
        if isinstance(file, zipfile.ZipFile):
            return zipfile
        return None

    @classmethod
    def _open(
        cls,
        fpath: str,
        mode: Literal['r', 'w'] = 'r',
        backend: str | ModuleType | None = None,
    ) -> Tuple[tarfile.TarFile | zipfile.ZipFile, ModuleType]:
        """
        Open ``fpath`` with the requested (or detected) backend.

        Args:
            fpath (str): path of the archive (any ``os.fspath``-able object)
            mode (str): 'r' opens for reading; anything else opens for writing
            backend (str | ModuleType | None): backend name or module. If
                None, it is chosen from the file extension, falling back to
                inspecting the content of an existing file.

        Returns:
            Tuple[TarFile | ZipFile, ModuleType]: the open file and the
            backend module that opened it.

        Raises:
            NotImplementedError: if the backend cannot be determined or is
                not one of tarfile / zipfile.
            Exception: if the file exists but is not a valid archive for the
                chosen backend.
        """
        fpath = os.fspath(fpath)
        exist_flag = os.path.exists(fpath)
        if isinstance(backend, str):
            backend = cls._available_backends.get(backend, None) or backend
        if backend is None:
            if fpath.endswith(('.tar.gz', '.tgz')):
                backend = tarfile
            elif fpath.endswith('.zip'):
                backend = zipfile
            else:
                # Unknown extension: sniff the content of an existing file.
                if exist_flag and zipfile.is_zipfile(fpath):
                    backend = zipfile
                elif exist_flag and tarfile.is_tarfile(fpath):
                    backend = tarfile
                else:
                    raise NotImplementedError('no-exist case')

        file: tarfile.TarFile | zipfile.ZipFile
        if backend is zipfile:
            if exist_flag and not zipfile.is_zipfile(fpath):
                raise Exception('corrupted zip?')
            file = zipfile.ZipFile(fpath, mode=mode)
        elif backend is tarfile:
            if exist_flag and not tarfile.is_tarfile(fpath):
                raise Exception('corrupted tar.gz?')
            tar_mode: Literal['r:*', 'w:gz']
            if mode == 'r':
                # 'r:*' lets tarfile detect the compression itself, so plain,
                # bz2 and xz tarballs open as well as gzip ones. A hard-coded
                # 'r:gz' fails on anything that is not gzip-compressed.
                tar_mode = 'r:*'
            else:
                tar_mode = 'w:gz'
            file = tarfile.open(fpath, tar_mode)
        else:
            raise NotImplementedError
        return file, backend

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names of the archive members."""
        return self.names()

    def names(self) -> Iterator[str]:
        """
        Returns:
            Iterator[str]: the name of each member of the archive
        """
        from typing import cast
        if self.backend is tarfile:
            return (mem.name for mem in cast(tarfile.TarFile, self.file))
        else:
            # does zip have an iterable structure?
            return iter(cast(zipfile.ZipFile, self.file).namelist())

    def read(self, name, mode='rb'):
        """
        Read data directly out of the archive.

        Args:
            name (str):
                the name of the archive member to read

            mode (str):
                This is a conceptual parameter that emulates the usual
                open mode. Defaults to "rb", which returns data as raw bytes.
                If "r" will decode the bytes into utf8-text.

        Returns:
            bytes | str: the content of the member

        Raises:
            KeyError: if ``mode`` is neither "rb" nor "r"
            NotImplementedError: if the backend is not tarfile or zipfile
        """
        if self.backend is tarfile:
            # a rework of makefile in tarfile: copy the member payload into
            # an in-memory buffer instead of onto disk.
            import io
            from tarfile import copyfileobj, ReadError
            self.file._check('r')
            tarinfo = self.file.getmember(name)
            source = self.file.fileobj
            source.seek(tarinfo.offset_data)
            bufsize = self.file.copybufsize
            target = io.BytesIO()
            if tarinfo.sparse is not None:
                # Sparse members store only their data segments; place each
                # one at its offset and then pad out to the full size.
                for offset, size in tarinfo.sparse:
                    target.seek(offset)
                    copyfileobj(source, target, size, ReadError, bufsize)
                target.seek(tarinfo.size)
                target.truncate()
            else:
                copyfileobj(source, target, tarinfo.size, ReadError, bufsize)
            target.seek(0)
            data = target.read()
        elif self.backend is zipfile:
            data = self.file.read(name)
        else:
            raise NotImplementedError

        if mode == 'rb':
            return data
        elif mode == 'r':
            return data.decode('utf8')
        else:
            raise KeyError(mode)

    @classmethod
    def coerce(cls, data):
        """
        Either open an archive file path or coerce an existing ZipFile or
        tarfile structure into this wrapper class

        Args:
            data (str | PathLike | zipfile.ZipFile | tarfile.TarFile | Archive):
                a path to an archive or an already-open archive object

        Returns:
            Archive: the wrapped archive (``data`` itself if it already is one)

        Raises:
            NotImplementedError: if ``data`` is of an unsupported type
        """
        if isinstance(data, cls):
            return data
        if isinstance(data, (str, os.PathLike)):
            return cls(os.fspath(data))
        if isinstance(data, zipfile.ZipFile):
            assert data.fp is not None, 'ZipFile has no associated file object'
            # In-memory streams (e.g. BytesIO) have no ``name`` attribute.
            fpath = getattr(data.fp, 'name', None)
            return cls(fpath, file=data, backend=zipfile)
        if isinstance(data, tarfile.TarFile):
            return cls(data.name, file=data, backend=tarfile)
        raise NotImplementedError

    def add(self, fpath, arcname=None):
        """
        Add a file on disk to the archive.

        Args:
            fpath (str | PathLike): path of the file to add
            arcname (str | None): name of the member inside the archive.
                Defaults to ``fpath`` relative to the directory containing
                the archive.

        Raises:
            NotImplementedError: if the backend is not tarfile or zipfile
        """
        if arcname is None:
            assert self.fpath is not None, 'fpath must be set to compute arcname'
            arcname = relpath(fpath, dirname(self.fpath))
        if self.backend is tarfile:
            self.file.add(fpath, arcname)
        elif self.backend is zipfile:
            self.file.write(fpath, arcname)
        else:
            # Fail loudly rather than silently dropping the file.
            raise NotImplementedError

    def close(self):
        """Close the underlying archive file."""
        return self.file.close()

    def __enter__(self):
        self.file.__enter__()
        return self

    def __exit__(self, *args):
        return self.file.__exit__(*args)

    def extractall(self, output_dpath='.', verbose=1, overwrite=True):
        """
        Extract every member of the archive into a directory.

        Args:
            output_dpath (str): directory to extract into
            verbose (int): if truthy, print progress information
            overwrite (bool): if False, members whose destination path
                already exists are skipped

        Returns:
            List[str]: destination path of every member, including any
            that were skipped because they already existed.
        """
        if verbose:
            print('Enumerate members')
        archive_namelist = list(
            ub.ProgIter(iter(self), desc='enumerate members', verbose=verbose)
        )
        unarchived_paths = []
        # NOTE(review): member names are used as given by the archive. For
        # untrusted archives, consider validating that each destination stays
        # inside output_dpath (see the tarfile extraction-filter docs).
        for member in ub.ProgIter(archive_namelist, desc='extracting',
                                  verbose=verbose):
            fpath = join(output_dpath, member)
            unarchived_paths.append(fpath)
            if not overwrite and exists(fpath):
                continue
            ub.ensuredir(dirname(fpath))
            self.file.extract(member, path=output_dpath)
        return unarchived_paths
# def move_internal(self, src, dst): # """ # Move a file in the archive to a new location # """ # # Seems to be tricky # if self.backend is zipfile: # raise # else: # raise NotImplementedError
def unarchive_file(archive_fpath, output_dpath='.', verbose=1, overwrite=True):
    """
    Extract every member of a tar or zip archive into a directory.

    Args:
        archive_fpath (str): path to the archive to extract
        output_dpath (str): directory to extract into
        verbose (int): if truthy, print progress information
        overwrite (bool): if False, members whose destination path already
            exists are skipped

    Returns:
        List[str]: destination path of every member, including any that
        were skipped because they already existed.

    Raises:
        NotImplementedError: if the file is neither a tar nor a zip archive
    """
    import tarfile
    import zipfile
    if verbose:
        print(
            'Unarchive archive_fpath = {!r} in {}'.format(archive_fpath, output_dpath)
        )
    archive_file = None

    try:
        if tarfile.is_tarfile(archive_fpath):
            # is_tarfile accepts any compression, so open with transparent
            # detection; a hard-coded 'r:gz' fails on plain/bz2/xz tarballs.
            archive_file = tarfile.open(archive_fpath, 'r:*')
            archive_namelist = [
                mem.path
                for mem in ub.ProgIter(
                    iter(archive_file), desc='enumerate members', verbose=verbose
                )
            ]
        elif zipfile.is_zipfile(archive_fpath):
            archive_file = zipfile.ZipFile(archive_fpath)
            if verbose:
                print('Enumerate members')
            archive_namelist = archive_file.namelist()
        else:
            raise NotImplementedError

        unarchived_paths = []
        # NOTE(review): member names are used as given by the archive. For
        # untrusted archives, consider validating that each destination stays
        # inside output_dpath (see the tarfile extraction-filter docs).
        for member in ub.ProgIter(archive_namelist, desc='extracting',
                                  verbose=verbose):
            fpath = join(output_dpath, member)
            unarchived_paths.append(fpath)
            if not overwrite and exists(fpath):
                continue
            ub.ensuredir(dirname(fpath))
            archive_file.extract(member, path=output_dpath)
    finally:
        # Always release the file handle, even if extraction fails part way.
        if archive_file is not None:
            archive_file.close()
    return unarchived_paths
@ub.memoize
def _available_zipfile_compressions():
    """
    Determine which zipfile compression methods this interpreter supports.

    Returns:
        Set[str]: names of the usable ``zipfile.ZIP_*`` constants.
        'ZIP_STORED' is always included; each other name is included only
        if the module implementing it can be imported.
    """
    # (module that must be importable, zipfile constant it enables)
    optional_codecs = (
        ('zlib', 'ZIP_DEFLATED'),
        ('bz2', 'ZIP_BZIP2'),
        ('lzma', 'ZIP_LZMA'),
    )
    supported = {'ZIP_STORED'}
    for modname, constant_name in optional_codecs:
        try:
            __import__(modname)
        except ImportError:
            continue
        supported.add(constant_name)
    return supported
def _coerce_zipfile_compression(compression):
    """
    Resolve a compression specifier to a ``zipfile`` compression constant.

    Args:
        compression (str | int): either the name of a ``zipfile`` constant
            (e.g. 'ZIP_DEFLATED'), the string 'auto' to pick the first
            available method in the order LZMA, DEFLATED, BZIP2, STORED,
            or a non-string value, which is returned unchanged.

    Returns:
        the value of the corresponding ``zipfile`` attribute, or the input
        itself when it is not a string.

    Raises:
        ValueError: if 'auto' is requested and no method is available
    """
    # Non-string inputs are passed through untouched.
    if not isinstance(compression, str):
        return compression

    if compression == 'auto':
        preference = ['ZIP_LZMA', 'ZIP_DEFLATED', 'ZIP_BZIP2', 'ZIP_STORED']
        supported = _available_zipfile_compressions()
        compression = next(
            (name for name in preference if name in supported), None
        )
        if compression is None:
            raise ValueError('No supported zipfile compression found')

    return getattr(zipfile, compression)