Table of Contents
- So you wanna run dbt on a Databricks job cluster
- Overview
- Existing Literature
- How this differs from the standard process
- Steps
- Tips and Tricks
If you're reading this, you are probably interested in running dbt against a Databricks job cluster. This is not natively supported by Databricks, but it is possible to do by using the `dbt-spark` adapter. Using this adapter is actively recommended against by Databricks, but so far I haven't had any* issues with it.

The steps below are what I have used to get this running for my own workflows and projects. It is possible that there are other methods or steps that would be simpler, more efficient, etc. Please comment with any suggestions or improvements.

\* The `dbt seed` command doesn't seem to work properly, but that is a very minor inconvenience for me and possibly fixable.
## Overview

The general steps for this process are as follows:
- Update your dbt project(s) to be compatible with dbt-spark
- Use a Databricks workflow Python task to run dbt on a Databricks job cluster
- Install dbt and dbt-spark on the Databricks cluster
- Clone your dbt project repository to the Databricks cluster
- Replace the `profiles.yml` file with a dbt-spark compatible version
- In your Python task, use `dbtRunner` to programmatically invoke dbt
## Existing Literature

There is very little already written about how to accomplish this, and what has been written is mainly (all?) by Leo Schick. These resources were invaluable for getting this working, but I did make some changes to avoid notebooks and to streamline the process with my company's existing Databricks helper package.

- Medium - How to run dbt on a Databricks job cluster
- GitHub - databricks-dbt-helper
- GitHub - dbt-databricks issue #575
## How this differs from the standard process

Databricks does natively support running dbt in Databricks workflows, but with the following caveats:

- You must use a Databricks SQL Warehouse to execute the models
- You must also start a Databricks job cluster to handle running the dbt commands

This is not ideal for a few reasons:

- SQL Warehouses are the most expensive Databricks offering
- Starting a separate cluster just to run the dbt commands is not efficient; you are effectively paying twice for the same process (the job cluster runs dbt while the SQL Warehouse executes the models)
- You cannot use `dbt retry` to re-run failed models, as the dbt manifest is lost between tasks
You can also run dbt on an All Purpose cluster, but this is also not ideal, mainly because All Purpose clusters are more expensive than job clusters. Outside of that issue, there is no real problem with this method. (Correct me if I'm wrong; I never actually bothered doing this for scheduled workflows.)
So why not simply point the `dbt-databricks` adapter at a job cluster? That does not work, for a few reasons:

- The `dbt-databricks` adapter does not have the same `session`-based authentication that `dbt-spark` has, where you can simply connect to the running Spark session and run dbt commands
- Using the `dbt-databricks` adapter with a cluster requires that the cluster expose an endpoint that can be provided to dbt to authenticate and send commands to the cluster. Such an endpoint is available for SQL Warehouses and All Purpose clusters, but not for job clusters, presumably for security reasons (as far as I can tell).
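To make the `session` method concrete: on a job cluster there is already a live SparkSession, and `dbt-spark` simply issues its SQL through that session rather than authenticating against an HTTP endpoint. A minimal illustration (just a sketch of the concept, not code you need to write anywhere):

```python
# Conceptual sketch only: dbt-spark's `method: session` reuses the SparkSession
# that the Databricks job cluster already provides for the Python task.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # the cluster's existing session
spark.sql("SELECT current_database()").show()  # dbt's SQL runs through this same session
```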
## Steps

- Update your dbt project to be compatible with dbt-spark. This may not be much work depending on your project; the changes below are what I had to do, but my project is not very complex, so there may be more caveats than I'm aware of. If you run into any issues (and ideally find solutions), please comment and I will update this document.
- Aside from the changes to `profiles.yml`, none of the changes below prevent you from using the `dbt-databricks` adapter (and it's possible that the `profiles.yml` is fully solvable)
- Remove any references to `{{ target.catalog }}` in your models or configuration files
  - I had these in my sources.yml file (e.g. `database: {{ target.catalog }}`) - these were actually not required, so removing them was simple
  - If you need to refer to the given catalog for any kind of logic, you can use something like `target.name` or `target.schema` instead
- For things like incremental models to work properly, you will need to add the `file_format` parameter to your models with the value `delta`
  - I found this easiest to do in my project's `dbt_project.yml` file, by adding the following:

    ```yaml
    models:
      +file_format: delta
    ```
- Modify your `profiles.yml` file to be compatible with dbt-spark
  - You can do this in your repo as different output targets, or you can overwrite the existing profiles.yml file when running the job - this is what I do, as I was not able to get `dbt-databricks` to work properly if I had `dbt-spark` profiles in my file
  - What you will need, however you set it up, is the below (at minimum):

    ```yaml
    <project_name>:
      outputs:
        default:
          type: spark            # this tells dbt to use dbt-spark
          method: session        # this tells dbt-spark to use the current Spark session
          host: NA               # not sure if this is necessary
          schema: <your schema>  # not entirely sure if this is necessary
      target: default
    ```

  - Note: With just this setup you will have issues with your models attempting to use the wrong catalog, as they will use whatever is defined as the default. We will handle this in the Python task prior to running dbt
  - Note: I already had the `dbt-databricks` adapter skipping the `optimize` commands that are normally run after creating/updating models that are clustered or Z-ordered. I do not think the `dbt-spark` adapter runs these. You could likely add these as a post-hook if desired, but with Databricks now offering predictive optimization it may not be necessary
- We will use Python tasks to run dbt, not dbt tasks
- The Python task should handle the following:
  - Clone your dbt project repository to the Databricks cluster
  - Replace the `profiles.yml` file with a dbt-spark compatible version
    - If your `profiles.yml` already has a `spark` profile, you can simply provide the profile name to the `dbtRunner` command
  - Run dbt using `dbtRunner` to programmatically invoke dbt
- The Python task should also define the libraries it requires, which are `dbt` and `dbt-spark`
  - I have these added to the Task's `libraries` section in the Databricks workflow
The below is a simplified and not fully functional example of how my Python task is set up to run dbt on a Databricks job cluster. It is not a complete example, but it should give you an idea of how to get started. It was cobbled together from existing code in multiple files (often with pieces left out), so don't expect to just copy/paste it and have it work.

Obviously there are things that can be done differently here. For example, if you already have a repo set up and synced to Databricks for your dbt project, you can probably skip cloning the repo and just access the existing one. If your `profiles.yml` already has a `spark` profile, you can probably skip replacing the file.
Note: The reason the below has `selector_args` and `args` as separate arguments is that `dbt retry` does not accept selector arguments, since it simply retries the failed models from the previous run. Keeping the arguments in separate lists allows me to have an automatic retry mechanism that skips the selector args when invoking `dbt retry` (a rough sketch of this is shown after the example below).
```python
import os
import shutil
from pathlib import Path

import yaml
from dbt.cli.main import dbtRunner
from git import Repo  # GitPython

# Note: the base class, get_dbt_repo_url(), DbtRunException, and the self.args /
# self.spark / self.logger attributes come from my company's Databricks helper
# package and are intentionally left out here.


class PythonDbtTask:
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # this fixes the issue with the dbt-spark adapter using the default catalog
        self.spark.catalog.setCurrentCatalog(self.args.catalog)

    def launch(self):
        self.setup_dbt_repo()
        self.run_dbt("run")

    def setup_dbt_repo(self):
        """Clones the dbt repo to the current directory, changes the current directory to the new
        folder, and writes the profiles.yml file for executing with the dbt-spark adapter.
        """
        repo_url = get_dbt_repo_url()
        self.clone_repo(repo_url, self.args.git_ref)
        self.write_dbt_profiles()

    def write_dbt_profiles(self):
        data = {}
        data["config"] = {"use_colors": False, "send_anonymous_usage_stats": False}
        data["dbx_dbt_legacy"] = {
            "outputs": {
                "default": {
                    "threads": self.args.threads,
                    "type": "spark",
                    "method": "session",
                    "schema": self.args.catalog,
                    "host": "NA",
                }
            },
            "target": "default",
        }
        profiles_path = Path.cwd().joinpath("profiles.yml")
        with open(profiles_path, "w") as f:
            f.write(yaml.dump(data))

    def run_dbt(self, command: str, selector_args: list[str] | None = None, args: list[str] | None = None):
        selector_args = selector_args or []
        args = args or []
        # dbtRunner.invoke expects the CLI args without the leading "dbt"
        all_args = [command] + selector_args + args
        res = dbtRunner().invoke(all_args)
        if not res.success:
            self.logger.error(f"Exception: {res.exception}")
            raise DbtRunException(f"dbt {' '.join(all_args)} failed")

    def clone_repo(self, git_url: str, branch: str, folder_name: str | None = None, change_dir: bool = True):
        """Clones a git repo.

        Args:
            git_url (str): The git url.
            branch (str): The branch to clone.
            folder_name (str | None, optional): The folder name to clone into. Defaults to None, in which
                case the folder name is the stem of the git url.
            change_dir (bool, optional): Whether to change the current directory to the cloned repo. Defaults to True.
        """
        folder_name = folder_name or Path(git_url).stem
        clone_path = Path.cwd().joinpath(folder_name)
        if clone_path.exists():
            shutil.rmtree(clone_path)
        Repo.clone_from(git_url, clone_path, branch=branch)
        if change_dir:
            os.chdir(clone_path)
```
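As a follow-up to the note about `selector_args` and `args` above, here is a rough sketch of what the automatic retry can look like. It builds on the `run_dbt` method and `DbtRunException` from the example and is a simplified version of the idea, not the exact code:

```python
# Rough sketch: another method that could live on the PythonDbtTask class above.
def run_dbt_with_retry(self, command: str, selector_args: list[str] | None = None,
                       args: list[str] | None = None):
    try:
        self.run_dbt(command, selector_args=selector_args, args=args)
    except DbtRunException:
        self.logger.warning(f"dbt {command} failed, retrying failed models with dbt retry")
        # dbt retry does not accept selector args, so only the extra args are passed
        self.run_dbt("retry", args=args)
```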
## Tips and Tricks

- You can validate that the dbt command provided is valid by importing `cli` from `dbt.cli.main` and checking the available commands:

  ```python
  from dbt.cli.main import cli  # type: ignore

  def valid_dbt_commands() -> list[str]:
      # `cli` is a click command group, so its registered subcommands
      # ("run", "test", "seed", ...) are available via `cli.commands`
      return list(cli.commands)
  ```
- It may be desirable to add a check that the required libraries exist prior to running the dbt command. I do this in my `__init__` method by attempting to import the required libraries and raising an exception if they are not found (see the sketch after this list).
- While I don't have this set up yet, it should be possible to run locally with the `dbt-databricks` adapter as long as you have a profile in `profiles.yml` or export the necessary environment variables. This would allow you to run dbt locally against a SQL Warehouse or All Purpose cluster as desired. This is the next item on my list to accomplish.
- Don't forget to set `threads` in your `profiles.yml` file if you want to run dbt in parallel. Without this it will default to running in serial.
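As a rough sketch, the library check can be as simple as attempting the imports up front (the function name here is just illustrative; in my case this logic lives in `__init__`):

```python
def check_dbt_libraries() -> None:
    """Fail fast if dbt-core or dbt-spark is missing from the job cluster."""
    try:
        import dbt.cli.main  # noqa: F401  (provided by dbt-core)
        import dbt.adapters.spark  # noqa: F401  (provided by dbt-spark)
    except ImportError as exc:
        raise RuntimeError("dbt and dbt-spark must be added to the task's libraries") from exc
```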
As noted above, the `profiles.yml` file in my repo only has `dbt-databricks` profiles. So when I'm developing locally I can still run dbt against a SQL Warehouse (or my personal All Purpose cluster, by exporting the endpoint to `DATABRICKS_HTTP_PATH`) without issue.
If you have `dbt-spark` profile(s) in your `profiles.yml` along with `dbt-databricks`, you should be able to run dbt against a SQL Warehouse by providing a `dbt-databricks` profile name when running your dbt commands.
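For example, something along these lines should work (the profile and target names here are placeholders, not from my actual setup):

```python
from dbt.cli.main import dbtRunner

# Explicitly select a dbt-databricks profile/target when invoking dbt programmatically.
res = dbtRunner().invoke(["run", "--profile", "my_databricks_profile", "--target", "dev"])
print(res.success)
```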
`dbt retry` will work properly here as long as your Python code is set up properly (e.g. you're not raising an exception to the point where the task fails and then trying to run `dbt retry` in a new task). The result from the `dbtRunner.invoke` command will contain the status of the run (it will not raise an exception on its own), so you can check this and decide whether to retry the failed models.
`dbt seed` was the one thing I did not get working in my setup. When trying to run this command I got syntax errors - it seems that the whole seed file's contents were placed into the SQL command and the command got truncated. I haven't yet looked into a solution for this but would like to get it working, so if you have any suggestions please let me know.
This is a bit of work but I will say that it is worth it. The main goal of this was to save money and our costs have gone down significantly since moving to this setup - while actually completing the dbt runs faster. Our SQL Warehouse consumption from dbt has been reduced to 0 and our Jobs compute has not increased by any significant amount, likely because the models run so quickly on the job cluster (we were using 2X-Small SQL Warehouses previously due to cost).