from __future__ import annotations
from os.path import dirname
from os.path import exists
from os.path import join
import ubelt as ub
import os
import tarfile
import zipfile
from os.path import relpath
from typing import Dict, Tuple, Literal, Iterator
from types import ModuleType
class Archive:
    """
    Abstraction over zipfile and tarfile

    TODO:
        see if we can use one of these other tools instead

    SeeAlso:
        https://github.com/RKrahl/archive-tools
        https://pypi.org/project/arlib/

    Example:
        >>> from kwcoco.util.util_archive import Archive
        >>> import ubelt as ub
        >>> dpath = ub.Path.appdir('kwcoco', 'tests', 'util', 'archive')
        >>> dpath.delete().ensuredir()
        >>> # Test write mode
        >>> mode = 'w'
        >>> arc_zip = Archive(str(dpath / 'demo.zip'), mode=mode)
        >>> arc_tar = Archive(str(dpath / 'demo.tar.gz'), mode=mode)
        >>> open(dpath / 'data_1only.txt', 'w').write('bazbzzz')
        >>> open(dpath / 'data_2only.txt', 'w').write('buzzz')
        >>> open(dpath / 'data_both.txt', 'w').write('foobar')
        >>> #
        >>> arc_zip.add(dpath / 'data_both.txt')
        >>> arc_zip.add(dpath / 'data_1only.txt')
        >>> #
        >>> arc_tar.add(dpath / 'data_both.txt')
        >>> arc_tar.add(dpath / 'data_2only.txt')
        >>> #
        >>> arc_zip.close()
        >>> arc_tar.close()
        >>> #
        >>> # Test read mode
        >>> arc_zip = Archive(str(dpath / 'demo.zip'), mode='r')
        >>> arc_tar = Archive(str(dpath / 'demo.tar.gz'), mode='r')
        >>> # Test names
        >>> name = 'data_both.txt'
        >>> assert name in arc_zip.names()
        >>> assert name in arc_tar.names()
        >>> # Test read
        >>> assert arc_zip.read(name, mode='r') == 'foobar'
        >>> assert arc_tar.read(name, mode='r') == 'foobar'
        >>> #
        >>> # Test extractall
        >>> extract_dpath = ub.ensuredir(str(dpath / 'extracted'))
        >>> extracted1 = arc_zip.extractall(extract_dpath)
        >>> extracted2 = arc_tar.extractall(extract_dpath)
        >>> for fpath in extracted2:
        >>>     print(open(fpath, 'r').read())
        >>> for fpath in extracted1:
        >>>     print(open(fpath, 'r').read())
    """

    # Maps the accepted backend names onto the stdlib modules implementing them.
    _available_backends: Dict[str, ModuleType] = {
        'tarfile': tarfile,
        'zipfile': zipfile,
    }

    def __init__(
        self,
        fpath: str | None = None,
        mode: Literal['r', 'w'] = 'r',
        backend: str | ModuleType | None = None,
        file: tarfile.TarFile | zipfile.ZipFile | None = None,
    ) -> None:
        """
        Args:
            fpath (str | None): path to open

            mode (str): either r or w

            backend (str | ModuleType | None):
                either tarfile, zipfile string or module.

            file (tarfile.TarFile | zipfile.ZipFile | None):
                the open backend file if it already exists.
                If not set, then fpath will open it.

        Raises:
            ValueError: if ``backend`` is an unrecognized string, or if both
                ``file`` and ``fpath`` are None.
        """
        self.fpath = fpath
        self.mode = mode
        if isinstance(backend, str):
            backend = self._available_backends.get(backend)
            if backend is None:
                raise ValueError("unknown backend")
        if file is None:
            if fpath is None:
                raise ValueError("file and fpath are None")
            file, backend = self._open(fpath, mode, backend)
        elif backend is None:
            # An already-open handle was given without naming its backend;
            # derive it from the handle so the backend dispatch in the other
            # methods picks the matching branch.
            backend = self._infer_backend(file)
        self.file = file
        self.backend = backend

    @staticmethod
    def _infer_backend(file) -> ModuleType | None:
        """
        Return the backend module matching an open archive object, or None
        when the object is neither a ZipFile nor a TarFile.
        """
        if isinstance(file, zipfile.ZipFile):
            return zipfile
        if isinstance(file, tarfile.TarFile):
            return tarfile
        return None

    @classmethod
    def _open(
        cls,
        fpath: str,
        mode: Literal['r', 'w'] = 'r',
        backend: str | ModuleType | None = None,
    ) -> Tuple[tarfile.TarFile | zipfile.ZipFile, ModuleType]:
        """
        Open ``fpath`` with the requested (or detected) backend.

        When ``backend`` is None it is chosen from the file extension, and
        otherwise, for existing files, by sniffing the content.

        Returns:
            Tuple: the open backend file object and the backend module.

        Raises:
            NotImplementedError: if no backend can be determined or the
                given backend is not tarfile / zipfile.
            Exception: if the path exists but the chosen backend does not
                recognize it as one of its archives.
        """
        fpath = os.fspath(fpath)
        exist_flag = os.path.exists(fpath)
        if isinstance(backend, str):
            backend = cls._available_backends.get(backend, None) or backend
        if backend is None:
            if fpath.endswith(('.tar.gz', '.tgz')):
                backend = tarfile
            elif fpath.endswith('.zip'):
                backend = zipfile
            elif exist_flag and zipfile.is_zipfile(fpath):
                backend = zipfile
            elif exist_flag and tarfile.is_tarfile(fpath):
                backend = tarfile
            else:
                raise NotImplementedError('no-exist case')
        file: tarfile.TarFile | zipfile.ZipFile
        if backend is zipfile:
            if exist_flag and not zipfile.is_zipfile(fpath):
                raise Exception('corrupted zip?')
            file = zipfile.ZipFile(fpath, mode=mode)
        elif backend is tarfile:
            if exist_flag and not tarfile.is_tarfile(fpath):
                raise Exception('corrupted tar.gz?')
            # is_tarfile accepts any compression, so read with transparent
            # compression detection instead of insisting on gzip.
            # New archives are still written gzip-compressed.
            tar_mode: Literal['r:*', 'w:gz'] = 'r:*' if mode == 'r' else 'w:gz'
            file = tarfile.open(fpath, tar_mode)
        else:
            raise NotImplementedError
        return file, backend

    def __iter__(self) -> Iterator[str]:
        return self.names()

    def names(self) -> Iterator[str]:
        """
        Iterate over the names of the members stored in the archive.
        """
        from typing import cast
        if self.backend is tarfile:
            return (mem.name for mem in cast(tarfile.TarFile, self.file))
        else:
            return iter(cast(zipfile.ZipFile, self.file).namelist())

    def read(self, name, mode='rb'):
        """
        Read data directly out of the archive.

        Args:
            name (str):
                the name of the archive member to read

            mode (str):
                This is a conceptual parameter that emulates the usual
                open mode. Defaults to "rb", which returns data as raw bytes.
                If "r" will decode the bytes into utf8-text.

        Returns:
            bytes | str: the member content.

        Raises:
            NotImplementedError: if the backend is not tarfile or zipfile.
            KeyError: if ``mode`` is neither "rb" nor "r".
        """
        if self.backend is tarfile:
            member = self.file.getmember(name)
            reader = self.file.extractfile(member)
            if reader is None:
                # extractfile gives None for members without a data stream
                # (e.g. directories); report them as empty.
                data = b''
            else:
                with reader:
                    data = reader.read()
        elif self.backend is zipfile:
            data = self.file.read(name)
        else:
            raise NotImplementedError
        if mode == 'rb':
            return data
        elif mode == 'r':
            return data.decode('utf8')
        else:
            raise KeyError(mode)

    @classmethod
    def coerce(cls, data):
        """
        Either open an archive file path or coerce an existing
        ZipFile or tarfile structure into this wrapper class

        Args:
            data (str | PathLike | zipfile.ZipFile | tarfile.TarFile | Archive):
                a path to open in read mode, an open backend object to wrap,
                or an existing Archive (returned unchanged).

        Raises:
            NotImplementedError: for any other input type.
        """
        if isinstance(data, cls):
            return data
        if isinstance(data, (str, os.PathLike)):
            return cls(os.fspath(data))
        if isinstance(data, zipfile.ZipFile):
            assert data.fp is not None, 'ZipFile has no associated file object'
            # ``filename`` is also set (to None) for in-memory archives whose
            # file object has no ``name`` attribute.
            return cls(data.filename, file=data, backend=zipfile)
        if isinstance(data, tarfile.TarFile):
            return cls(getattr(data, 'name', None), file=data, backend=tarfile)
        raise NotImplementedError

    def add(self, fpath, arcname=None):
        """
        Add a file on disk to the archive.

        Args:
            fpath (str | PathLike): path of the file to add.

            arcname (str | None):
                name to store the file under. Defaults to ``fpath`` relative
                to the directory containing the archive itself.

        Raises:
            NotImplementedError: if the backend is not tarfile or zipfile.
        """
        if arcname is None:
            assert self.fpath is not None, 'fpath must be set to compute arcname'
            arcname = relpath(fpath, dirname(self.fpath))
        if self.backend is tarfile:
            self.file.add(fpath, arcname)
        elif self.backend is zipfile:
            self.file.write(fpath, arcname)
        else:
            # Fail loudly instead of silently dropping the file.
            raise NotImplementedError

    def extractall(self, output_dpath='.', verbose=1, overwrite=True):
        """
        Extract every member of the archive into ``output_dpath``.

        Args:
            output_dpath (str | PathLike): directory to extract into.

            verbose (int): if truthy, print what is being extracted.

            overwrite (bool):
                if False, members whose destination path already exists are
                not extracted again (they are still listed in the result).

        Returns:
            List[str]: destination path of each member, in archive order.
        """
        # NOTE(review): member names come straight from the archive; only
        # use this on archives from a trusted source.
        from os.path import exists, join
        if verbose:
            print('Extracting {!r} into {}'.format(self.fpath, output_dpath))
        extracted_paths = []
        # Materialize the names first so extraction does not interleave with
        # an in-progress iteration over the underlying archive object.
        for name in list(self.names()):
            dst_fpath = join(output_dpath, name)
            extracted_paths.append(dst_fpath)
            if not overwrite and exists(dst_fpath):
                continue
            self.file.extract(name, path=output_dpath)
        return extracted_paths

    def close(self):
        """
        Close the underlying backend file object.
        """
        return self.file.close()

    def __enter__(self):
        self.file.__enter__()
        return self

    def __exit__(self, *args):
        self.file.__exit__(*args)

    # def move_internal(self, src, dst):
    #     """
    #     Move a file in the archive to a new location
    #     """
    #     # Seems to be tricky
    #     if self.backend is zipfile:
    #         raise
    #     else:
    #         raise NotImplementedError
def unarchive_file(archive_fpath, output_dpath='.', verbose=1, overwrite=True):
    """
    Extract every member of a tar or zip archive into a directory.

    Args:
        archive_fpath (str | PathLike): path to the archive to extract.

        output_dpath (str | PathLike): directory to extract into.

        verbose (int): if truthy, print status and show progress.

        overwrite (bool):
            if False, members whose destination path already exists are
            skipped (they are still listed in the result).

    Returns:
        List[str]: destination path of each archive member.

    Raises:
        NotImplementedError: if the file is neither a tar nor a zip archive.
    """
    # NOTE(review): member names come straight from the archive; only use
    # this on archives from a trusted source.
    import tarfile
    import zipfile
    if verbose:
        print(
            'Unarchive archive_fpath = {!r} in {}'.format(archive_fpath, output_dpath)
        )
    archive_file = None
    try:
        if tarfile.is_tarfile(archive_fpath):
            # is_tarfile accepts any compression, so open with transparent
            # detection rather than requiring gzip.
            archive_file = tarfile.open(archive_fpath, 'r:*')
            archive_namelist = [
                mem.path
                for mem in ub.ProgIter(
                    iter(archive_file), desc='enumerate members', verbose=verbose
                )
            ]
        elif zipfile.is_zipfile(archive_fpath):
            archive_file = zipfile.ZipFile(archive_fpath)
            if verbose:
                print('Enumerate members')
            archive_namelist = archive_file.namelist()
        else:
            raise NotImplementedError
        unarchived_paths = []
        for member in ub.ProgIter(archive_namelist, desc='extracting', verbose=verbose):
            fpath = join(output_dpath, member)
            unarchived_paths.append(fpath)
            if not overwrite and exists(fpath):
                continue
            ub.ensuredir(dirname(fpath))
            archive_file.extract(member, path=output_dpath)
    finally:
        # Always release the archive handle, even if extraction fails.
        if archive_file is not None:
            archive_file.close()
    return unarchived_paths
@ub.memoize
def _available_zipfile_compressions():
    """
    Return the set of zipfile compression constant names usable here.

    ``ZIP_STORED`` is always included; each other name is included only
    when the module backing it can be imported.
    """
    # (module that must import, zipfile constant name it enables)
    optional_codecs = (
        ('zlib', 'ZIP_DEFLATED'),
        ('bz2', 'ZIP_BZIP2'),
        ('lzma', 'ZIP_LZMA'),
    )
    supported = {'ZIP_STORED'}
    for modname, const_name in optional_codecs:
        try:
            __import__(modname)
        except ImportError:
            continue
        supported.add(const_name)
    return supported
[docs]
def _coerce_zipfile_compression(compression):
if isinstance(compression, str):
if compression == 'auto':
priority = ['ZIP_LZMA', 'ZIP_DEFLATED', 'ZIP_BZIP2', 'ZIP_STORED']
available = _available_zipfile_compressions()
found = None
for cand in priority:
if cand in available:
found = cand
break
compression = found
if compression is None:
raise ValueError('No supported zipfile compression found')
compression = getattr(zipfile, compression)
return compression