@morsapaes
Last active May 23, 2022 19:58

Evolving the dbt-materialize adapter

Tracking issue: #10600

Some things to consider:

Although sources and sinks are inverse concepts, sources have a one-to-many relationship with downstream relations, while sinks have a one-to-one relationship with upstream relations. Relations have a zero-to-many relationship with downstream sinks, though, which gets in the way of implementing them as inverse dbt concepts (e.g. using pre- and post-hooks).

Something else to consider is that source and sink configuration might have different ownership than model development in the wild (e.g. data engineers vs. analytics engineers), so it'd be preferable not to tightly couple them.

Sources

Option 1: dbt-external-tables

From a developer perspective, this would require us to implement Materialize-specific versions of the following macros from dbt-external-tables:

  • create_external_table.sql
  • get_external_build_plan.sql
  • dropif.sql

Our implementation wouldn't need to live in the dbt-external-tables package; we could simply override the macros within dbt-materialize (for reference, see Firebolt's implementation).
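As a rough sketch of the dispatch approach (assuming dbt-external-tables hands the parsed source node to the dispatched macro, as its other adapter implementations do, and borrowing Materialize's Kafka/Avro CREATE SOURCE syntax; the schema_registry property is made up):

{% macro materialize__create_external_table(source_node) %}
  {%- set external = source_node.external -%}
  {# map the external properties from the source YAML onto Materialize DDL #}
  create source if not exists {{ source(source_node.source_name, source_node.name) }}
  from kafka broker '{{ external.host }}' topic '{{ external.topic }}'
  format avro using confluent schema registry '{{ external.schema_registry }}'
{% endmacro %}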

and then:

User workflow

From a user perspective, defining sources as external tables would have the following workflow:

1. Setting up dbt-external-tables

Add the dbt-external-tables package to packages.yml:

packages:
  - package: dbt-labs/dbt_external_tables
    version: <version>

Modify dbt_project.yml to include:

dispatch:
  - macro_namespace: dbt_external_tables
    search_order: ['dbt', 'dbt_external_tables']

Install the dbt-external-tables package dependency:

dbt deps

2. Defining a source

Define a table as external in a sources .yml properties file (e.g. models/sources.yml):

sources:
  - name: kafka_source
    loader: kafka

    tables:
      - name: sometable
        external:
          host: 'kafka:9092'
          topic: 'sometopic'
          ...

Run stage_external_sources, the entrypoint macro of the dbt-external-tables package:

dbt run-operation stage_external_sources
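For a source defined as above, staging it would boil down to rendering and running plain Materialize CREATE SOURCE DDL along these lines (a sketch in the Kafka/Avro syntax of the time; the schema registry URL is an assumed extra property, not part of the example above):

CREATE SOURCE IF NOT EXISTS sometable
FROM KAFKA BROKER 'kafka:9092' TOPIC 'sometopic'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081'
ENVELOPE NONE;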

The biggest downside is that this adds a fair amount of overhead to what is, for many users, their entry point to Materialize. It's not a straightforward workflow.

Option 2: pre-hook on models

From a developer perspective, this would require:

  • Implement a create_source macro

This option sounds borked from the get-go, since it would tightly couple sources with models (when the relationship between them might not be one-to-one).

User workflow

1. Defining a pre-hook in a(n entry?) model

{{
  config({
    "materialized":"materializedview",
    "pre-hook": [
      "{{ materialize.create_source(...
                                  host='kafka:9092',
                                  topic='source_topic',
                                  ...) }}"
    ]
  })
}}
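Since pre-hooks fire on every run of the model, a hypothetical create_source macro would also need to be idempotent to survive repeated dbt runs, e.g.:

{% macro create_source(name, host, topic, schema_registry) %}
  {# hypothetical signature: only the Kafka/Avro case is sketched #}
  create source if not exists {{ name }}
  from kafka broker '{{ host }}' topic '{{ topic }}'
  format avro using confluent schema registry '{{ schema_registry }}'
{% endmacro %}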

Sinks

Option 1: post-hook on models

From a developer perspective, this would require:

  • Implement a create_sink macro (similar to the unload_table macro in dbt-redshift)
  • Consider (automatically) creating an exposure for lineage (see Option 2 👇)

and then:

  • Deprecate the custom sink materialization (codebase+documentation)
  • Adapt the dbt integration guide
  • Consider adapting the MZ Hack Day demo
  • Add a new section to the Materialize configurations page in the dbt documentation

User workflow

From a user perspective, defining sinks as post-hooks would have the following workflow:

1. Defining a post-hook in the model to sink

{{
  config({
    "materialized":"materializedview",
    "post-hook": [
      "{{ materialize.create_sink(...
                                  this.materializedview,
                                  host='kafka:9092',
                                  topic='sink_topic',
                                  ...) }}"
    ]
  })
}}
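A first pass at the macro itself could look something like this (a sketch only: the argument names, the derived sink name and the Kafka/Avro syntax are assumptions):

{% macro create_sink(relation, host, topic, schema_registry) %}
  {# renders Materialize's CREATE SINK DDL for the upstream relation #}
  create sink if not exists {{ relation.identifier }}_sink
  from {{ relation }}
  into kafka broker '{{ host }}' topic '{{ topic }}'
  format avro using confluent schema registry '{{ schema_registry }}'
{% endmacro %}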

Option 2: custom metadata on exposures

From a developer perspective, it's a bit unclear how this could be implemented since exposures seem like...a purely metadata/documentation-based feature. According to Jeremy from dbt Labs, it might be possible to go this route using the meta configuration and some custom macros.

TBH, I'm not sure how this would work since exposures aren't compilable or executable, but maybe we can figure it out based on these two helpful threads:

It's also not possible to use a custom string as the exposure type (at least yet, see dbt #2835), so we'd have to go with one of the accepted values: dashboard, notebook, analysis, ml or application; this mainly dictates how exposures are surfaced in the dbt documentation, and having sinks listed under any of these options isn't ideal.

One of the benefits of using exposures would be having sinks as end nodes in the DAG. In contrast, with post-hooks we'd lose track of lineage information (AFAIU). Maybe there's a way to combine Option 1 and Option 2 (i.e. define a sink as a post-hook and automatically create an exposure for lineage), so we get the best of both worlds?

User workflow

1. Defining an exposure

Define an exposure with a custom meta configuration in a .yml properties file (e.g. models/exposures.yml):

exposures:
  - name: kafka_sink
    type: <exposure-type>
    description: >
      Some description.
    
    depends_on:
      - ref('mv_churn_prediction')
      
    meta: ...
      
    owner:
      email: [email protected]

Handling credentials

For all cases, credentials should be handled as (secret) environment variables that are inherited from the development environment users are running dbt against.
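Concretely, that could mean threading dbt's built-in env_var function through whichever macro ends up issuing the DDL, e.g. (the WITH options are only an illustration of Materialize's Kafka authentication settings, not an exhaustive or verified list):

  create source if not exists {{ name }}
  from kafka broker '{{ env_var("KAFKA_BROKER") }}' topic '{{ topic }}'
  with (
    security_protocol = 'SASL_SSL',
    sasl_mechanisms = 'PLAIN',
    sasl_username = '{{ env_var("KAFKA_USERNAME") }}',
    sasl_password = '{{ env_var("KAFKA_PASSWORD") }}'
  )
  format avro using confluent schema registry '{{ env_var("SCHEMA_REGISTRY_URL") }}'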

@ahelium commented May 18, 2022

PREVIEW would be super neat! One way to accomplish previewing a non-materialized source is to use a TAIL command within a transaction to limit the number of rows returned. And now that TAIL can take a query, you can run the SELECT behind any in-development materialized view to peek at your data:

materialize=> BEGIN;
BEGIN
materialize=> DECLARE c CURSOR for TAIL (SELECT convert_from(data, 'utf8') AS data FROM rp_flight_information);
DECLARE CURSOR
materialize=> FETCH 2 c;
mz_timestamp  | mz_diff |                                                                                                                                                                            data
---------------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 1652882799999 |       1 | {"icao24": "345682", "callsign": "", "origin_country": "Spain", "time_position": null, "last_contact": 1652829778, "longitude": null, "latitude": null, "baro_altitude": null, "on_ground": false, "velocity": 260.21, "true_track": 100.71, "vertical_rate": 0, "sensors": null, "geo_altitude": null, "squawk": null, "spi": false, "position_source": 0}
 1652882799999 |       1 | {"icao24": "38a1db", "callsign": "", "origin_country": "France", "time_position": null, "last_contact": 1652821139, "longitude": null, "latitude": null, "baro_altitude": null, "on_ground": true, "velocity": 0, "true_track": 123.75, "vertical_rate": null, "sensors": null, "geo_altitude": null, "squawk": "7776", "spi": false, "position_source": 0}
(2 rows)

I wonder if we could codify that process somehow for users, to make things simpler.
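One very rough idea (untested, and assuming consecutive run_query calls inside a run-operation reuse the same connection) would be a preview macro that wraps the cursor dance:

{% macro preview(relation, limit=5) %}
  {# hypothetical helper: open a transaction, TAIL the relation through a cursor,
     fetch a handful of rows and print them #}
  {% do run_query("begin") %}
  {% do run_query("declare c cursor for tail (select * from " ~ relation ~ ")") %}
  {% set results = run_query("fetch " ~ limit ~ " c") %}
  {% do run_query("commit") %}
  {% do results.print_table() %}
{% endmacro %}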

@morsapaes (Author)
Thanks for writing that down, @jwills! Your comment + having a chat with @dataders (+ some 🚿 time) made me look at things from more of a workflow separation perspective. The TL;DR for the refactor (as we follow the progress of things like external nodes and the short-term plans to revamp the programmatic interfaces of dbt itself) sounds like:

Both sources and sinks should be completely decoupled from SQL models, which also means the creation of these objects should not happen at any point during dbt run, but as a separate staging step (much like what happens in dbt-external-tables). These should be YAML-ified (since they're pure DDL statements) and created using something like dbt stage --sources/dbt stage --sinks.

Does this sound reasonable? This separation should also make it easier to integrate with CI/CD pipelines that can trigger the creation of the right objects at the right time.


+1 that something like PREVIEW would be useful, but I'm having trouble understanding how it could help in the context of the adapter. It'd be cool to wrap what @ahelium pointed out in a dry-run-like command to preview the results of a (transformation) model, though!
