Skip to content

Instantly share code, notes, and snippets.

@xiongnemo
Created March 22, 2021 05:46
Show Gist options
  • Save xiongnemo/d786c79be20e793034b697a0c51f5395 to your computer and use it in GitHub Desktop.
Save xiongnemo/d786c79be20e793034b697a0c51f5395 to your computer and use it in GitHub Desktop.
Generate Hive QL: select the distinct values of the unioned columns from the given Elasticsearch indices
import json
import sys
# --- HQL text fragments used to assemble the generated script ---

# Statement placed at the very top of the generated script.
SLIENT = "set silent=on;\n"

# Filled with: Hive table name.
DROP_TABLE_TEMPLATE = "DROP TABLE IF EXISTS {};\n"

# Filled with, in order: Hive table name, column declarations,
# Elasticsearch resource (index), es.nodes value, es.port value.
CREATE_TABLE_TEMPLATE = (
    "\n"
    "create EXTERNAL TABLE {} (\n"
    "{}\n"
    ")\n"
    "STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'\n"
    "TBLPROPERTIES('es.resource' = '{}', 'es.nodes'='{}', 'es.port'='{}', 'es.nodes.wan.only'='true');\n"
)

# Column declaration followed by a separator (every column but the last).
TABLE_COLUMN_TEMPLATE = "{} string,\n"

# Column declaration without a separator (the last column).
TABLE_COLUMN_END_TEMPLATE = "{} string"

# Filled with: column name, Hive table name.
SELECT_TEMPLATE = "SELECT {} FROM {}\n"

# Separator placed between consecutive SELECT statements.
UNION_ALL_TAMPLATE = "UNION ALL\n"

# Filled with: the unioned sub-query text, the column to de-duplicate on.
WITH_CLAUSE_TEMPLATE = (
    "\n"
    "WITH TEMP_TABLE AS\n"
    "(\n"
    "{}\n"
    ")\n"
    "SELECT DISTINCT({}) FROM TEMP_TABLE;"
)
# The configuration is a JSON document supplied on standard input:
# {
#     "tables_to_inspect": [
#         {
#             "elastic_search_index_name": "***",
#             "hive_table_name": "***",
#             "columns": [
#                 "***",
#                 "***"
#             ],
#             "es.nodes": "****",
#             "es.port": 9200
#         },
#         ...
#     ]
# }

# Consume all of stdin in a single read, then decode it as JSON.
json_str = sys.stdin.read()
config_dict = json.loads(json_str)

# Every later step iterates over this list of per-table settings.
table_list = config_dict["tables_to_inspect"]
print(table_list)
# The generated script is accumulated in hql_content; it opens with the
# silent-mode statement.
hql_content = SLIENT

# Append one DROP TABLE statement per configured Hive table, in the
# order the tables appear in the configuration.
hql_content += "".join(
    DROP_TABLE_TEMPLATE.format(table["hive_table_name"])
    for table in table_list
)
# Append one CREATE EXTERNAL TABLE statement per configured table.
for table in table_list:
    index_name = table["elastic_search_index_name"]
    table_name = table["hive_table_name"]
    column_names = table["columns"]
    nodes = table["es.nodes"]
    port = table["es.port"]

    # Every column is declared as a string. All declarations except the
    # last carry a trailing comma and newline; the last one carries none.
    # An empty column list yields an empty declaration block.
    leading_defs = [
        TABLE_COLUMN_TEMPLATE.format(name) for name in column_names[:-1]
    ]
    closing_def = [
        TABLE_COLUMN_END_TEMPLATE.format(name) for name in column_names[-1:]
    ]
    column_block = "".join(leading_defs + closing_def)

    hql_content += CREATE_TABLE_TEMPLATE.format(
        table_name, column_block, index_name, nodes, port
    )
# Build the final query: a UNION ALL of every configured column, wrapped
# in a WITH clause and de-duplicated by the outer SELECT DISTINCT.

# Pair each column with the Hive table it belongs to, preserving the
# order in which tables and columns appear in the configuration.
column_list_to_query = []
for table in table_list:
    source_table = table["hive_table_name"]
    column_list_to_query.extend(
        [column_name, source_table] for column_name in table["columns"]
    )

# One single-column SELECT per pair, with UNION ALL between neighbours
# (and none after the last one).
sub_query_str = UNION_ALL_TAMPLATE.join(
    SELECT_TEMPLATE.format(column_name, source_table)
    for column_name, source_table in column_list_to_query
)

# The outer SELECT DISTINCT is written against the name of the first
# column in the list. This lookup raises IndexError when no columns
# were configured at all.
unioned_column_name = column_list_to_query[0][0]
real_with_clause = WITH_CLAUSE_TEMPLATE.format(sub_query_str, unioned_column_name)
hql_content += real_with_clause
# Echo the finished script to stdout, then save the same text to
# "out.hql" in the current working directory (overwriting any old copy).
print(hql_content)
with open("out.hql", mode="w", encoding="utf-8") as output_file:
    output_file.write(hql_content)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment