# Example: a simple HDFS client that wraps the `hdfs dfs` command line.
import logging
import subprocess
from typing import List, Tuple, Union, TextIO, BinaryIO
logger = logging.getLogger(__name__)


def run_command(args: List, **kwargs) -> subprocess.CompletedProcess:
    """Run a command in a subprocess and return the completed process.

    By default stdout and stderr are captured and decoded as UTF-8, and a
    non-zero exit status is only logged, not raised. Keyword arguments are
    forwarded to ``subprocess.run`` and override these defaults.

    Args:
        args: The command and its arguments, handed to ``subprocess.run``.
        **kwargs: Extra options for ``subprocess.run``; they take precedence
            over the defaults (``stdout``/``stderr`` piped, ``check=False``,
            ``encoding='utf8'``).

    Returns:
        The ``subprocess.CompletedProcess`` of the finished command.

    Raises:
        RuntimeError: If ``check=True`` was requested and the command exited
            with a non-zero status. The underlying
            ``subprocess.CalledProcessError`` is attached as ``__cause__``.
    """
    import shlex

    def pretty(cmd) -> str:
        """Render a command as a single shell-quoted string for log messages."""
        # shlex.quote only accepts str, while subprocess also accepts bytes and
        # path-like arguments; normalise first so that logging never crashes.
        tokens = (
            token.decode(errors='replace') if isinstance(token, bytes) else str(token)
            for token in cmd
        )
        return ' '.join(shlex.quote(token) for token in tokens)

    def ellipsised(lst, head: int, tail: int):
        """Shorten a long sequence to its first `head` and last `tail` items."""
        if len(lst) <= head + tail:
            return lst
        # An explicit start index is required: lst[-0:] would be the whole list.
        return [*lst[:head], '...', *lst[len(lst) - tail:]]

    logger.info('running command: >>> %s <<<', pretty(ellipsised(args, 6, 4)))

    # Caller-supplied options come last so that they win over the defaults.
    options = {
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE,
        'check': False,
        'encoding': 'utf8',
        **kwargs,
    }

    try:
        completed = subprocess.run(args, **options)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f'command {pretty(args)} exited with code {e.returncode}. '
            f'stdout: {e.stdout}. stderr: {e.stderr}'
        ) from e

    # Without check=True a failure is not an exception; make it visible in the log.
    if not options.get('check') and completed.returncode != 0:
        logger.error(
            'command %s exited with code %s. stdout: %s. stderr: %s',
            pretty(args), completed.returncode, completed.stdout, completed.stderr,
        )
    return completed
