Created
November 16, 2018 16:26
-
-
Save xiaoshuai/da3f0195177abcad95a9fe86f4a19f28 to your computer and use it in GitHub Desktop.
hdfs.py: call HDFS commands via Python 3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""hdfs.py: call hdfs command via python3

Wraps the ``hdfs dfs`` command-line tool with ``subprocess`` so HDFS
operations can be driven from Python 3 code.
"""
__author__ = "xiaoshuai, http://github.com/xiaoshuai"
__license__ = "GPL"
__version__ = "1.0.1"
import os | |
import re | |
import subprocess | |
import sys | |
from subprocess import check_output, CalledProcessError | |
# Public module-level flag: True when running under Python 3.
PY3 = sys.version_info[0] == 3
# Enforce the minimum version the message promises. The old check tested
# only the major version, so 3.0-3.5 slipped through. Python 2 never gets
# here: the print(..., file=...) below and the `raise ... from` syntax used
# later in this file are compile errors there.
if sys.version_info < (3, 6):
    print('Only Python3.6 above supported.', file=sys.stderr)
    sys.exit(1)
class HdfsCommandError(RuntimeError):
    """Error raised when an hdfs command (or a precondition check) fails.

    The raw text is kept on ``self.message``. It is also written to stderr
    as soon as the exception object is constructed.
    """

    def __init__(self, message):
        super().__init__(message)
        self.message = message
        # Echo at construction time, independently of whether it is caught.
        print(message, file=sys.stderr)
class HdfsClient:
    """Run ``hdfs dfs`` sub-commands through :mod:`subprocess`.

    Each method appends ``-<subcommand>`` and its arguments to
    ``self.hadoop_cmd`` and executes the resulting argument list without a
    shell. Methods that go through ``__call_check`` return the command's
    stdout decoded as UTF-8 and raise ``HdfsCommandError`` on a non-zero exit.
    """

    # Suffix of the stderr line that ``-stat`` reports for a missing path.
    _NOT_FOUND_SUFFIX = 'No such file or directory'
    # Permission column of an ``-ls`` entry, e.g. ``drwxr-xr-x`` or
    # ``-rw-r--r--+`` (a trailing ``+`` marks an ACL).
    _LS_PERMISSIONS_RE = re.compile(r'[-dl][-rwxstST]{9}\+?')

    def __init__(self, hadoop_cmd='hdfs dfs', debug=False):
        """Create a client.

        :param hadoop_cmd: command prefix. Either a whitespace-separated
            string such as ``'hdfs dfs'``, or a sequence of arguments such
            as ``['hadoop', 'fs']``.
        :param debug: when true, echo each checked command to stderr first.
        """
        if isinstance(hadoop_cmd, str):
            # split() rather than split(' '): repeated spaces must not turn
            # into empty-string arguments.
            self.hadoop_cmd = hadoop_cmd.split()
        else:
            self.hadoop_cmd = list(hadoop_cmd)
        self.debug = debug

    @staticmethod
    def __local_path_exists(local_path):
        """Return True if *local_path* is an existing local file or directory."""
        return os.path.isfile(local_path) or os.path.isdir(local_path)

    def __call_check(self, cmd):
        """Run *cmd* and return its stdout decoded as UTF-8.

        stderr is not captured; it goes to this process's stderr.

        :raises HdfsCommandError: if the command exits with a non-zero status.
        """
        cmdline = subprocess.list2cmdline(cmd)
        if self.debug:
            print('hdfs command "{0}" will call.'.format(cmdline), file=sys.stderr)
        try:
            stdout = check_output(cmd)
        except CalledProcessError as err:
            message = 'hdfs command "{0}" failed with error code {1}.'.format(
                cmdline, err.returncode)
            raise HdfsCommandError(message) from err
        return stdout.decode('utf-8')

    def exists(self, path):
        """Return whether *path* exists on HDFS, probed with ``-stat``.

        :raises HdfsCommandError: if ``-stat`` fails and its stderr does not
            say the path is missing.
        """
        cmd = self.hadoop_cmd + ['-stat', path]
        proc = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
                              universal_newlines=True)
        if proc.returncode == 0:
            return True
        if any(line.rstrip().endswith(self._NOT_FOUND_SUFFIX)
               for line in proc.stderr.splitlines()):
            return False
        message = 'run_cmd="{0}" ret_code={1} error.'.format(
            subprocess.list2cmdline(cmd), proc.returncode)
        raise HdfsCommandError(message)

    def move(self, path, dest):
        """Move one HDFS path, or a list/tuple of paths, to *dest* (``-mv``).

        For an absolute *dest*, its parent directory is created first if it
        is missing. Returns None without running anything when *dest* is
        empty.
        """
        if not dest:
            return None
        if dest.startswith('/'):
            parent_dir = os.path.dirname(dest)
            if parent_dir and not self.exists(parent_dir):
                self.mkdir(parent_dir)
        # list() also covers tuples; the old `list + tuple` raised TypeError.
        sources = list(path) if isinstance(path, (list, tuple)) else [path]
        return self.__call_check(self.hadoop_cmd + ['-mv'] + sources + [dest])

    def rename(self, path, dest):
        """Alias of :meth:`move`."""
        return self.move(path, dest)

    def rename_dont_move(self, path, dest):
        """Move *path* to *dest*, refusing if *dest* already exists on HDFS.

        :raises HdfsCommandError: if *dest* exists.
        """
        if self.exists(dest):
            raise HdfsCommandError('destination file already exist.')
        return self.move(path, dest)

    def remove(self, path, recursive=True, skip_trash=False):
        """Delete *path* (``-rm``), adding ``-r`` and/or ``-skipTrash`` as requested."""
        cmd = self.hadoop_cmd + ['-rm']
        if recursive:
            cmd.append('-r')
        if skip_trash:
            cmd.append('-skipTrash')
        cmd.append(path)
        return self.__call_check(cmd)

    def chmod(self, path, permissions, recursive=False):
        """Change the mode of *path* (``-chmod``, plus ``-R`` when *recursive*).

        *permissions* is passed through ``str()``, so give an octal mode as
        ``'755'`` or ``755``, not as the integer ``0o755``.
        """
        flags = ['-R'] if recursive else []
        return self.__call_check(
            self.hadoop_cmd + ['-chmod'] + flags + [str(permissions), path])

    def chown(self, path, owner, group, recursive=False):
        """Change the owner and/or group of *path* (``-chown``).

        The ownership argument is ``owner:group``. The ``:group`` part is
        omitted when *group* is empty or None; a None *owner* becomes ``''``.
        """
        ownership = owner or ''
        if group:
            ownership += ':' + group
        flags = ['-R'] if recursive else []
        return self.__call_check(
            self.hadoop_cmd + ['-chown'] + flags + [ownership, path])

    def count(self, path):
        """Return the directory, file and byte counts for *path* (``-count``).

        The result is a dict with integer values under ``'dir_count'``,
        ``'file_count'`` and ``'content_size'``. It assumes *path* names a
        single entry; if several output lines appear, the last non-empty
        one is used.

        :raises HdfsCommandError: if the command fails or its output cannot
            be parsed.
        """
        stdout = self.__call_check(self.hadoop_cmd + ['-count', path])
        lines = [line for line in stdout.splitlines() if line.strip()]
        try:
            # Columns: DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME
            dir_count, file_count, content_size = (
                int(field) for field in lines[-1].split()[:3])
        except (IndexError, ValueError) as err:
            raise HdfsCommandError(
                'unexpected -count output: {0!r}'.format(stdout)) from err
        return {'content_size': content_size,
                'dir_count': dir_count,
                'file_count': file_count}

    def copy(self, path, destination):
        """Copy HDFS *path* to *destination* (``-cp``).

        :raises HdfsCommandError: if *path* does not exist.
        """
        if not self.exists(path):
            raise HdfsCommandError('source file does not exist.')
        return self.__call_check(self.hadoop_cmd + ['-cp', path, destination])

    def put(self, local_path, destination):
        """Upload a local file or directory to HDFS *destination* (``-put``).

        :raises HdfsCommandError: if *local_path* is not an existing file or directory.
        """
        if not self.__local_path_exists(local_path):
            raise HdfsCommandError('source file does not exist.')
        return self.__call_check(self.hadoop_cmd + ['-put', local_path, destination])

    def get(self, path, local_destination):
        """Download HDFS *path* to *local_destination* (``-get``).

        :raises HdfsCommandError: if *local_destination* already exists locally.
        """
        if self.__local_path_exists(local_destination):
            raise HdfsCommandError('destination file already exist.')
        return self.__call_check(self.hadoop_cmd + ['-get', path, local_destination])

    def cat(self, path):
        """Return the contents of HDFS *path* as text (``-cat``)."""
        return self.__call_check(self.hadoop_cmd + ['-cat', path])

    def mkdir(self, path, parents=True, raise_if_exists=False):
        """Create the HDFS directory *path* (``-mkdir``, plus ``-p`` when *parents*).

        ``-mkdir -p`` already tolerates an existing directory, so existence
        is only probed when *raise_if_exists* is set or *parents* is false.
        In those cases an existing *path* either raises (*raise_if_exists*)
        or is a no-op that returns ``''``.

        :raises HdfsCommandError: if *path* exists and *raise_if_exists* is
            true, or if the command fails.
        """
        if (raise_if_exists or not parents) and self.exists(path):
            if raise_if_exists:
                raise HdfsCommandError('directory already exists: {0}'.format(path))
            return ''
        flags = ['-p'] if parents else []
        return self.__call_check(self.hadoop_cmd + ['-mkdir'] + flags + [path])

    def listdir(self, path, ignore_directories=False, ignore_files=False,
                include_size=False, include_type=False, include_time=False, recursive=False):
        """List entries under *path* (``-ls``, plus ``-R`` when *recursive*).

        Returns a list of path strings. When any ``include_*`` flag is set,
        each item is instead a tuple ``(path[, size][, type][, mtime])``.
        *type* is the first character of the permission column (``'d'`` for
        directories, ``'-'`` for files), *size* is an int, and *mtime* is a
        naive ``datetime`` parsed from the listing's date and time columns.
        """
        from datetime import datetime

        flags = ['-R'] if recursive else []
        entries = []
        for line in self.__call_check(self.hadoop_cmd + ['-ls'] + flags + [path]).splitlines():
            # Entry columns: perms replication owner group size date time path.
            # maxsplit=7 keeps spaces inside the path. The "Found N items"
            # header, blank lines and other noise fail the permission check.
            fields = line.split(None, 7)
            if len(fields) != 8 or not self._LS_PERMISSIONS_RE.fullmatch(fields[0]):
                continue
            perms, _, _, _, size, date, time_of_day, entry_path = fields
            entry_type = perms[0]
            if ignore_directories and entry_type == 'd':
                continue
            if ignore_files and entry_type == '-':
                continue
            extra = ()
            if include_size:
                extra += (int(size),)
            if include_type:
                extra += (entry_type,)
            if include_time:
                extra += (datetime.strptime(date + ' ' + time_of_day, '%Y-%m-%d %H:%M'),)
            entries.append((entry_path,) + extra if extra else entry_path)
        return entries

    def touchz(self, path):
        """Create an empty file at HDFS *path* (``-touchz``)."""
        return self.__call_check(self.hadoop_cmd + ['-touchz', path])
if __name__ == '__main__':
    # Manual smoke test, e.g.: /opt/python36/bin/python3 test_hdfs.py
    # Uses the default 'hdfs dfs' prefix with debug echo enabled, then
    # prints whatever `-cat hello.txt` writes to stdout.
    client = HdfsClient(debug=True)
    print(client.cat('hello.txt'))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment