@AoJ · Created August 4, 2020 07:25
HDFS - Thousands of JSON files

Concatenate the JSON files into a single delimited text file, ideally about 500 MB - 1 GB per file.

CREATE TABLE if not exists json_files
(
   `filename`           STRING,
   `modified_dt`        DOUBLE,
   `content_size`       INT,
   `content`            STRING
)
PARTITIONED BY (`pr_day` STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
LINES TERMINATED BY '\u0002'
STORED AS TEXTFILE;

example file layout (500 MB - 1 GB):

<filename>\u0001<modified_dt>\u0001<content_size>\u0001<content json>\u0002
<filename>\u0001<modified_dt>\u0001<content_size>\u0001<content json>\u0002
<filename>\u0001<modified_dt>\u0001<content_size>\u0001<content json>\u0002
<filename>\u0001<modified_dt>\u0001<content_size>\u0001<content json>\u0002
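
A minimal sketch (plain Python 3, reading an uncompressed output file locally; the path target_file1 is just illustrative) of how such a record splits back into its four fields:

# Sketch: split a concatenated file back into records and fields.
# Layout as above: fields separated by \x01, records terminated by \x02.
FIELD_SEP = b"\x01"
RECORD_SEP = b"\x02"

with open("target_file1", "rb") as f:        # illustrative local path
    data = f.read()

for record in data.split(RECORD_SEP):
    if not record:
        continue                              # trailing terminator leaves an empty chunk
    filename, modified_dt, content_size, content = record.split(FIELD_SEP, 3)
    print(filename.decode(), modified_dt.decode(), content_size.decode(), len(content))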

Tip: concat.py can stream the concatenated file to stdout, so you can pipe it through gzip and use hadoop fs -put - to stream it to HDFS directly (already gzipped).

#!/usr/bin/env python3
# concat.py - concatenate many small JSON files (or members of a .tar.gz)
# into one delimited text file for the json_files table above.
import argparse
import mmap
import os
import sys
import tarfile

fields_delimiter = b'\001'
lines_delimiter = b'\002'


def save_from_file(output, input, filename):
    # One record for a plain file: name, mtime, size, raw content.
    file_stat = os.stat(filename)
    output.write(
        filename.encode() +
        fields_delimiter +
        str(file_stat.st_mtime).encode() +
        fields_delimiter +
        str(file_stat.st_size).encode() +
        fields_delimiter
    )
    # mmap streams the file content without copying it into Python first.
    output.write(mmap.mmap(input.fileno(), 0, prot=mmap.PROT_READ))
    output.write(lines_delimiter)
    input.close()


def save_from_tar(output, input, member):
    # One record for a tar member: name, mtime, size, content.
    output.write(
        member.name.encode() +
        fields_delimiter +
        str(member.mtime).encode() +
        fields_delimiter +
        str(member.size).encode() +
        fields_delimiter
    )
    output.write(input.read())
    output.write(lines_delimiter)
    input.close()


def concat_files(filenames, outfile):
    if outfile == "/dev/stdout":
        output = sys.stdout.buffer   # binary stdout so the raw delimiters survive piping
    else:
        output = open(outfile, "w+b")
    for filename in filenames:
        if filename.endswith(".tar.gz"):
            tarcontent = tarfile.open(filename, "r:gz")
            for member in tarcontent.getmembers():
                if member.isfile():
                    input = tarcontent.extractfile(member)
                    save_from_tar(output, input, member)
        else:
            input = open(filename, "rb")
            save_from_file(output, input, filename)


def main(argv):
    parser = argparse.ArgumentParser(description="concatenate files into one delimited text file")
    parser.add_argument("outfile", help="Output file, or /dev/stdout to stream.")
    parser.add_argument("inputfiles", nargs="+", help="Input files; .tar.gz archives are expanded.")
    args = parser.parse_args(argv[1:])
    concat_files(args.inputfiles, args.outfile)


if __name__ == '__main__':
    main(sys.argv)

# example usage
# ./concat.py target_file source_file1 source_file2 source_file3
# ./concat.py /dev/stdout source_file1 source_file2 source_file3 | gzip --stdout > target_file.gz
# ./concat.py /dev/stdout source_file.tar.gz | gzip --stdout > target_file.gz

AoJ commented Aug 4, 2020

Copy-and-paste usage example:

echo '{"name": "Mars"}' > source_file1
echo '{"name": "Jupiter"}' > source_file2
echo '{"name": "Earth"}' > source_file3
echo '{"name": "Pluto"}' > source_file4

./concat.py target_file1 source_file1 source_file2
./concat.py target_file2 source_file3 source_file4

impala-shell -q 'CREATE TABLE if not exists default.json_files ( `filename` STRING,`modified_dt` DOUBLE, `content_size` INT, `content` STRING ) PARTITIONED BY (`pr_day` STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\u0001" LINES TERMINATED BY "\u0002" STORED AS TEXTFILE;'

impala-shell -q 'alter table default.json_files add partition (pr_day="01_01_2020")'
impala-shell -q 'alter table default.json_files add partition (pr_day="02_01_2020")'

./concat.py /dev/stdout source_file1 source_file2 | gzip --stdout | hadoop fs -put - /user/hive/warehouse/default/json_files/pr_day=01_01_2020/01_01_2020.gz
./concat.py /dev/stdout source_file3 source_file4 | gzip --stdout | hadoop fs -put - /user/hive/warehouse/default/json_files/pr_day=02_01_2020/02_01_2020.gz

impala-shell -q 'refresh default.json_files'
impala-shell -q 'select * from default.json_files'

PROD [aoj@hdfs json]# impala-shell -q 'select * from default.json_files' 2>/dev/null
+--------------+---------------+--------------+---------------------+------------+
| filename     | modified_dt   | content_size | content             | pr_day     |
+--------------+---------------+--------------+---------------------+------------+
| source_file1 | 1583242930.56 | 17           | {"name": "Mars"}    | 01_01_2020 |
|              |               |              |                     |            |
| source_file2 | 1583242930.57 | 20           | {"name": "Jupiter"} | 01_01_2020 |
|              |               |              |                     |            |
| source_file3 | 1583242930.59 | 18           | {"name": "Earth"}   | 02_01_2020 |
|              |               |              |                     |            |
| source_file4 | 1583242931.33 | 18           | {"name": "Pluto"}   | 02_01_2020 |
|              |               |              |                     |            |
+--------------+---------------+--------------+---------------------+------------+
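
The per-day flow above can also be scripted; a minimal sketch (the helper name load_day is illustrative, and it assumes concat.py, gzip, hadoop and impala-shell are on PATH, plus the warehouse path used above):

#!/usr/bin/env python3
# Sketch: automate the per-day partition load shown above with subprocess.
# load_day and the local file names are illustrative, not part of the gist.
import subprocess

def load_day(pr_day, source_files):
    hdfs_path = "/user/hive/warehouse/default/json_files/pr_day=%s/%s.gz" % (pr_day, pr_day)
    subprocess.check_call(["impala-shell", "-q",
        'alter table default.json_files add if not exists partition (pr_day="%s")' % pr_day])
    # concat.py -> gzip -> hadoop fs -put -, same pipeline as the shell example
    concat = subprocess.Popen(["./concat.py", "/dev/stdout"] + source_files,
                              stdout=subprocess.PIPE)
    gz = subprocess.Popen(["gzip", "--stdout"], stdin=concat.stdout, stdout=subprocess.PIPE)
    subprocess.check_call(["hadoop", "fs", "-put", "-", hdfs_path], stdin=gz.stdout)
    concat.wait()
    gz.wait()
    subprocess.check_call(["impala-shell", "-q", "refresh default.json_files"])

load_day("01_01_2020", ["source_file1", "source_file2"])
load_day("02_01_2020", ["source_file3", "source_file4"])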


AoJ commented Aug 4, 2020

Or, from a remote server, you can run:

scp concat.py [email protected]:/tmp/concat.py
ssh [email protected] '/tmp/concat.py /dev/stdout /data/app/files/*.json | gzip --stdout' | hadoop fs -put - /user/hive/warehouse/default/json_files/pr_day=01_01_2020/01_01_2020.json.gz


AoJ commented Mar 9, 2021

MP3 files (binary content stored base64-encoded in a STRING column)

# staging table: Hive/Spark only, Impala does not support STRUCT columns on TEXTFILE tables
CREATE TABLE if not exists mp3_files_stage
(
   `filename`           STRING,
   `modified_dt`        BIGINT,
   `content_size`       INT,
   `content`            STRUCT <mp3:STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
COLLECTION ITEMS TERMINATED BY '\u0003'
MAP KEYS TERMINATED BY '\u0004'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

# final table: supported in Hive/Spark/Impala
CREATE TABLE if not exists mp3_files
(
   `filename`           STRING,
   `modified_dt`        BIGINT,
   `content_size`       INT,
   `content`            STRUCT <mp3:STRING>
)
STORED AS PARQUET;

# test data in bash
dd if=/dev/urandom of=test.mp3 bs=1000 count=1024
echo -e "test.mp3\x01$(stat -c%Y test.mp3)\x01$(stat -c%s test.mp3)\x01$(cat test.mp3 | base64 -w 0)\x03" | hadoop fs -put -f - /user/hive/warehouse/mp3_files_stage/test.csv

# convert to parquet, supported in hive/spark
INSERT OVERWRITE TABLE mp3_files SELECT filename, modified_dt, content_size, content FROM mp3_files_stage;


# playing in impala
select * from mp3_files;
+----------+-------------+--------------+
| filename | modified_dt | content_size |
+----------+-------------+--------------+
| test.mp3 | 1615294387  | 38895        |
+----------+-------------+--------------+

select *, left(content.mp3, 15) from mp3_files
+----------+-------------+--------------+-----------------------+
| filename | modified_dt | content_size | left(content.mp3, 15) |
+----------+-------------+--------------+-----------------------+
| test.mp3 | 1615294387  | 38895        | MTIzNDU2Nzg5MTA       |
+----------+-------------+--------------+-----------------------+

# playing in spark
spark.sql("select * from mp3_files").show()
+--------+-----------+------------+--------------------+
|filename|modified_dt|content_size|             content|
+--------+-----------+------------+--------------------+
|test.mp3| 1615296874|     1024000|[8O/s/yTNTZTL9J5V...|
+--------+-----------+------------+--------------------+



spark.sql("select filename, content.mp3 from mp3_files").show()

spark.sql("select filename, content.mp3 from mp3_files").foreachPartition(batch => batch.foreach(kv => { val mp3 = kv.getString(1); println(mp3.size)}))
1365336

# playing in pyspark

spark.sql("select filename, content.mp3 from mp3_files")
DataFrame[filename: string, mp3: string]
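
To get the binary back out, the base64 column can be decoded on the driver; a minimal PySpark sketch (assumes the interactive spark session above; the output path /tmp/test_out.mp3 is illustrative):

# Sketch: pull one row back through PySpark and decode the base64 column
# into the original mp3 bytes.
import base64

row = spark.sql("select filename, content.mp3 from mp3_files").first()
mp3_bytes = base64.b64decode(row["mp3"])

with open("/tmp/test_out.mp3", "wb") as f:   # illustrative local path
    f.write(mp3_bytes)

print(row["filename"], len(mp3_bytes))       # e.g. test.mp3 and the original file size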

