Last active
March 21, 2022 16:33
-
-
Save Glutexo/71c2d84fbcbdcc0238b162652425d6f4 to your computer and use it in GitHub Desktop.
Get a file from an Insights archive
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import namedtuple | |
from contextlib import contextmanager | |
from functools import partial | |
from io import BytesIO | |
from io import StringIO | |
from os.path import join | |
from re import escape | |
from re import fullmatch | |
from shlex import quote | |
from tarfile import open as tar_open | |
from tarfile import TarFile | |
from tarfile import TarInfo | |
from tempfile import TemporaryDirectory | |
from unittest.mock import patch | |
# Not from sys import, for the pytest capsys fixture to work. | |
# See https://github.com/pytest-dev/pytest/issues/8900 | |
import sys | |
try: | |
from pytest import mark | |
from pytest import raises | |
except ImportError: | |
PYTEST = False | |
else: | |
PYTEST = True | |
# Reads the Insights Client archive path from the output of a --no-upload run
# and prints the contents of a single file from that archive. The archive path
# can also be passed explicitly as the first argument.
# Tested with Python 3.6.8, pytest-3.0.6.
# Usage:
#   $ insights-client --no-upload | python get_file_from_insights_archive.py \
#       data/etc/ssh/sshd_config
#   $ python get_file_from_insights_archive.py \
#       /var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz \
#       data/etc/ssh/sshd_config
# Caveats:
#   Does not work with --verbose.
# Run tests:
#   $ pytest get_file_from_insights_archive.py
# Example input:
#   Starting to collect Insights data for localhost.localdomain
#   Archive saved at /var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz
# Encoding used to decode the archive member before it is printed.
_ENCODING = "utf-8"

# Directory that the archive path is expected to start with.
_TMP_PATH = "/var/tmp"

# Expected archive path layout: <tmp>/<directory>/<name>-<timestamp>.tar.gz.
# Group 1 is the file name without the ".tar.gz" extension, group 2 is that
# name without the timestamp ("insights-<something>" or "soscleaner").
_PATH_PATTERN = (
    escape(_TMP_PATH) +
    r"/[^/]+/((insights-.+?|soscleaner)-\d{14,})\.tar\.gz"
)

# Group 1 is the archive path. The line terminator is optional so that a
# final input line without a trailing newline is recognized as well.
_STDIN_PATTERN = r"Archive saved at (.+)\n?"

# Matches a whole input line against _STDIN_PATTERN.
_stdin_match = partial(fullmatch, _STDIN_PATTERN)

# (path, name) pair describing an archive.
_ArchiveInfo = namedtuple("_ArchiveInfo", ("path", "name"))
def _find_archive_path_in_stdin():
    """Return the archive path announced on standard input.

    Lines are read from sys.stdin only until the first one accepted by
    _stdin_match; the first group of that match is returned.

    Raises ValueError when no line matches.
    """
    candidates = (_stdin_match(line) for line in sys.stdin)
    first_hit = next((hit for hit in candidates if hit), None)
    if first_hit is None:
        raise ValueError("Archive path not found.")
    return first_hit[1]
@contextmanager
def _open_archive(archive_path):
    """Open the gzipped tar archive at archive_path for reading.

    Works as a context manager that yields the opened archive; the archive
    is closed when the block is left.
    """
    with tar_open(archive_path, mode="r:gz") as opened:
        yield opened
def _member_path(archive_path, file_path):
    """Build the name under which file_path is stored inside the archive.

    archive_path -- path of the archive; it must match _PATH_PATTERN.
    file_path -- path of the wanted file relative to the archive's top-level
        directory.

    Returns the archive name (the file name without ".tar.gz") joined with
    file_path. The result is prefixed with "./" unless the archive is a
    "soscleaner" one.

    Raises ValueError when archive_path does not match _PATH_PATTERN.
    """
    match = fullmatch(_PATH_PATTERN, archive_path)
    if match is None:
        # Without this check the subscript below would fail with an opaque
        # "'NoneType' object is not subscriptable" TypeError.
        raise ValueError("Unexpected archive path %s." % (archive_path,))

    archive_name, archive_prefix = match.group(1, 2)
    member_path = join(archive_name, file_path)
    if archive_prefix != "soscleaner":
        member_path = join(".", member_path)
    return member_path
def _extract_member(archive, member_path):
    """Return a readable file object for a member of an opened archive.

    archive -- object providing extractfile(), such as an opened TarFile.
    member_path -- name of the member inside the archive.

    Raises LookupError when the member does not exist or when it cannot be
    read as a file (extractfile() returned None, e.g. for a directory).
    """
    try:
        member = archive.extractfile(member_path)
    except KeyError as error:
        # KeyError is itself a LookupError; re-raise with a readable message.
        raise LookupError("File not found in archive.") from error
    if member is None:
        # Callers iterate over the result, so None must not leak out.
        raise LookupError("Member is not a regular file.")
    return member
def _print_to_stderr(*args, **kwargs):
    """Print to standard error; arguments are passed on to print().

    sys.stderr is looked up at call time, so a stream substituted after
    import (for instance by a test's output capture) is honoured.
    """
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
def _print_to_stdout(*args, **kwargs):
    """Print to standard output; arguments are passed on to print().

    sys.stdout is looked up at call time, so a stream substituted after
    import (for instance by a test's output capture) is honoured.
    """
    stream = sys.stdout
    print(*args, file=stream, **kwargs)
def main(archive_path, file_path):
    """Print one file from an archive to standard output.

    archive_path -- path of the archive; when it is falsy, the path is looked
        up in standard input instead.
    file_path -- path of the wanted file; it is turned into an archive member
        name by _member_path.

    Progress messages and a command line that repeats the run go to standard
    error. When the archive path or the member cannot be found, the error
    message is printed to standard error and nothing is written to standard
    output.
    """
    try:
        if not archive_path:
            archive_path = _find_archive_path_in_stdin()
    except ValueError as not_found:
        _print_to_stderr(not_found)
        return

    _print_to_stderr("Found archive path %s." % (archive_path,))

    with _open_archive(archive_path) as archive:
        member_path = _member_path(archive_path, file_path)
        try:
            member = _extract_member(archive, member_path)
        except LookupError as not_found:
            _print_to_stderr(not_found)
            return

        _print_to_stderr("Found member %s." % (member_path,))

        # Shell-quoted so the command can be pasted into a terminal verbatim.
        rerun_parts = (sys.executable, __file__, archive_path, file_path)
        rerun_command = " ".join(quote(part) for part in rerun_parts)
        _print_to_stderr(
            "Re-run with the same archive and member with %s." % rerun_command
        )

        # Lines keep their own terminators, hence end="".
        for raw_line in member:
            _print_to_stdout(raw_line.decode(_ENCODING), end="")
# --------- Tests begin here ---------


if PYTEST:
    @patch(
        "%s.sys.stdin" % __name__,
        StringIO(
            "# Starting to collect Insights data for localhost.localdomain\n"
            "Archive saved at "
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz\n"
        ),
    )
    def test_find_archive_path_in_stdin_found():
        # The first input line does not match; the second carries the path.
        assert _find_archive_path_in_stdin() == (
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz"
        )

    @patch("%s.sys.stdin" % __name__, StringIO(""))
    def test_find_archive_path_in_stdin_not_found():
        # Empty input contains no "Archive saved at" line.
        with raises(ValueError) as raised:
            _find_archive_path_in_stdin()
        assert str(raised.value) == "Archive path not found."

    def test_open_archive():
        # An empty gzipped tar is enough to check how the archive is opened.
        with TemporaryDirectory(dir=_TMP_PATH) as tmp_dir:
            tarball = join(
                tmp_dir, "insights-localhost-20211026190220.tar.gz"
            )
            TarFile.open(tarball, mode="w:gz").close()
            with _open_archive(tarball) as opened:
                assert opened.name == tarball
                assert opened.mode == "r"

    def test_member_path():
        # For an insights-* archive the member name starts with "./".
        assert _member_path(
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz",
            "data/etc/ssh/sshd_config",
        ) == "./insights-localhost-20211026190220/data/etc/ssh/sshd_config"

    def test_extract_member_found():
        # A member added without content reads back as empty bytes.
        name = "insights-localhost-20211026190220"
        member_name = "./%s/data/etc/ssh/sshd_config" % name
        with TemporaryDirectory(dir=_TMP_PATH) as tmp_dir:
            tarball = join(tmp_dir, name + ".tar.gz")
            with TarFile.open(tarball, mode="w:gz") as writer:
                writer.addfile(TarInfo(member_name))
            with TarFile.open(tarball, mode="r:gz") as reader:
                extracted = _extract_member(reader, member_name)
                assert extracted.read() == b""

    def test_extract_member_not_found():
        # Looking a member up in an empty archive fails with LookupError.
        name = "insights-localhost-20211026190220"
        member_name = "./%s/data/etc/ssh/sshd_config" % name
        with TemporaryDirectory(dir=_TMP_PATH) as tmp_dir:
            tarball = join(tmp_dir, name + ".tar.gz")
            TarFile.open(tarball, mode="w:gz").close()
            with TarFile.open(tarball, mode="r:gz") as reader:
                with raises(LookupError) as raised:
                    _extract_member(reader, member_name)
            assert str(raised.value) == "File not found in archive."

    # Stacked @patch decorators inject their mocks bottom-up, so the first
    # test parameter belongs to the decorator closest to the function.
    @patch("%s._extract_member" % __name__)
    @patch("%s._member_path" % __name__)
    @patch("%s._open_archive" % __name__)
    @patch("%s._find_archive_path_in_stdin" % __name__)
    def test_main_arg_calls(
        find_mock,
        open_mock,
        member_path_mock,
        extract_mock,
        capsys,
    ):
        # With an explicit archive path, standard input is not consulted.
        archive_path = (
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz"
        )
        file_path = "data/etc/ssh/sshd_config"

        main(archive_path, file_path)

        find_mock.assert_not_called()
        open_mock.assert_called_once_with(archive_path)
        member_path_mock.assert_called_once_with(archive_path, file_path)
        opened_archive = open_mock.return_value.__enter__.return_value
        extract_mock.assert_called_once_with(
            opened_archive, member_path_mock.return_value
        )

    @patch("%s._extract_member" % __name__)
    @patch("%s._member_path" % __name__)
    @patch("%s._open_archive" % __name__)
    @patch(
        "%s._find_archive_path_in_stdin" % __name__,
        return_value=(
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz"
        ),
    )
    def test_main_stdin_calls(
        find_mock,
        open_mock,
        member_path_mock,
        extract_mock,
        capsys,
    ):
        # Without an archive path, the one found in standard input is used.
        file_path = "data/etc/ssh/sshd_config"

        main(None, file_path)

        found_path = find_mock.return_value
        find_mock.assert_called_once_with()
        open_mock.assert_called_once_with(found_path)
        member_path_mock.assert_called_once_with(found_path, file_path)
        extract_mock.assert_called_once_with(
            open_mock.return_value.__enter__.return_value,
            member_path_mock.return_value,
        )

    @patch("%s._member_path" % __name__)
    @patch("%s._open_archive" % __name__)
    def test_main_output(open_mock, member_path_mock, capsys):
        # The member's bytes are decoded and printed unchanged.
        expected_output = "HostKey /etc/ssh/ssh_host_rsa_key\n"
        content = BytesIO(expected_output.encode(_ENCODING))
        with patch("%s._extract_member" % __name__, return_value=content):
            main(
                "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz",
                "data/etc/ssh/sshd_config",
            )
        out, _err = capsys.readouterr()
        assert out == expected_output

    @patch(
        "%s._find_archive_path_in_stdin" % __name__,
        side_effect=ValueError("not found"),
    )
    def test_main_message_archive_not_found(find_mock, capsys):
        # The error message is the only line written to standard error.
        main(None, "data/etc/ssh/sshd_config")
        _out, err = capsys.readouterr()
        err_lines = err.rstrip("\n").split("\n")
        assert err_lines == [str(find_mock.side_effect)]

    @mark.parametrize(
        ("extract_member_config",),
        [({},), ({"side_effect": LookupError("not found")},)]
    )
    @patch("%s._extract_member" % __name__)
    @patch("%s._member_path" % __name__)
    @patch("%s._open_archive" % __name__)
    def test_main_message_archive_found(
        open_mock,
        member_path_mock,
        extract_mock,
        extract_member_config,
        capsys,
    ):
        # The archive path is reported whether or not the member is found.
        extract_mock.configure_mock(**extract_member_config)
        archive_path = (
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz"
        )
        main(archive_path, "data/etc/ssh/sshd_config")
        _out, err = capsys.readouterr()
        first_line = err.rstrip("\n").split("\n")[0]
        assert first_line == "Found archive path %s." % (archive_path,)

    @patch("%s._extract_member" % __name__)
    @patch(
        "%s._member_path" % __name__,
        return_value="./insights-localhost-timestamp/file/path",
    )
    @patch("%s._open_archive" % __name__)
    def test_main_message_member_found(
        open_mock, member_path_mock, extract_mock, capsys
    ):
        # The second standard error line reports the member name.
        main(
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz",
            "data/etc/ssh/sshd_config",
        )
        _out, err = capsys.readouterr()
        err_lines = err.rstrip("\n").split("\n")
        assert len(err_lines) == 3
        assert err_lines[1] == (
            "Found member %s." % (member_path_mock.return_value,)
        )

    @patch("%s._extract_member" % __name__)
    @patch(
        "%s._member_path" % __name__,
        return_value="./insights-localhost-timestamp/file/path",
    )
    @patch("%s._open_archive" % __name__)
    def test_main_message_command(
        open_mock, member_path_mock, extract_mock, capsys
    ):
        # The third standard error line is the shell-quoted re-run command.
        archive_path = (
            "/var/tmp/j_ql8l7v/insights-localhost-20211026190220.tar.gz"
        )
        file_path = "data/etc/ssh/sshd_config"
        main(archive_path, file_path)
        _out, err = capsys.readouterr()
        err_lines = err.rstrip("\n").split("\n")
        assert len(err_lines) == 3
        rerun_parts = [sys.executable, __file__, archive_path, file_path]
        rerun_command = " ".join(quote(part) for part in rerun_parts)
        assert err_lines[2] == (
            "Re-run with the same archive and member with %s." %
            (rerun_command,)
        )
if __name__ == "__main__":
    # Accepted forms: "<archive path> <file path>" or just "<file path>";
    # in the latter case the archive path is read from standard input.
    if len(sys.argv) not in (2, 3):
        raise ValueError("Invalid argument count.")
    archive_path_arg = sys.argv[1] if len(sys.argv) == 3 else None
    file_path_arg = sys.argv[-1]
    main(archive_path_arg, file_path_arg)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment