Skip to content

Instantly share code, notes, and snippets.

@hzhu212
Created May 12, 2021 12:50
Show Gist options
  • Save hzhu212/52a342065af15b9caa90e34103fbd2fd to your computer and use it in GitHub Desktop.
Save hzhu212/52a342065af15b9caa90e34103fbd2fd to your computer and use it in GitHub Desktop.
Wrap shell command with subprocess

Wrap shell commands with subprocess

For example, create a simple HDFS client:

import logging
import subprocess
from typing import List, Tuple, Union, TextIO, BinaryIO


logger = logging.getLogger(__name__)


def run_command(args: List, **kwargs) -> subprocess.CompletedProcess:
    """Run a command in a subprocess and return its ``CompletedProcess``.

    Defaults passed to ``subprocess.run`` (each can be overridden via ``kwargs``):
    stdout and stderr are captured (``subprocess.PIPE``), output is decoded as
    ``utf8``, and ``check=False``, so a non-zero exit status is only logged.

    :param args: the command argv (a list or tuple), forwarded unchanged to
        ``subprocess.run``. Items are stringified only for the log messages.
    :param kwargs: extra keyword arguments forwarded to ``subprocess.run``.
    :return: the ``subprocess.CompletedProcess`` of the finished command.
    :raises RuntimeError: when ``check=True`` and the command exits non-zero. The
        message includes the exit code and the captured stdout/stderr.
        Other exceptions from ``subprocess.run`` (e.g. executable not found)
        propagate unchanged.
    """
    import shlex

    def pretty(cmd: List) -> str:
        # str() so that non-str argv items (e.g. pathlib.Path) don't break shlex.quote
        return ' '.join(shlex.quote(str(s)) for s in cmd)

    def ellipsised(lst: List, head: int, tail: int) -> List:
        """truncate the content of a list with ellipsis, in case that the list is too long to print"""
        if len(lst) <= head + tail:
            return lst
        return lst[:head] + ['...'] + lst[-tail:]

    # Normalise to a list for display only: slicing a tuple and adding a list raises
    # TypeError, and a bare string (shell=True) must not be split into characters.
    argv = list(args) if isinstance(args, (list, tuple)) else [args]

    logger.info('running command: >>> %s <<<', pretty(ellipsised(argv, 6, 4)))
    kwargs = {
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE,
        'check': False,
        'encoding': 'utf8',
        **kwargs}

    try:
        p = subprocess.run(args, **kwargs)
    except subprocess.CalledProcessError as e:
        # chain the original error so the traceback keeps the subprocess details
        raise RuntimeError(f'command {pretty(argv)} exited with code {e.returncode}. stdout: {e.stdout}. stderr: {e.stderr}') from e

    if not kwargs.get('check') and p.returncode != 0:
        logger.error('command %s exited with code %s. stdout: %s. stderr: %s',
                     pretty(argv), p.returncode, p.stdout, p.stderr)

    return p


class HDFSClient(object):
    """
    HDFS client built on top of the `hdfs dfs` command-line tool.

    Each method assembles an argv list ``[<hdfs_bin>, 'dfs', <subcommand>, ...]`` and
    runs it through `run_command`, without a shell. Parameters typed
    ``Union[str, List[str]]`` also accept a tuple; a list/tuple is expanded into
    several command-line arguments. ``check`` is forwarded to `run_command`.
    """
    def __init__(self, hdfs_bin: str):
        # executable used to invoke HDFS, e.g. "hdfs" or an absolute path to it
        self._hdfs_bin = hdfs_bin

    def _dfs(self, subcommand: str, *args: str) -> List[str]:
        """Return the argv for ``<hdfs_bin> dfs <subcommand> <args...>``."""
        return [self._hdfs_bin, 'dfs', subcommand, *args]

    @staticmethod
    def _as_list(paths: Union[str, List[str], Tuple[str, ...]]) -> List[str]:
        """Return ``paths`` as a list: a list/tuple is copied, anything else is wrapped."""
        if isinstance(paths, (list, tuple)):
            return list(paths)
        return [paths]

    @staticmethod
    def _parse_ls(output: str) -> List[str]:
        """Extract the path column from the raw output of `hdfs dfs -ls`.

        Blank lines and "Found N items" header lines are skipped wherever they appear.
        Hadoop prints one header per listed directory and none when a plain file is
        listed, so dropping just the first line would be wrong. Every other line is
        split into at most 8 whitespace-separated fields (permissions, replication,
        owner, group, size, date, time, path), so a path containing spaces stays intact.
        """
        paths = []
        for line in (output or '').splitlines():
            if not line.strip() or line.startswith('Found '):
                continue
            paths.append(line.split(None, 7)[-1])
        return paths

    def ls(self, path: str) -> Tuple[List[str], str]:
        """A wrapper of `hdfs dfs -ls ...`

        :return: ``(paths, raw_output)``, i.e. the listed paths parsed from stdout,
            plus the unparsed stdout of the command.
        """
        p = run_command(self._dfs('-ls', path))
        return self._parse_ls(p.stdout), p.stdout

    def rm(self, path: Union[str, List[str]], check=False) -> None:
        """A wrapper of `hdfs dfs -rm -r -f ...` (recursive, forced)."""
        run_command(self._dfs('-rm', '-r', '-f', *self._as_list(path)), check=check)

    def mkdir(self, path: Union[str, List[str]], silent=True, check=False) -> None:
        """A wrapper of `hdfs dfs -mkdir ...`; the `-p` flag is added when `silent` is true."""
        flags = ['-p'] if silent else []
        run_command(self._dfs('-mkdir', *flags, *self._as_list(path)), check=check)

    def touch(self, path: Union[str, List[str]], check=False) -> None:
        """A wrapper of `hdfs dfs -touchz ...`"""
        run_command(self._dfs('-touchz', *self._as_list(path)), check=check)

    def put(self, localsrc: Union[str, List[str]], dst: str, overwrite=False, check=False) -> None:
        """A wrapper of `hdfs dfs -put ...`; the `-f` flag is added when `overwrite` is true."""
        flags = ['-f'] if overwrite else []
        run_command(self._dfs('-put', *flags, *self._as_list(localsrc), dst), check=check)

    def append_to_file(self, localsrc: Union[str, List[str]], dst: str, check=False) -> None:
        """A wrapper of `hdfs dfs -appendToFile ...`"""
        run_command(self._dfs('-appendToFile', *self._as_list(localsrc), dst), check=check)

    def get(self, src: Union[str, List[str]], localdst: str, check=False) -> None:
        """A wrapper of `hdfs dfs -get ...`"""
        run_command(self._dfs('-get', *self._as_list(src), localdst), check=check)

    def cat(self, path: Union[str, List[str]], redirect: Union[BinaryIO, None] = None, check=False) -> Union[bytes, None]:
        """A wrapper of `hdfs dfs -cat ...`

        :param redirect: if given, the command's stdout is written directly to this
            file object and None is returned. subprocess requires an object backed by
            a real file descriptor (``fileno()``).
        :return: the raw bytes of stdout when ``redirect`` is not given, else None.
        """
        cmd = self._dfs('-cat', *self._as_list(path))
        if redirect:
            run_command(cmd, encoding=None, stdout=redirect, check=check)
            return None
        # encoding=None: capture raw bytes instead of decoded text
        p = run_command(cmd, encoding=None, check=check)
        return p.stdout

    def text(self, path: Union[str, List[str]], redirect: Union[TextIO, None] = None, encoding: str = 'utf8', check=False) -> Union[str, None]:
        """A wrapper of `hdfs dfs -text ...`

        :param redirect: if given, the command's stdout is written directly to this
            file object and None is returned. subprocess requires an object backed by
            a real file descriptor (``fileno()``).
        :param encoding: codec used to decode the command's output.
        :return: the decoded stdout when ``redirect`` is not given, else None.
        """
        cmd = self._dfs('-text', *self._as_list(path))
        if redirect:
            run_command(cmd, encoding=encoding, stdout=redirect, check=check)
            return None
        # pass `encoding` here too: it was previously ignored on this branch
        p = run_command(cmd, encoding=encoding, check=check)
        return p.stdout

    def write(self, dst: str, content: Union[str, bytes], mode: str = 'w', encoding: str = 'utf8', check=False) -> None:
        """Write str or bytes into an HDFS file by piping ``content`` to stdin.

        :param mode: 'w' (overwrite, via `-put -f - dst`) or 'a' (append, via
            `-appendToFile - dst`).
        :param encoding: codec used for str ``content``; ignored for bytes content.
        :raises ValueError: if ``mode`` is neither 'w' nor 'a'.
        """
        if mode == 'w':
            cmd = self._dfs('-put', '-f', '-', dst)
        elif mode == 'a':
            cmd = self._dfs('-appendToFile', '-', dst)
        else:
            raise ValueError(f'Invalid write mode: {mode!r}, should be "w" or "a"')
        if isinstance(content, (bytes, bytearray)):
            # With an encoding set, subprocess opens the pipes in text mode and
            # cannot accept bytes input, so bytes must go through a binary pipe.
            encoding = None
        run_command(cmd, input=content, encoding=encoding, check=check)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment