Running dbt against Databricks job cluster


So you wanna run dbt on a Databricks job cluster

If you're reading this, you are probably interested in running dbt against a Databricks job cluster. This is not natively supported by Databricks, but it is possible using the dbt-spark adapter. Databricks actively recommends against using this adapter, but so far I haven't had any* issues with it.

The steps below are what I have used to get this running for my own workflows and projects. It is possible that there are other methods or steps that would be simpler, more efficient, etc. Please comment with any suggestions or improvements.

* The dbt seed command doesn't seem to work properly, but that is a very minor inconvenience for me and possibly fixable.

Overview

The general steps for this process are as follows:

  1. Update your dbt project(s) to be compatible with dbt-spark
  2. Use a Databricks workflow Python task to run dbt on a Databricks job cluster
    1. Install dbt and dbt-spark on the Databricks cluster
    2. Clone your dbt project repository to the Databricks cluster
    3. Replace the profiles.yml file with a dbt-spark compatible version
    4. In your Python task, use dbtRunner to programmatically invoke dbt

Existing Literature

Very little has been written about how to accomplish this, and what has been written is mainly (all?) by Leo Schick. His posts were invaluable for getting this working, but I did make some changes to avoid notebooks and to streamline the process with my company's existing Databricks helper package.

Medium - How to run dbt on a Databricks job cluster

GitHub - databricks-dbt-helper

GitHub - dbt-databricks issue #575

How this differs from the standard process

Databricks SQL Warehouse

Databricks does natively support running dbt in Databricks workflows, but with the following caveats:

  • You must use a Databricks SQL Warehouse
  • You must start a Databricks job cluster to handle running the dbt commands

This is not ideal for a few reasons:

  • SQL Warehouses are the most expensive Databricks offering
  • Starting a new cluster just to run dbt commands is not efficient; it means paying for two compute resources for the same process
  • You cannot use dbt retry to re-run failed models, as the dbt manifest is lost between tasks

Databricks All Purpose Cluster

You can also run dbt on an All Purpose cluster, but this is also not ideal, mainly because All Purpose clusters are more expensive than job clusters. Outside of that issue, there is no real problem with this method. (Correct me if I'm wrong, I never actually bothered doing this for scheduled workflows).

Databricks Job Cluster

Running dbt directly against a job cluster with the dbt-databricks adapter does not work, for a few reasons:

  • The dbt-databricks adapter does not offer the session-based connection method that dbt-spark has, where you can simply connect to the already-running Spark session and run dbt commands
  • Using the dbt-databricks adapter with a cluster requires that the cluster expose an endpoint that can be provided to dbt to authenticate and send commands. Such an endpoint is available for SQL Warehouses and All Purpose clusters, but not for job clusters, for security purposes (as far as I can tell).

Steps

DBT project configuration

  • Update your dbt project to be compatible with dbt-spark - this may not be much work, depending on your project. Below is what I had to do, but my project is not very complex, so it is quite possible that there are more caveats than I'm aware of. If you run into any issues (and ideally find solutions), please comment and I will update this document.

  • Aside from the changes to profiles.yml, none of the changes below prevent you from using the dbt-databricks adapter (and it's possible that even the profiles.yml issue is fully solvable)

  • Remove any references to {{ target.catalog }} in your models or configuration files

    • I had these in my sources.yml file (e.g. database: {{ target.catalog }}) - these were actually not required so removing them was simple
    • If you need to refer to the given catalog for any kind of logic you can use something like target.name or target.schema instead
  • For things like incremental models to work properly you will need to add the file_format parameter to your models with the value delta

    • I found this easiest to do in my project's dbt_project.yml file, by adding the following:
    models:
      +file_format: delta
  • Modify your profiles.yml file to be compatible with dbt-spark

    • You can do this in your repo as different output targets or you can do this when running the job by overwriting the existing profiles.yml file - this is what I do, as I was not able to get dbt-databricks to work properly if I had dbt-spark profiles in my file
    • What you will need, however you set it up, is the below (at minimum):
    <project_name>:
        outputs:
            default:
                type: spark # this tells dbt to use dbt-spark
                method: session # this tells dbt-spark to use the current Spark session
                host: NA # not sure if this is necessary
                schema: <your schema> # not entirely sure if this is necessary
        target: default
    • Note: With just this setup you will have issues with your models attempting to use the wrong catalog, as they will use whatever catalog is defined as the default. We handle this in the Python task prior to running dbt (see the short sketch after this list)
    • Note: I already had the dbt-databricks adapter skipping the optimize commands that are normally run after creating/updating models that are clustered or Z-ordered. I do not think the dbt-spark adapter runs these. You could likely add these as a post-hook if desired, but with Databricks now offering predictive optimization it may not be necessary.
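To make the catalog note above concrete, here is a minimal sketch of the fix, assuming a Databricks Runtime / PySpark version that exposes spark.catalog.setCurrentCatalog; the catalog name is just a placeholder. The same call appears in the full task example further down.

    from pyspark.sql import SparkSession

    # on a Databricks job cluster the active Spark session already exists
    spark = SparkSession.builder.getOrCreate()

    # run this before invoking dbt so the dbt-spark session adapter resolves
    # unqualified relations against the intended catalog ("my_catalog" is a placeholder)
    spark.catalog.setCurrentCatalog("my_catalog")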

Databricks workflow configuration

  • We will use Python tasks to run dbt, not dbt tasks
  • The Python task should handle the following:
    • Clone your dbt project repository to the Databricks cluster
    • Replace the profiles.yml file with a dbt-spark compatible version
      • If your profiles.yml already has a spark profile, you can simply provide the profile name to the dbtRunner command
    • Run dbt using dbtRunner to programmatically invoke dbt
  • The Python task should also define the libraries it requires, which are dbt and dbt-spark
    • I have these added to the Task's libraries section in the Databricks workflow
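For illustration, here is a rough sketch of what that task definition could look like as a fragment of a Jobs API job spec, written as a Python dict. The task key, cluster key, and file path are placeholders rather than values from my setup, and the same libraries can just as easily be added through the workflow UI.

    # hypothetical fragment of a Databricks Jobs API task definition
    dbt_task = {
        "task_key": "run_dbt",                 # placeholder
        "job_cluster_key": "dbt_job_cluster",  # placeholder
        "spark_python_task": {
            "python_file": "dbfs:/path/to/dbt_task.py",  # placeholder path to the Python task
        },
        # libraries the Python task needs at runtime: dbt (dbt-core) and dbt-spark
        "libraries": [
            {"pypi": {"package": "dbt-core"}},
            {"pypi": {"package": "dbt-spark"}},
        ],
    }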

Databricks workflow steps

The below is a simplified and not fully functional example of how my Python task is set up to run dbt on a Databricks job cluster. It is not a complete example, but it should give you an idea of how to get started. It was cobbled together from existing code in multiple files (often with supporting pieces left out), so don't expect to just copy/paste it and have it work.

Obviously there are things that can be done differently here. For example, if you already have a repo set up and synced to Databricks for your dbt project, you can probably skip cloning the repo and just access the existing one. If your profiles.yml already has a spark profile, you can probably skip replacing the file.

Note: The reason the example below has selector_args and args as separate arguments is that dbt retry does not accept selector arguments, since it simply retries the failed models from the previous run. Having the arguments in separate lists allows me to have an automatic retry mechanism that skips the selector args when invoking dbt retry (a sketch of this retry mechanism follows the example).

    import os
    import shutil
    from pathlib import Path

    import yaml
    from dbt.cli.main import dbtRunner
    from git import Repo  # GitPython

    # Note: self.spark, self.args, self.logger, get_dbt_repo_url, and DbtRunException
    # come from my company's existing Databricks helper package and base task class,
    # which are not shown here.


    class PythonDbtTask:
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)

            # this fixes the issue with the dbt-spark adapter using the default catalog
            self.spark.catalog.setCurrentCatalog(self.args.catalog)

        def launch(self):
            self.setup_dbt_repo()
            self.run_dbt("run")

        def setup_dbt_repo(self):
            """Clones the dbt repo to the current directory, changes the current directory to the new folder,
            and writes the profiles.yml file for executing with the dbt-spark adapter.
            """
            repo_url = get_dbt_repo_url()
            self.clone_repo(repo_url, self.args.git_ref)
            self.write_dbt_profiles()

        def write_dbt_profiles(self):
            data = {}
            data["config"] = {"use_colors": False, "send_anonymous_usage_stats": False}
            data["dbx_dbt_legacy"] = {
                "outputs": {
                    "default": {
                        "threads": self.args.threads,
                        "type": "spark",
                        "method": "session",
                        "schema": self.args.catalog,
                        "host": "NA",
                    }
                },
                "target": "default",
            }

            profiles_path = Path.cwd().joinpath("profiles.yml")

            with open(profiles_path, "w") as f:
                f.write(yaml.dump(data))

        def run_dbt(self, command: str, selector_args: list[str] | None = None, args: list[str] | None = None):
            selector_args = selector_args or []
            args = args or []

            # dbtRunner.invoke takes the CLI args without the leading "dbt"
            all_args = [command] + selector_args + args

            res = dbtRunner().invoke(all_args)

            if not res.success:
                self.logger.error(f"Exception: {res.exception}")
                raise DbtRunException(f"dbt {' '.join(all_args)} failed")

        def clone_repo(self, git_url: str, branch: str, folder_name: str | None = None, change_dir: bool = True):
            """Clones a git repo.

            Args:
                git_url (str): The git url.
                branch (str): The branch to clone.
                folder_name (str | None, optional): The folder name to clone into. Defaults to None, in which\
                case the folder name is the stem of the git url.
                change_dir (bool, optional): Whether to change the current directory to the cloned repo. Defaults to True.
            """

            folder_name = folder_name or Path(git_url).stem

            clone_path = Path.cwd().joinpath(folder_name)

            if clone_path.exists():
                shutil.rmtree(clone_path)

            Repo.clone_from(git_url, clone_path, branch=branch)

            if change_dir:
                os.chdir(clone_path)
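To make the earlier note about selector_args concrete, here is a rough sketch (building on the example above, with a made-up tag selector) of how launch() could add an automatic retry that drops the selector args:

        def launch(self):
            self.setup_dbt_repo()
            try:
                # example selector args; dbt run accepts node selection, dbt retry does not
                self.run_dbt("run", selector_args=["--select", "tag:nightly"])
            except DbtRunException:
                self.logger.warning("dbt run failed, retrying failed models")
                # dbt retry takes no selector args; it only re-runs the models
                # that failed in the previous invocation in this working directory
                self.run_dbt("retry")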

Tips and Tricks

  • You can validate that a given dbt command is valid by importing cli from dbt.cli.main and checking the available commands:
    from dbt.cli.main import cli  # type: ignore

    def get_valid_dbt_commands() -> list[str]:
        # cli is a click command group; its registered subcommands are the valid dbt commands
        return list(cli.commands)
  • It may be desirable to add a check that the libraries exist prior to running the dbt command. I do this in my __init__ method by attempting to import the required libraries and raising an exception if they are not found (a rough sketch follows this list).

  • While I don't have this set up yet, it should be possible to run locally with the dbt-databricks adapter as long as you have a profile in profiles.yml or export the necessary environment variables. This would allow you to run dbt locally against a SQL Warehouse or All Purpose cluster as desired. This is the next item on my list to accomplish.

  • Don't forget to set threads in your profiles.yml file if you want to run dbt in parallel. Without this it will default to running in serial.
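A rough sketch of the library check mentioned above (the exact imports to test and the exception type are up to you):

    def check_required_libraries() -> None:
        """Fail fast with a clear message if dbt-core and dbt-spark are not installed on the cluster."""
        try:
            import dbt.cli.main  # noqa: F401  (provided by dbt-core)
            import dbt.adapters.spark  # noqa: F401  (provided by dbt-spark)
        except ImportError as e:
            raise RuntimeError("dbt-core and dbt-spark must be added to the task's libraries") from e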

Still run against SQL Warehouse as desired

As noted above, the profiles.yml file in my repo only has dbt-databricks profiles. So when I'm developing locally I can still run dbt against a SQL Warehouse (or my personal All Purpose cluster, by exporting its endpoint as DATABRICKS_HTTP_PATH) without issue.

If you have dbt-spark profile(s) in your profiles.yml along with dbt-databricks, you should be able to run dbt against a SQL Warehouse by providing a dbt-databricks profile name when running your dbt commands.

dbt retry for failed jobs

dbt retry will work properly here as long as your Python code is set up properly (e.g. you're not letting an exception fail the task and then trying to run dbt retry in a new task). The result returned by dbtRunner.invoke contains the status of the run (it will not raise an exception on its own), so you can check this and decide whether to retry the failed models.
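For example, a minimal sketch of checking the result and retrying within the same task (assuming a dbt-core version where dbtRunner returns a result object with a success attribute):

    from dbt.cli.main import dbtRunner

    runner = dbtRunner()

    res = runner.invoke(["run"])
    if not res.success:
        # the manifest and run_results.json are still in target/, so retry
        # picks up exactly where the failed run left off
        retry_res = runner.invoke(["retry"])
        if not retry_res.success:
            raise RuntimeError("dbt run failed even after retry")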

dbt seed

dbt seed was the one thing I did not get working in my setup. When trying to run this command I got syntax errors - it seems the entire contents of the seed file were inlined into the SQL command and the command got truncated. I haven't yet looked into a solution for this but would like to get it working, so if you have any suggestions please let me know.

Conclusion

This is a bit of work, but I will say it is worth it. The main goal was to save money, and our costs have gone down significantly since moving to this setup, while the dbt runs actually complete faster. Our SQL Warehouse consumption from dbt has been reduced to zero, and our Jobs compute has not increased by any significant amount, likely because the models run so quickly on the job cluster (we were previously using 2X-Small SQL Warehouses due to cost).
