Created
March 22, 2021 05:46
-
-
Save xiongnemo/d786c79be20e793034b697a0c51f5395 to your computer and use it in GitHub Desktop.
Generate HiveQL: select the distinct values of columns unioned across the given Elasticsearch indices
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Generate HiveQL that selects the distinct values of several columns,
unioned together, from one or more Elasticsearch indices.

The script reads a JSON config from stdin, prints the generated HiveQL to
stdout (diagnostics go to stderr) and also writes it to ``out.hql``.

Expected input::

    {
        "tables_to_inspect": [
            {
                "elastic_search_index_name": "***",
                "hive_table_name": "***",
                "columns": ["***", "***"],
                "es.nodes": "****",
                "es.port": 9200
            },
            ...
        ]
    }
"""
import json
import sys

SILENT = "set silent=on;\n"
DROP_TABLE_TEMPLATE = "DROP TABLE IF EXISTS {};\n"
CREATE_TABLE_TEMPLATE = """
create EXTERNAL TABLE {} (
{}
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = '{}', 'es.nodes'='{}', 'es.port'='{}', 'es.nodes.wan.only'='true');
"""
TABLE_COLUMN_TEMPLATE = "{} string,\n"
TABLE_COLUMN_END_TEMPLATE = "{} string"
SELECT_TEMPLATE = "SELECT {} FROM {}\n"
# Each UNION ALL branch is aliased to one shared name so that all branches
# expose the same column name.
SELECT_AS_TEMPLATE = "SELECT {} AS {} FROM {}\n"
UNION_ALL_TEMPLATE = "UNION ALL\n"
WITH_CLAUSE_TEMPLATE = """
WITH TEMP_TABLE AS
(
{}
)
SELECT DISTINCT({}) FROM TEMP_TABLE;"""

# Misspelled names from earlier versions, kept for backward compatibility.
SLIENT = SILENT
UNION_ALL_TAMPLATE = UNION_ALL_TEMPLATE


def _drop_table_clauses(table_list):
    """Return one DROP TABLE IF EXISTS statement per configured Hive table."""
    return "".join(
        DROP_TABLE_TEMPLATE.format(table["hive_table_name"]) for table in table_list
    )


def _column_definitions(hive_table_name, columns):
    """Return the column list of a CREATE TABLE, every column typed as string.

    Raises:
        ValueError: if *columns* is empty (Hive rejects a table with no columns).
    """
    if not columns:
        raise ValueError(
            "table {!r} has an empty 'columns' list".format(hive_table_name)
        )
    # All but the last column end with ",\n"; the last one has no separator.
    leading = "".join(TABLE_COLUMN_TEMPLATE.format(c) for c in columns[:-1])
    return leading + TABLE_COLUMN_END_TEMPLATE.format(columns[-1])


def _create_table_clauses(table_list):
    """Return a CREATE EXTERNAL TABLE statement backed by ES for every table."""
    clauses = []
    for table in table_list:
        hive_table_name = table["hive_table_name"]
        clauses.append(
            CREATE_TABLE_TEMPLATE.format(
                hive_table_name,
                _column_definitions(hive_table_name, table["columns"]),
                table["elastic_search_index_name"],
                table["es.nodes"],
                table["es.port"],
            )
        )
    return "".join(clauses)


def _query_clause(table_list):
    """Return the WITH ... SELECT DISTINCT query over every configured column.

    Each (column, table) pair becomes one SELECT branch. The branches are
    joined with UNION ALL, and the outer query selects the distinct values.

    Raises:
        ValueError: if the config contains no columns at all.
    """
    column_table_pairs = [
        (column, table["hive_table_name"])
        for table in table_list
        for column in table["columns"]
    ]
    if not column_table_pairs:
        raise ValueError(
            "no columns to query: 'tables_to_inspect' is empty "
            "or every table has an empty 'columns' list"
        )
    # The unioned column is named after the first configured column; every
    # branch is aliased to it so the UNION ALL branches agree on the name.
    unioned_column_name = column_table_pairs[0][0]
    sub_query_str = UNION_ALL_TEMPLATE.join(
        SELECT_AS_TEMPLATE.format(column, unioned_column_name, table_name)
        for column, table_name in column_table_pairs
    )
    return WITH_CLAUSE_TEMPLATE.format(sub_query_str, unioned_column_name)


def build_hql(table_list):
    """Build the complete HiveQL script for the given ``tables_to_inspect`` list.

    The result contains, in order: ``set silent=on``, the DROP TABLE
    statements, the CREATE EXTERNAL TABLE statements and the distinct-union
    query.

    Raises:
        ValueError: if a table has no columns or there is nothing to query.
    """
    return (
        SILENT
        + _drop_table_clauses(table_list)
        + _create_table_clauses(table_list)
        + _query_clause(table_list)
    )


def main(output_path="out.hql"):
    """Read the JSON config from stdin, print the HiveQL and write it to a file."""
    table_list = json.load(sys.stdin)["tables_to_inspect"]
    # Diagnostics go to stderr so that stdout carries only the HiveQL.
    print(table_list, file=sys.stderr)
    hql_content = build_hql(table_list)
    print(hql_content)
    with open(output_path, "w", encoding="utf-8") as hql_out:
        hql_out.write(hql_content)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment