aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArthur Zamarin <arthurzam@gentoo.org>2022-10-10 15:42:26 +0300
committerArthur Zamarin <arthurzam@gentoo.org>2022-10-10 18:55:00 +0300
commit7c1d50adc0f82f6fc19f857930eb70c2ba7a84f2 (patch)
tree2a5f080d3eb241fe22d1ceb5266962a84402a80a
parentcompression/_bzip2: add tests (diff)
downloadsnakeoil-7c1d50adc0f82f6fc19f857930eb70c2ba7a84f2.tar.gz
snakeoil-7c1d50adc0f82f6fc19f857930eb70c2ba7a84f2.tar.bz2
snakeoil-7c1d50adc0f82f6fc19f857930eb70c2ba7a84f2.zip
compression.__init__: add tests
Signed-off-by: Arthur Zamarin <arthurzam@gentoo.org>
-rw-r--r--src/snakeoil/compression/__init__.py27
-rw-r--r--src/snakeoil/compression/_util.py24
-rw-r--r--tests/compression/__init__.py11
-rw-r--r--tests/compression/test_bzip2.py10
-rw-r--r--tests/compression/test_init.py88
5 files changed, 121 insertions, 39 deletions
diff --git a/src/snakeoil/compression/__init__.py b/src/snakeoil/compression/__init__.py
index b250b1af..4b4a437c 100644
--- a/src/snakeoil/compression/__init__.py
+++ b/src/snakeoil/compression/__init__.py
@@ -1,9 +1,9 @@
import shlex
+from functools import cached_property
from importlib import import_module
-from .. import klass
+from .. import process
from ..cli.exceptions import UserException
-from ..process import CommandNotFound, find_binary
from ..process.spawn import spawn_get_output
@@ -12,14 +12,10 @@ class _transform_source:
def __init__(self, name):
self.name = name
- @klass.jit_attr
+ @cached_property
def module(self):
return import_module(f'snakeoil.compression._{self.name}')
- @klass.jit_attr
- def parallelizable(self):
- return bool(getattr(self.module, 'parallelizable', False))
-
def compress_data(self, data, level, parallelize=False):
parallelize = parallelize and self.module.parallelizable
return self.module.compress_data(data, level, parallelize=parallelize)
@@ -81,7 +77,7 @@ class ArComp:
def __init_subclass__(cls, **kwargs):
"""Initialize result subclasses and register archive extensions."""
super().__init_subclass__(**kwargs)
- if not all((cls.binary, cls.default_unpack_cmd, cls.exts)):
+ if not all((cls.binary, cls.default_unpack_cmd, cls.exts)): # pragma: no cover
raise ValueError(f'class missing required attrs: {cls!r}')
for ext in cls.exts:
cls.known_exts[ext] = cls
@@ -89,13 +85,13 @@ class ArComp:
def __init__(self, path, ext=None):
self.path = path
- @klass.jit_attr
+ @cached_property
def _unpack_cmd(self):
for b in self.binary:
try:
- binary = find_binary(b)
+ binary = process.find_binary(b)
break
- except CommandNotFound:
+ except process.CommandNotFound:
continue
else:
choices = ', '.join(self.binary)
@@ -107,9 +103,6 @@ class ArComp:
def unpack(self, dest=None, **kwargs):
raise NotImplementedError
- def create(self, dest):
- raise NotImplementedError
-
class _Archive:
"""Generic archive format support."""
@@ -155,16 +148,16 @@ class _Tar(_Archive, ArComp):
compress_binary = None
default_unpack_cmd = '{binary} xf "{path}"'
- @klass.jit_attr
+ @cached_property
def _unpack_cmd(self):
cmd = super()._unpack_cmd
if self.compress_binary is not None:
for b in self.compress_binary:
try:
- find_binary(b)
+ process.find_binary(b)
cmd += f' --use-compress-program={b}'
break
- except CommandNotFound:
+ except process.CommandNotFound:
pass
else:
choices = ', '.join(self.compress_binary)
diff --git a/src/snakeoil/compression/_util.py b/src/snakeoil/compression/_util.py
index 6a3c7258..e1af5aef 100644
--- a/src/snakeoil/compression/_util.py
+++ b/src/snakeoil/compression/_util.py
@@ -22,7 +22,7 @@ def _drive_process(args, mode, data):
def compress_data(binary, data, compresslevel=9, extra_args=()):
- args = [binary, '-%ic' % compresslevel]
+ args = [binary, f'-{compresslevel}c']
args.extend(extra_args)
return _drive_process(args, 'compression', data)
@@ -36,9 +36,7 @@ def decompress_data(binary, data, extra_args=()):
class _process_handle:
def __init__(self, handle, args, is_read=False):
- self.mode = 'wb'
- if is_read:
- self.mode = 'rb'
+ self.mode = 'rb' if is_read else 'wb'
self.args = tuple(args)
self.is_read = is_read
@@ -55,8 +53,7 @@ class _process_handle:
elif not isinstance(handle, int):
if not hasattr(handle, 'fileno'):
raise TypeError(
- "handle %r isn't a string, integer, and lacks a fileno "
- "method" % (handle,))
+ f"handle {handle!r} isn't a string, integer, and lacks a fileno method")
handle = handle.fileno()
try:
@@ -108,8 +105,8 @@ class _process_handle:
if fwd_seek < 0:
if self._allow_reopen is None:
raise TypeError(
- "instance %s can't do negative seeks: asked for %i, "
- "was at %i" % (self, position, self.position))
+ f"instance {self} can't do negative seeks: "
+ f"asked for {position}, was at {self.position}")
self._terminate()
self._open_handle(self._allow_reopen)
return self.seek(position)
@@ -142,16 +139,17 @@ class _process_handle:
def _terminate(self):
try:
self._process.terminate()
- except EnvironmentError as e:
+ except EnvironmentError as exc:
# allow no such process only.
- if e.errno != errno.ESRCH:
+ if exc.errno != errno.ESRCH:
raise
def close(self):
+ if not hasattr(self, '_process'):
+ return
if self._process.returncode is not None:
if self._process.returncode != 0:
- raise Exception("%s invocation had non zero exit: %i" %
- (self.args, self._process.returncode))
+ raise Exception(f"{self.args} invocation had non zero exit: {self._process.returncode}")
return
self.handle.close()
@@ -165,7 +163,7 @@ class _process_handle:
def compress_handle(binary_path, handle, compresslevel=9, extra_args=()):
- args = [binary_path, '-%ic' % compresslevel]
+ args = [binary_path, f'-{compresslevel}c']
args.extend(extra_args)
return _process_handle(handle, args, False)
diff --git a/tests/compression/__init__.py b/tests/compression/__init__.py
index e69de29b..178d4b64 100644
--- a/tests/compression/__init__.py
+++ b/tests/compression/__init__.py
@@ -0,0 +1,11 @@
+from unittest.mock import patch
+
+from snakeoil.process import CommandNotFound, find_binary
+
+def hide_binary(*binaries: str):
+ def mock_find_binary(name):
+ if name in binaries:
+ raise CommandNotFound(name)
+ return find_binary(name)
+
+ return patch('snakeoil.process.find_binary', side_effect=mock_find_binary)
diff --git a/tests/compression/test_bzip2.py b/tests/compression/test_bzip2.py
index f8f1d1fb..d11d61fa 100644
--- a/tests/compression/test_bzip2.py
+++ b/tests/compression/test_bzip2.py
@@ -1,21 +1,13 @@
import importlib
from bz2 import decompress
from pathlib import Path
-from unittest import mock
import pytest
from snakeoil.compression import _bzip2
from snakeoil.process import CommandNotFound, find_binary
from snakeoil.test import hide_imports
-
-def hide_binary(*binaries: str):
- def mock_find_binary(name):
- if name in binaries:
- raise CommandNotFound(name)
- return find_binary(name)
-
- return mock.patch('snakeoil.process.find_binary', side_effect=mock_find_binary)
+from . import hide_binary
def test_no_native():
diff --git a/tests/compression/test_init.py b/tests/compression/test_init.py
new file mode 100644
index 00000000..0726571b
--- /dev/null
+++ b/tests/compression/test_init.py
@@ -0,0 +1,88 @@
+import shutil
+import subprocess
+import sys
+
+import pytest
+from snakeoil.compression import ArComp, ArCompError, _TarBZ2
+from snakeoil.contexts import chdir
+
+from . import hide_binary
+
+
+@pytest.mark.skipif(sys.platform == "darwin", reason="darwin fails with bzip2")
+class TestArComp:
+
+ @pytest.fixture(scope='class')
+ def tar_file(self, tmp_path_factory):
+ data = tmp_path_factory.mktemp("data")
+ (data / 'file1').write_text('Hello world')
+ (data / 'file2').write_text('Larry the Cow')
+ path = data / 'test 1.tar'
+ subprocess.run(['tar', 'cf', str(path), 'file1', 'file2'], cwd=data, check=True)
+ (data / 'file1').unlink()
+ (data / 'file2').unlink()
+ return str(path)
+
+ @pytest.fixture(scope='class')
+ def tar_bz2_file(self, tar_file):
+ subprocess.run(['bzip2', '-z', '-k', tar_file], check=True)
+ return tar_file + ".bz2"
+
+ @pytest.fixture(scope='class')
+ def tbz2_file(self, tar_bz2_file):
+ new_path = tar_bz2_file.replace('.tar.bz2', '.tbz2')
+ shutil.copyfile(tar_bz2_file, new_path)
+ return new_path
+
+ @pytest.fixture(scope='class')
+ def lzma_file(self, tmp_path_factory):
+ data = (tmp_path_factory.mktemp("data") / 'test 2.lzma')
+ with data.open('wb') as f:
+ subprocess.run(['lzma'], check=True, input=b'Hello world', stdout=f)
+ return str(data)
+
+ def test_unknown_extenstion(self, tmp_path):
+ file = tmp_path / 'test.file'
+ with pytest.raises(ArCompError, match='unknown compression file extension'):
+ ArComp(file, ext='.foo')
+
+ def test_missing_tar(self, tmp_path, tar_file):
+ with hide_binary('tar'), chdir(tmp_path):
+ with pytest.raises(ArCompError, match='required binary not found'):
+ ArComp(tar_file, ext='.tar').unpack(dest=tmp_path)
+
+ def test_tar(self, tmp_path, tar_file):
+ with chdir(tmp_path):
+ ArComp(tar_file, ext='.tar').unpack(dest=tmp_path)
+ assert (tmp_path / 'file1').read_text() == 'Hello world'
+ assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
+
+ def test_tar_bz2(self, tmp_path, tar_bz2_file):
+ with chdir(tmp_path):
+ ArComp(tar_bz2_file, ext='.tar.bz2').unpack(dest=tmp_path)
+ assert (tmp_path / 'file1').read_text() == 'Hello world'
+ assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
+
+ def test_tbz2(self, tmp_path, tbz2_file):
+ with chdir(tmp_path):
+ ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path)
+ assert (tmp_path / 'file1').read_text() == 'Hello world'
+ assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
+
+ def test_fallback_tbz2(self, tmp_path, tbz2_file):
+ with hide_binary(*_TarBZ2.compress_binary[:-1]):
+ with chdir(tmp_path):
+ ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path)
+ assert (tmp_path / 'file1').read_text() == 'Hello world'
+ assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
+
+ def test_no_fallback_tbz2(self, tmp_path, tbz2_file):
+ with hide_binary(*_TarBZ2.compress_binary), chdir(tmp_path):
+ with pytest.raises(ArCompError, match='no compression binary'):
+ ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path)
+
+ def test_lzma(self, tmp_path, lzma_file):
+ dest = tmp_path / 'file'
+ with chdir(tmp_path):
+ ArComp(lzma_file, ext='.lzma').unpack(dest=dest)
+ assert (dest).read_bytes() == b'Hello world'