Last active
December 20, 2018 00:51
-
-
Save elexisvenator/a26035ee2227fc5915608089cbce8b0e to your computer and use it in GitHub Desktop.
Basic parts for generating load models from a csv list of tables/columns
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class DbtBaseTransformGenerator : IGenerator | |
{ | |
private const string SourceSchema = "load"; | |
public string Generate(ConfigurationModel model) | |
{ | |
var sb = new StringBuilder(); | |
AppendGeneratedRow(sb, model, -1, "NULL"); | |
sb.AppendLine("UNION ALL"); | |
AppendGeneratedRow(sb, model, -2, "UNKNOWN"); | |
sb.AppendLine("UNION ALL"); | |
sb.Append("SELECT "); | |
sb.Append(model.Columns.Select(c => c.Destination.Name).Aggregate((str, col) => str + ", " + col)); | |
sb.AppendLine(", data_pipeline_change_version, data_pipeline_is_deleted, data_pipeline_timestamp"); | |
sb.AppendFormat("FROM {0}.{1}", SourceSchema, model.LoadTable); | |
return sb.ToString(); | |
} | |
private static void AppendGeneratedRow(StringBuilder sb, ConfigurationModel model, int rowId, string rowName) | |
{ | |
sb.Append("SELECT "); | |
foreach (var column in model.Columns) | |
{ | |
if (model.Columns.First() != column) | |
{ | |
sb.Append(", "); | |
} | |
if (column.Destination.PrimaryKey) | |
{ | |
sb.AppendFormat("{0}::INT AS {1}", rowId, column.Destination.Name); | |
continue; | |
} | |
if (column.Destination.ForeignKey) | |
{ | |
sb.AppendFormat("{0}::INT AS {1}", rowId, column.Destination.Name); | |
continue; | |
} | |
if (column.Destination.Nullable) | |
{ | |
switch (column.Destination.Type) | |
{ | |
case DataType.@string: | |
sb.AppendFormat("null::CITEXT AS {0}", column.Destination.Name); | |
break; | |
case DataType.@int: | |
sb.AppendFormat("null::INT AS {0}", column.Destination.Name); | |
break; | |
case DataType.datetime: | |
sb.AppendFormat("null::TIMESTAMP AS {0}", column.Destination.Name); | |
break; | |
case DataType.numeric: | |
sb.AppendFormat("null::NUMERIC AS {0}", column.Destination.Name); | |
break; | |
case DataType.boolean: | |
sb.AppendFormat("null::BOOLEAN AS {0}", column.Destination.Name); | |
break; | |
case DataType.Unknown: | |
sb.AppendFormat("null::UNKNOWN AS {0}", column.Destination.Name); | |
break; | |
default: | |
throw new ArgumentOutOfRangeException(nameof(column.Destination.Type), column.Destination.Type, "Unsupported data type found."); | |
} | |
continue; | |
} | |
switch (column.Destination.Type) | |
{ | |
case DataType.@string: | |
sb.AppendFormat("'{0}'::CITEXT AS {1}", rowName, column.Destination.Name); | |
break; | |
case DataType.@int: | |
sb.AppendFormat("0::INT AS {0}", column.Destination.Name); | |
break; | |
case DataType.datetime: | |
sb.AppendFormat("{{{{ default_timestamp() }}}} AS {0}", column.Destination.Name); | |
break; | |
case DataType.numeric: | |
sb.AppendFormat("0::NUMERIC AS {0}", column.Destination.Name); | |
break; | |
case DataType.boolean: | |
sb.AppendFormat("0::BOOLEAN AS {0}", column.Destination.Name); | |
break; | |
case DataType.Unknown: | |
sb.AppendFormat("'{0}'::UNKNOWN AS {0}", column.Destination.Name); | |
break; | |
default: | |
throw new ArgumentOutOfRangeException(nameof(column.Destination.Type), column.Destination.Type, "Unsupported data type found."); | |
} | |
} | |
sb.AppendLine( | |
@", null::BIGINT AS data_pipeline_change_version, 0::BOOLEAN AS data_pipeline_is_deleted, {{ default_timestamp() }} AS data_pipeline_timestamp"); | |
} | |
public string GetFileName(ConfigurationModel model) | |
{ | |
return $"base_{model.SourceTable.Name.ToLowerInvariant()}.sql"; | |
} | |
public string GetFolderName() | |
{ | |
return "transform\\models\\base"; | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The basic format of the csv is
Our source data naming conventions meant you could determine if a column was a primary key, foreign key and the columns datatype just from the column name. If your tables are named differently then add more columns to provide this information. (I am not a fan of this naming convention myself, it's just what we have.
The raw csv is converted to a list and grouped by table. Each table is fed into this generator class to create the sql model. (we have other generators for other parts of our pipeline). The generator then
A macro is used to generate default timestamps, the macro looks as follows:
The other thing you may notice is
CITEXT
, this is a custom data type which is a case-insensitive version of Postgres'TEXT
type. This is because the source system is Mssql and reporting is used to case-insensitive queries.