Convert a CSV to Hive DDL + AVRO Schema (with type inference)
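For example, given a CSV whose first lines are

ItemID,Price,Name
1,9.99,Widget
2,4.50,Gadget

pandas infers int64/float64/object dtypes and the script prints a CREATE TABLE statement along these lines (a sketch; the exact column types depend on what pandas infers from your data):

CREATE EXTERNAL TABLE default.sample_data_csv
( `ItemID` BIGINT COMMENT '',
    `Price` DOUBLE COMMENT '',
    `Name` STRING COMMENT '' )
COMMENT 'This table is ...'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs://default/user/user1/samples/csv/';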
#!/usr/bin/python
import pandas
import sys
import argparse
import subprocess
import json
import textwrap
import re

DEBUG = True

def debug(msg):
    # Print diagnostic output only when the DEBUG flag is set
    if DEBUG:
        print(msg)

def error(msg):
    print(msg, file=sys.stderr)
def convNumpy2Avro(dtype):
    # Map numpy dtype names (as reported by pandas) to Avro primitive types
    dataTypes = {
        'object': 'string',
        'bool': 'boolean',
        'int32': 'int',
        'int64': 'long',
        'float32': 'float',
        'float64': 'double'
    }
    return dataTypes.get(dtype, 'string')
def convNumpy2Hive(dtype):
    # Map numpy dtype names to Hive column types
    dataTypes = {
        'object': 'STRING',
        'bool': 'BOOLEAN',
        'int32': 'INT',
        'int64': 'BIGINT',
        'float32': 'FLOAT',
        'float64': 'DOUBLE'
    }
    return dataTypes.get(dtype, 'STRING')
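# For instance (illustrative values, assuming pandas inferred these dtypes):
#   convNumpy2Avro('int64')   -> 'long'
#   convNumpy2Hive('float64') -> 'DOUBLE'
#   convNumpy2Hive('datetime64[ns]') -> 'STRING'  (unknown dtypes fall back to string)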
def genAvroSchema(name, colMetadata, ns="com.example.test.avro"):
    fields = [{"name": x[0], "type": convNumpy2Avro(x[1])} for x in colMetadata]
    schemaJson = {
        "namespace": ns,
        "type": "record",
        "name": name,
        "fields": fields
    }
    schema = json.dumps(schemaJson, indent=2)
    debug(schema)
    return schema
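# A sketch of the schema emitted for colMetadata = [('ItemID', 'int64'), ('Name', 'object')]
# (formatting condensed; json.dumps prints each field fully expanded):
# {
#   "namespace": "com.example.test.avro",
#   "type": "record",
#   "name": "sample_data",
#   "fields": [
#     {"name": "ItemID", "type": "long"},
#     {"name": "Name", "type": "string"}
#   ]
# }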
def genCsvDDL(db, table, colMetadata, sep):
    # Replace characters that are illegal in Hive column names with underscores
    pattern = re.compile(r"[.:\s]")
    cols = ",\n    ".join(['`%s` %s COMMENT \'\'' % (pattern.sub("_", x[0]), convNumpy2Hive(x[1]))
                           for x in colMetadata])
    ddl = f"""
CREATE EXTERNAL TABLE {db}.{table}_csv
( {cols} )
COMMENT 'This table is ...'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '{sep}'
STORED AS TEXTFILE
LOCATION '{args.hdfsCSVDir}';
"""
    ddl = textwrap.dedent(ddl).strip()
    debug(ddl)
    return ddl
def genOrcDDL(db, table, colMetadata, sep):
    # sep is unused here (kept for signature parity with genCsvDDL):
    # ORC manages its own serialization, so no ROW FORMAT clause is needed
    pattern = re.compile(r"[.:\s]")
    cols = ",\n    ".join(['`%s` %s COMMENT \'\'' % (pattern.sub("_", x[0]), convNumpy2Hive(x[1]))
                           for x in colMetadata])
    ddl = f"""
CREATE TABLE {db}.{table}
( {cols} )
COMMENT 'This table is ...'
STORED AS ORC
LOCATION '{args.hdfsTableDir}'
TBLPROPERTIES ("orc.compress"="SNAPPY");
"""
    ddl = textwrap.dedent(ddl).strip()
    debug(ddl)
    return ddl
def genCsv2AvroMergeDDL(db, table, colMetadata):
    # TODO: finalize the merge flow
    cols = ",\n    ".join(['`%s` %s' % (x[0], convNumpy2Hive(x[1]))
                           for x in colMetadata])
    sel_join_cols = ", \n".join(['gt.%s' % x[0] for x in colMetadata])
    sel_cols = ", \n".join(['%s' % x[0] for x in colMetadata])
    # Anti-join on the primary key: keep only the rows of the target table
    # that are NOT present in the CSV staging table, then append all CSV rows
    joincond = " and ".join(['st.%s = gt.%s' % (x, x)
                             for x in args.primarykeys.split(',')])
    filter_cond = " and ".join(['st.%s is null' % x
                                for x in args.primarykeys.split(',')])
    ddl = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {db}.{table}
( {cols} )
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '{args.hdfsTableDir}'
TBLPROPERTIES ('avro.schema.url'='{args.hdfsSchemaPath}');

INSERT OVERWRITE TABLE {db}.{table}
SELECT * FROM (
    SELECT {sel_join_cols}
    FROM {db}.{table} gt
    LEFT OUTER JOIN {db}.{table}_csv st
    ON {joincond}
    WHERE {filter_cond}
    UNION ALL
    SELECT {sel_cols}
    FROM {db}.{table}_csv) T;

USE {db};
ALTER TABLE {table} SET LOCATION '{args.hdfsTableDir}';
"""
    ddl = textwrap.dedent(ddl).strip()
    debug(ddl)

    # Alternative variant that merges through a temporary table in a 'staging' database
    ddl2 = """
DROP TABLE staging.%s_merged;
CREATE EXTERNAL TABLE staging.%s_merged
(%s)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '%s'
TBLPROPERTIES (
'avro.schema.url'='%s');

INSERT OVERWRITE TABLE staging.%s_merged
SELECT * FROM (
    SELECT %s
    FROM %s.%s gt
    LEFT OUTER JOIN staging.%s_csv st
    ON %s
    WHERE %s
    UNION ALL
    SELECT %s
    FROM staging.%s_csv) T;

USE %s;
ALTER TABLE %s SET LOCATION '%s';
DROP TABLE staging.%s_merged;
""" % (args.table, args.table, cols, args.hdfsTableDir, args.hdfsSchemaPath,
       args.table, sel_join_cols, args.db, args.table, args.table, joincond, filter_cond, sel_cols, args.table,
       args.db, args.table, args.hdfsTableDir, args.table)
    debug(ddl2)
    return ddl
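# Sketch of what the generated merge HiveQL does for primary key 'ItemID'
# (illustrative): rows already in db.table whose ItemID has no match in
# db.table_csv are kept, and every row from db.table_csv is appended, so
# CSV rows replace existing rows with the same key (an upsert).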
def streamStdOut2Hdfs(data, hdfsPath):
    # Remove any previous version of the file, then stream the new content via stdin
    subprocess.call(['hdfs', 'dfs', '-rm', hdfsPath], shell=False)
    p = subprocess.Popen(
        ['hdfs', 'dfs', '-put', '-', hdfsPath], stdin=subprocess.PIPE)
    p.communicate(input=data.encode('utf-8'))
    if p.returncode != 0:
        error(f"Failed streaming to HDFS: {hdfsPath}")
        sys.exit(1)
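# Roughly equivalent to the shell pipeline (illustrative):
#   hdfs dfs -rm <hdfsPath>; echo "$data" | hdfs dfs -put - <hdfsPath>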
def uploadHdfs(localPath, hdfsPath):
    # Remove any previous version of the file, then upload the local copy
    subprocess.call(['hdfs', 'dfs', '-rm', hdfsPath], shell=False)
    p = subprocess.Popen(['hdfs', 'dfs', '-put', localPath, hdfsPath])
    p.wait(timeout=600)  # wait up to 10 minutes for the upload to finish
    if p.returncode != 0:
        error(f"Failed uploading file to HDFS: {localPath} --> {hdfsPath}")
        sys.exit(1)
############################################
# Parse the CMD Args
parser = argparse.ArgumentParser(
    fromfile_prefix_chars='@', description='Generate an Avro schema and Hive DDLs from a CSV')
parser.add_argument('--filename', help='Path to the CSV file', required=True)
parser.add_argument(
    '--delimiter', help='Delimiter used in the CSV file', default=',')
parser.add_argument(
    '--table', help='Base name of the Hive table', default='sample')
parser.add_argument(
    '--db', help='Database where the Hive tables will be located', default='default')
parser.add_argument(
    '--primarykeys', help='Comma-separated list of primary key columns', default='id')
parser.add_argument(
    '--csvDDL', help='Output path for the external CSV table DDL', default='csv_tbl.hql')
parser.add_argument(
    '--avroDDL', help='Output path for the Avro table DDL', default='avro_tbl.hql')
parser.add_argument('--hdfsSchemaPath', help='HDFS path for the generated Avro schema file',
                    default='~/hive/staging/schema.avsc')
parser.add_argument(
    '--hdfsCSVDir', help='HDFS location of the CSV file to load into Hive', default='~/hive/staging/sample')
parser.add_argument(
    '--hdfsTableDir', help='HDFS location of the final table data', default='~/hive/dw/sample')
args = parser.parse_args()
# Sample arguments for local testing; uncomment to override the CLI:
# args = parser.parse_args([
#     '--filename', 'sample_data.csv',
#     '--delimiter', ',',
#     '--db', 'default',
#     '--table', 'sample_data',
#     '--primarykeys', 'ItemID',
#     '--hdfsCSVDir', 'hdfs://default/user/user1/samples/csv/',
#     '--hdfsTableDir', 'hdfs://default/user/user1/samples/hive'])
# Read the data
# Option 1: the standard csv parser (no type inference)
# import csv
# f = open(args.filename, 'r')
# data = f.readlines()
# debug(data[0:5])
# csv.field_size_limit(sys.maxsize)
# reader = csv.DictReader(data)
# debug(reader.fieldnames)
# colNames = [x.replace('/', '_') for x in reader.fieldnames]
# colMetadata = [(x, 'string') for x in colNames]

# Option 2: pandas (basic type inference from a 1000-row sample)
df = pandas.read_csv(args.filename, sep=args.delimiter, nrows=1000)
colNames = list(df.columns)
colTypes = [x.name for x in df.dtypes]
colMetadata = list(zip(colNames, colTypes))
debug(colMetadata)
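# For the sample CSV shown above, colMetadata would look roughly like
# (illustrative; actual dtypes depend on the data pandas sees):
#   [('ItemID', 'int64'), ('Price', 'float64'), ('Name', 'object')]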
# Generate an Avro schema
# schema = genAvroSchema(args.table, colMetadata)
# streamStdOut2Hdfs(schema, args.hdfsSchemaPath)

# Generate the DDL for the CSV table
ddl = genCsvDDL(args.db, args.table, colMetadata, args.delimiter)

# Generate the DDL for the final table in Avro format
# genCsv2AvroMergeDDL(args.db, args.table, colMetadata)

print(ddl)
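Typical invocation (the script and file names here are illustrative):

python csv_to_hive_ddl.py --filename data/events.csv --delimiter ';' --db staging --table events --primarykeys event_id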
Pinned dependencies:

numpy==1.14.3
pandas==0.22.0
python-dateutil==2.7.2
pytz==2018.4
six==1.11.0