Python HDFS + Parquet (hdfs3, PyArrow + libhdfs, HdfsCLI + Knox)
##################################################################
## Native HDFS access via hdfs3 (only works on the cluster)
# conda install -c conda-forge libhdfs3=2.3.0=1 hdfs3 --yes
import hdfs3
import pandas as pd
import pyarrow.parquet as pq

nameNodeHost = 'hadoopnn1.localdomain'
nameNodeIPCPort = 8020
hdfs = hdfs3.HDFileSystem(nameNodeHost, port=nameNodeIPCPort)
# list a remote folder
hdfs.ls('/user/hdfs', detail=False)
# read a CSV into pandas through a file-like object
with hdfs.open('/user/hdfs/user_stats.log') as f:
    df = pd.read_csv(f, nrows=1000)
# read a Parquet file the same way
with hdfs.open('/user/hdfs/user_loc_data.parquet') as f:
    table = pq.read_table(f)
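# A hedged sketch of the reverse direction with hdfs3: writing a
# DataFrame back to HDFS through a writable file object. The target
# path user_stats_sample.csv is hypothetical.
with hdfs.open('/user/hdfs/user_stats_sample.csv', 'wb') as f:
    f.write(df.to_csv(index=False).encode('utf-8'))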
##################################################################
## PyArrow + libhdfs (JNI-based access)
import os
import subprocess

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# find libhdfs.so and point Arrow at its directory
cmd = ["locate", "-l", "1", "libhdfs.so"]
libhdfsso_path = subprocess.check_output(cmd).strip()
os.environ["ARROW_LIBHDFS_DIR"] = os.path.dirname(libhdfsso_path.decode("utf-8"))
# build the Hadoop classpath that libhdfs needs at runtime
cmd = ["/usr/bin/hdfs", "classpath", "--glob"]
hadoop_cp = subprocess.check_output(cmd).strip()
os.environ["CLASSPATH"] = hadoop_cp.decode("utf-8")
nameNodeHost = 'hadoopnn1.localdomain'
nameNodeIPCPort = 8020
fs = pa.hdfs.connect(nameNodeHost, nameNodeIPCPort, user='hdfs')
fs.ls('/user/hdfs/')
# Either use a Python file object as with hdfs3, or define a
# PyArrow dataset (much faster, multi-threaded access)
dataset = pq.ParquetDataset('/user/hdfs/user_loc_data.parquet', filesystem=fs)
table = dataset.read(use_threads=True)
# print some info about the table
table.num_columns
table.num_rows
table.schema
# convert the PyArrow Table to a pandas DataFrame
df = table.to_pandas()
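# Sketch of the write path with the same fs handle: pq.write_table
# accepts a writable file object, so the table can go back to HDFS
# as Parquet. The output path is hypothetical.
with fs.open('/user/hdfs/user_loc_data_copy.parquet', 'wb') as f:
    pq.write_table(table, f)
# Reading a column subset is much cheaper than a full scan; these
# column names are made up for illustration.
# subset = dataset.read(columns=['user_id', 'latitude'], use_threads=True)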
##################################################################
## WebHDFS access with HdfsCLI (the same client also works through
## a Knox gateway if webhdfsUrl points at the gateway instead)
# conda install -c conda-forge python-hdfs
from hdfs import InsecureClient, HdfsError
import pandas as pd

nameNodeHost = 'hadoopnn1.localdomain'
nameNodeHttpPort = 50070
webhdfsUrl = f'http://{nameNodeHost}:{nameNodeHttpPort}'
client = InsecureClient(webhdfsUrl, user='hdfs')
with client.read('/user/hdfs/user_stats.log', encoding='utf-8') as f:
    df = pd.read_csv(f, nrows=1000)
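# Sketch of writing over WebHDFS: with no data argument, client.write
# returns a context manager yielding a writable file object. The
# destination path is hypothetical.
with client.write('/user/hdfs/user_stats_head.csv', encoding='utf-8', overwrite=True) as writer:
    df.to_csv(writer, index=False)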