Created
February 2, 2018 22:50
-
-
Save abelsonlive/112f03e6ea54fa64abe3e47f50affdae to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
profile_dir: '~/.dbt' # the directory containing your dbt profiles.yml
target: prod # the dbt target (an output under the profile) to connect with
source_schema: fivetran_app # the schema Fivetran loads your source tables into
materialized: view # how to materialize the generated models (ephemeral, view, or table)
target_schema: app # the schema to build the base models in (where users will query the data)
table_prefix: app # a prefix prepended to each generated base model's file name
directory: models/base/ # the directory in which to save the generated base models
empty_as_null: true # whether to convert empty strings to NULL in all text columns
incl_fivetran_deleted: false # whether to include records that Fivetran has marked as deleted
excl_fivetran_synced: true # whether to exclude the _fivetran_synced column (the time Fivetran synced each record)
tables: # the tables to generate base models for
  - users
  - orders
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import argparse | |
import yaml | |
import copy | |
import psycopg2 | |
from jinja2 import Template | |
# Jinja2 template for one generated dbt base model.
# Variables it reads at render time:
#   target_schema, materialized -- written into the dbt config() call
#   select_statements           -- the pre-built SELECT column list
#   table                       -- dict with at least 'schema' and 'table' keys
#   incl_fivetran_deleted       -- when falsy, rows flagged _fivetran_deleted
#                                  are filtered out
# "{{ '{{' }}" and "{{ '}}' }}" render to literal braces, so the generated file
# contains dbt's own {{ config(...) }} call rather than being expanded here.
# NOTE(review): bind=False is presumably dbt-redshift's late-binding-view
# option -- confirm against the dbt version in use.
TEMPLATE = """
{{ '{{' }}
config(
schema='{{ target_schema }}',
materialized='{{ materialized }}',
bind=False
)
{{ '}}' }}
SELECT
{{ select_statements }}
FROM
"{{ table['schema'] }}"."{{ table['table'] }}"
{% if not incl_fivetran_deleted %}
WHERE NOT "_fivetran_deleted"
{% endif %}
"""
class DbtRedshiftGen(object):
    """
    Generate dbt base models for the tables of one database schema.

    Column metadata is read from ``information_schema.columns`` over a
    psycopg2 connection built from a dbt profile, and one ``.sql`` model file
    per table is rendered from ``TEMPLATE``.

    Keyword arguments consumed by the generator:

    profile_dir -- directory containing ``profiles.yml`` (required).
    target -- output name under the profile's ``outputs`` (required).
    source_schema -- schema whose tables are introspected (required).
    tables -- table names to generate; may be omitted when ``all_tables``.
    all_tables -- generate every table in ``source_schema``.
    table_prefix -- optional prefix for generated file names.
    directory -- output directory (default ``models/base``).
    profile -- top-level profile name in ``profiles.yml`` (default
        ``'default'``).

    ``incl_fivetran_deleted``, ``excl_fivetran_synced`` and ``empty_as_null``
    are read as flags but also left in the template arguments. Every
    remaining keyword argument is passed through to the template as-is.
    """

    def __init__(self, **kwargs):
        # Settings only the generator needs are popped so they do not leak
        # into the template; the flags below use .get() because the template
        # itself also reads incl_fivetran_deleted.
        self.profile = kwargs.pop('profile', 'default')
        self.profiles = self._get_profile(kwargs.pop('profile_dir'))
        self.target = kwargs.pop('target')
        self.source_schema = kwargs.pop('source_schema')
        self.tables = kwargs.pop('tables', None)
        self.all_tables = kwargs.pop('all_tables', False)
        self.table_prefix = kwargs.pop('table_prefix', '')
        self.directory = kwargs.pop('directory', 'models/base')
        self.incl_fivetran_deleted = kwargs.get('incl_fivetran_deleted')
        self.excl_fivetran_synced = kwargs.get('excl_fivetran_synced')
        self.empty_as_null = kwargs.get('empty_as_null')
        self.template_args = kwargs
        self.template = Template(TEMPLATE)

    def _get_profile(self, profile_dir):
        """
        Load and parse ``profiles.yml`` from ``profile_dir`` (``~`` expanded).
        """
        path = os.path.join(os.path.expanduser(profile_dir), 'profiles.yml')
        with open(path) as f:
            return yaml.safe_load(f)

    @property
    def credentials(self):
        """
        Return the settings of output ``self.target`` in profile ``self.profile``.
        """
        return self.profiles[self.profile]['outputs'][self.target]

    def get_connection(self):
        """
        Open a new psycopg2 connection from the profile credentials.

        The caller owns the returned connection and is responsible for
        closing it.
        """
        creds = self.credentials
        return psycopg2.connect(
            dbname=creds.get('dbname'),
            user=creds.get('user'),
            host=creds.get('host'),
            password=creds.get('pass'),
            port=creds.get('port'),
            connect_timeout=10)

    @staticmethod
    def _escape_literal(value):
        """
        Escape ``value`` for use inside a single-quoted SQL string literal.
        """
        return str(value).replace("'", "''")

    @property
    def source_schema_sql(self):
        """
        Build the information_schema query that lists the columns to model.

        The Fivetran bookkeeping columns are only returned when requested via
        ``incl_fivetran_deleted`` / ``excl_fivetran_synced``.

        Raises ValueError if ``all_tables`` is false and ``tables`` is None.
        """
        if self.all_tables:
            table_filter = '1=1'
        elif self.tables is None:
            raise ValueError(
                "Either 'tables' or 'all_tables' must be provided.")
        else:
            # Escape quotes so a name containing ' cannot break the literal.
            table_filter = "table_name IN ('{0}')".format(
                "','".join(self._escape_literal(t) for t in self.tables))
        if self.incl_fivetran_deleted:
            fivetran_deleted_filter = "1=1"
        else:
            fivetran_deleted_filter = "column_name != '_fivetran_deleted'"
        if not self.excl_fivetran_synced:
            fivetran_synced_filter = "1=1"
        else:
            fivetran_synced_filter = "column_name != '_fivetran_synced'"
        return \
            """SELECT
                table_schema as "schema",
                table_name as "table",
                column_name as "name",
                data_type as "type",
                CASE
                    WHEN data_type IN ('character varying', 'text')
                    THEN TRUE
                    ELSE FALSE
                END as "is_text"
            FROM
                information_schema.columns
            WHERE table_schema = '{source_schema}'
            AND {table_filter}
            AND {fivetran_deleted_filter}
            AND {fivetran_synced_filter}
            ORDER BY table_name, ordinal_position ASC;
            """.format(source_schema=self._escape_literal(self.source_schema),
                       table_filter=table_filter,
                       fivetran_synced_filter=fivetran_synced_filter,
                       fivetran_deleted_filter=fivetran_deleted_filter)

    def get_tables(self):
        """
        Run the column query and group the rows by table.

        Returns a dict mapping table name to
        ``{'table': ..., 'schema': ..., 'columns': [col, ...]}``, where each
        col is ``{'name', 'type', 'is_text'}`` in query (ordinal) order.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(self.source_schema_sql)
                rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()
        tables = {}
        for schema, table, name, col_type, is_text in rows:
            entry = tables.setdefault(
                table, {'table': table, 'schema': schema, 'columns': []})
            entry['columns'].append(
                {'name': name, 'type': col_type, 'is_text': is_text})
        return tables

    def gen_select_statements_for_table(self, table):
        """
        Build the SELECT column list for ``table``, joined by ",\\n\\t".

        Text columns are wrapped in an empty-string-to-NULL CASE when
        ``empty_as_null`` is set.
        """
        statements = []
        seen = set()
        for col in table['columns']:
            seen.add(col['name'])
            if col['is_text'] and self.empty_as_null:
                statement = 'CASE WHEN "{name}" = \'\' THEN NULL ELSE "{name}" END AS "{name}"'.format(**col)
            else:
                statement = '"{name}"'.format(**col)
            statements.append(statement)
        # source_schema_sql already returns these columns when they are
        # requested; only add them when absent so they never appear twice
        # (a duplicate column name makes the generated view fail).
        if self.incl_fivetran_deleted and '_fivetran_deleted' not in seen:
            statements.append('"_fivetran_deleted"')
        if not self.excl_fivetran_synced and '_fivetran_synced' not in seen:
            statements.append('"_fivetran_synced"')
        return ",\n\t".join(statements)

    def gen_model_for_table(self, table):
        """
        Render the model SQL for ``table`` from the pass-through template args.
        """
        kwargs = copy.copy(self.template_args)
        kwargs.update({
            'select_statements': self.gen_select_statements_for_table(table),
            'table': table
        })
        return self.template.render(**kwargs)

    def gen_filepath_for_model(self, table):
        """
        Return ``directory/[prefix_]<table>.sql`` for this model.
        """
        name = "{table}.sql".format(**table)
        if self.table_prefix:
            name = "_".join([self.table_prefix, name])
        return os.path.join(self.directory, name)

    def write_model_for_table(self, table):
        """
        Render the base model for ``table`` and write it to its file path.
        """
        fp = self.gen_filepath_for_model(table)
        print("Generating model: {0}".format(fp))
        with open(fp, 'w') as f:
            f.write(self.gen_model_for_table(table))

    def run(self):
        """
        Generate and write a model for every selected table.
        """
        # makedirs also creates missing parents (e.g. "models" for
        # "models/base"), which os.mkdir would fail on.
        if not os.path.isdir(self.directory):
            os.makedirs(self.directory)
        for table in self.get_tables().values():
            self.write_model_for_table(table)
def cli():
    """
    Command line interface.

    When ``--yaml`` is given, the generator's arguments are read entirely
    from that file and the other flags are ignored; otherwise the parsed
    flags are used.
    """
    parser = argparse.ArgumentParser(prog="dbt-gen")
    parser.add_argument("--yaml", default=None,
                        help="A yaml file of arguments to pass to this program.")
    parser.add_argument("--profile-dir", default="~/.dbt",
                        help="The location of your dbt profile directory. Defaults to '~/.dbt'.")
    parser.add_argument("--target", default="dev",
                        help="Which target to load for the given profile.")
    parser.add_argument("--source-schema",
                        help="The schema to generate base models for.")
    # The template reads "materialized", so store the value under that name.
    # It is a single value, not a list; --materialize is kept as an alias.
    parser.add_argument("--materialized", "--materialize", dest="materialized",
                        default="ephemeral",
                        help="How to materialize the models: ephemeral|view|table")
    parser.add_argument("--target-schema",
                        help="If materialized == view, the schema to materialize base models as views under.")
    parser.add_argument("--tables", nargs="+",
                        help="A list of space-separated tables under --source-schema to create base models for.")
    parser.add_argument("--all-tables", action="store_true", default=False,
                        help="Import all tables under --source-schema.")
    parser.add_argument("--table-prefix", default=None,
                        help="A string to prefix all generated dbt model names with.")
    parser.add_argument("--directory", default="models/base/",
                        help="The directory to store generated models under.")
    parser.add_argument("--empty-as-null", action="store_true", default=False,
                        help="Whether or not to apply EMPTYASNULL logic to character fields.")
    parser.add_argument("--incl-fivetran-deleted", action="store_true", default=False,
                        help="Whether or not to include records that have been marked deleted by fivetran.")
    parser.add_argument("--excl-fivetran-synced", action="store_true", default=False,
                        help="Whether or not to exclude fivetran's synced column.")
    args = parser.parse_args()
    if args.yaml:
        with open(args.yaml) as f:
            kwargs = yaml.safe_load(f)
    else:
        kwargs = vars(args)
    DbtRedshiftGen(**kwargs).run()
# Run the command-line interface only when executed as a script, not on import.
if __name__ == '__main__':
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Usage: