Using PySpark to manage HDFS through the JVM Hadoop FileSystem API: list (ls), rename (mv), delete (rm), plus a smart-overwrite helper that rebalances file sizes.
'''
HDFS helpers driven through the active SparkSession's JVM gateway.
A live SparkSession bound to the name `spark` is assumed (as in pyspark shells).
The path is a directory by default.
'''
import math


def hdfs_list(path, subtract_one=True):
    """Return (total size in MB, number of files) for an HDFS directory."""
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(path))
    # Each FileStatus also exposes file.getPath().getName() and file.getBlockSize()
    files_size = [file.getLen() for file in list_status]
    total_size_in_MB = sum(files_size) / 1024.0 / 1024.0
    # Don't count the _SUCCESS marker file that Spark writes alongside the data
    total_num_files = len(files_size) - 1 if subtract_one else len(files_size)
    return total_size_in_MB, total_num_files
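
# Usage sketch (hedged): the directory below is a placeholder path, and a live
# SparkSession named `spark` is assumed, e.g. inside a pyspark shell.
# size_mb, n_files = hdfs_list('/tmp/example_output')
# print('%.2f MB across %d files' % (size_mb, n_files))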
def hdfs_rename(old_path, new_path):
    """Rename/move an HDFS file or directory; returns Hadoop's success flag."""
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    return fs.rename(
        spark._jvm.org.apache.hadoop.fs.Path(old_path),
        spark._jvm.org.apache.hadoop.fs.Path(new_path)
    )
def hdfs_delete(path):
    """Delete an HDFS path; returns Hadoop's success flag."""
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    return fs.delete(spark._jvm.org.apache.hadoop.fs.Path(path), True)  # True = recursive
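
# Usage sketch (hedged): placeholder paths; rename moves within the same
# filesystem, and delete is recursive, so double-check the path first.
# hdfs_rename('/tmp/example_output', '/tmp/example_output_v2')
# hdfs_delete('/tmp/example_output_v2')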
def smart_overwrite(path, fmt='parquet', compression='gzip', block_size_in_MB=128, min_max_scale=8.0):
    # Only parquet is supported (fmt is accepted but not used for other formats)
    total_size_in_MB, total_num_files = hdfs_list(path)
    avg_file_size_in_MB = total_size_in_MB / total_num_files
    # Acceptable average size: block_size / scale .. block_size * scale
    # (16 MB .. 1024 MB with the defaults)
    min_file_size = block_size_in_MB / min_max_scale
    max_file_size = block_size_in_MB * min_max_scale
    # Target file count so each file lands just under one HDFS block
    n_files = int(math.ceil(total_size_in_MB / (block_size_in_MB * 0.9)))
    bak_path = path.rstrip('/') + '.bak'
    if min_file_size <= avg_file_size_in_MB <= max_file_size:
        print("INFO: file size is normal. Don't overwrite")
        return False
    elif avg_file_size_in_MB < min_file_size:
        if total_num_files <= 10:
            print("INFO: files are too small, but there are <= 10 of them. Don't overwrite")
            return False
        else:
            # coalesce avoids a full shuffle when reducing the partition count
            print("WARN: files are too small; will read, coalesce and overwrite")
            spark.read.load(path).coalesce(max(10, n_files)).write.parquet(
                bak_path, compression=compression, mode='overwrite'
            )
            hdfs_delete(path)
            hdfs_rename(bak_path, path)
            return True
    else:
        if total_num_files >= 10000:
            print("INFO: files are too large, but there are >= 10000 of them. Don't overwrite")
            return False
        else:
            # repartition shuffles, which is needed to split oversized files
            print("WARN: files are too large; will read, repartition and overwrite")
            spark.read.load(path).repartition(min(10000, n_files)).write.parquet(
                bak_path, compression=compression, mode='overwrite'
            )
            hdfs_delete(path)
            hdfs_rename(bak_path, path)
            return True
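
# A minimal end-to-end sketch under stated assumptions: the path below is a
# placeholder, and building the session here is only needed outside
# pyspark/spark-submit, where `spark` already exists.
if __name__ == '__main__':
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    size_mb, n_files = hdfs_list('/tmp/example_parquet_dir')
    print('before: %.2f MB in %d files' % (size_mb, n_files))
    compacted = smart_overwrite('/tmp/example_parquet_dir')
    print('compacted:', compacted)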