class HDFSClient:
    """Client that drives HDFS through the ``hdfs dfs`` command line.

    Every public method builds an ``hdfs dfs ...`` argument list and executes
    it with ``run_command``. The ``check`` flag accepted by most methods is
    forwarded to ``run_command`` unchanged.
    """

    def __init__(self, hdfs_bin: str):
        """Remember the ``hdfs`` executable used for every command.

        Args:
            hdfs_bin: Name or path of the ``hdfs`` binary.
        """
        self._hdfs_bin = hdfs_bin

    def _build_cmd(self, action: str, *operands, flags=()) -> List[str]:
        """Assemble ``[hdfs_bin, 'dfs', action, *flags, *operands]``.

        Each operand may be a single value or a list/tuple of values; lists
        and tuples are flattened in place so callers can pass one path or many.
        """
        cmd = [self._hdfs_bin, 'dfs', action, *flags]
        for operand in operands:
            if isinstance(operand, (list, tuple)):
                cmd.extend(operand)
            else:
                cmd.append(operand)
        return cmd

    @staticmethod
    def _parse_ls_output(stdout: str) -> List[str]:
        """Extract the entry paths from the raw output of ``hdfs dfs -ls``."""
        entries = []
        for raw_line in stdout.split('\n'):
            line = raw_line.strip()
            # The "Found N items" header is only printed for directory
            # listings, so it is skipped by content rather than by position.
            if not line or line.startswith('Found '):
                continue
            # A listing line has 8 columns with the path last; limiting the
            # split keeps paths that contain spaces in one piece.
            entries.append(line.split(None, 7)[-1])
        return entries

    def ls(self, path: str) -> Tuple[List, str]:
        """A wrapper of `hdfs dfs -ls ...`

        Returns:
            A tuple ``(paths, raw_output)``: the listed paths and the
            unmodified stdout of the command.
        """
        p = run_command(self._build_cmd('-ls', path))
        return self._parse_ls_output(p.stdout), p.stdout

    def rm(self, path: Union[str, List[str]], check=False) -> None:
        """A wrapper of `hdfs dfs -rm -r -f ...` for one path or a list of paths."""
        run_command(self._build_cmd('-rm', path, flags=('-r', '-f')), check=check)

    def mkdir(self, path: Union[str, List[str]], silent=True, check=False) -> None:
        """A wrapper of `hdfs dfs -mkdir ...`; `-p` is added when `silent` is true."""
        flags = ('-p',) if silent else ()
        run_command(self._build_cmd('-mkdir', path, flags=flags), check=check)

    def touch(self, path: Union[str, List[str]], check=False) -> None:
        """A wrapper of `hdfs dfs -touchz ...` for one path or a list of paths."""
        run_command(self._build_cmd('-touchz', path), check=check)

    def put(self, localsrc: Union[str, List[str]], dst: str, overwrite=False, check=False) -> None:
        """A wrapper of `hdfs dfs -put ...`; `-f` is added when `overwrite` is true."""
        flags = ('-f',) if overwrite else ()
        run_command(self._build_cmd('-put', localsrc, dst, flags=flags), check=check)

    def append_to_file(self, localsrc: Union[str, List[str]], dst: str, check=False) -> None:
        """A wrapper of `hdfs dfs -appendToFile ...`"""
        run_command(self._build_cmd('-appendToFile', localsrc, dst), check=check)

    def get(self, src: Union[str, List[str]], localdst: str, check=False) -> None:
        """A wrapper of `hdfs dfs -get ...`"""
        run_command(self._build_cmd('-get', src, localdst), check=check)

    def cat(self, path: Union[str, List[str]], redirect: Union[BinaryIO, None] = None,
            check=False) -> Union[bytes, None]:
        """A wrapper of `hdfs dfs -cat ...`

        Args:
            path: One path or a list of paths to concatenate.
            redirect: If given, the command's stdout is sent to this file
                object instead of being captured.
            check: Forwarded to ``run_command``.

        Returns:
            The captured stdout as bytes, or ``None`` when `redirect` is used.
        """
        cmd = self._build_cmd('-cat', path)
        # encoding=None switches the subprocess pipes to binary mode.
        if redirect is not None:
            run_command(cmd, encoding=None, stdout=redirect, check=check)
            return None
        return run_command(cmd, encoding=None, check=check).stdout

    def text(self, path: Union[str, List[str]], redirect: Union[TextIO, None] = None,
             encoding: str = 'utf8', check=False) -> Union[str, None]:
        """A wrapper of `hdfs dfs -text ...`

        Args:
            path: One path or a list of paths to read.
            redirect: If given, the command's stdout is sent to this file
                object instead of being captured.
            encoding: Encoding used to decode the command's output.
            check: Forwarded to ``run_command``.

        Returns:
            The captured stdout as str, or ``None`` when `redirect` is used.
        """
        cmd = self._build_cmd('-text', path)
        if redirect is not None:
            run_command(cmd, encoding=encoding, stdout=redirect, check=check)
            return None
        # The requested encoding must also apply when the output is captured.
        return run_command(cmd, encoding=encoding, check=check).stdout

    def write(self, dst: str, content: Union[str, bytes], mode: str = 'w',
              encoding: str = 'utf8', check=False) -> None:
        """Write str or bytes into an HDFS file via the command's stdin.

        Args:
            dst: Destination HDFS path.
            content: Data to write. `encoding` is used for str content and
                ignored for bytes content.
            mode: ``'w'`` to overwrite (`-put -f`) or ``'a'`` to append
                (`-appendToFile`).
            encoding: Text encoding applied to str content.
            check: Forwarded to ``run_command``.

        Raises:
            ValueError: If `mode` is neither ``'w'`` nor ``'a'``.
        """
        if mode == 'w':
            cmd = self._build_cmd('-put', '-', dst, flags=('-f',))
        elif mode == 'a':
            cmd = self._build_cmd('-appendToFile', '-', dst)
        else:
            raise ValueError(f'Invalid write mode: {mode!r}, should be "w" or "a"')
        # subprocess rejects bytes input in text mode, so binary content must
        # be sent with encoding disabled.
        if isinstance(content, (bytes, bytearray)):
            encoding = None
        run_command(cmd, input=content, encoding=encoding, check=check)