- Table of Contents
- So you wanna run dbt on a Databricks job cluster
- Overview
- Existing Literature
- How this differs from the standard process
- Steps
- Tips and Tricks
## Overview

If you're reading this you are probably interested in running dbt against a Databricks job cluster. This is not natively supported by Databricks, but it is possible to do so by using the dbt-spark adapter. Databricks actively recommends against using this adapter, but so far I haven't had any* issues with it.

The steps below are what I have used to get this running for my own workflows and projects. It is possible that there are other methods or steps that would be simpler, more efficient, etc. Please comment with any suggestions or improvements.
* The `dbt seed` command doesn't seem to work properly, but that is a very minor inconvenience for me and possibly fixable.
The general steps for this process are as follows:
- Update your dbt project(s) to be compatible with dbt-spark
- Use a Databricks workflow Python task to run dbt on a Databricks job cluster
- Install dbt and dbt-spark on the Databricks cluster
- Clone your dbt project repository to the Databricks cluster
- Replace the `profiles.yml` file with a dbt-spark compatible version
- In your Python task, use `dbtRunner` to programmatically invoke dbt
## Existing Literature

Very little has already been written about how to accomplish this, and what has been written is mainly (all?) by Leo Schick. These resources were invaluable to me for getting this working, but I did make some changes to avoid notebooks and to make the process more streamlined with my company's existing Databricks helper package.
- Medium - How to run dbt on a Databricks job cluster
- GitHub - databricks-dbt-helper
- GitHub - dbt-databricks issue #575
## How this differs from the standard process

Databricks does natively support running dbt in Databricks workflows, but with the following caveats:
- You must use a Databricks SQL Warehouse
- You must start a Databricks job cluster to handle running the dbt commands
This is not ideal for a few reasons:
- SQL Warehouses are the most expensive Databricks offering
- Starting a new cluster just to run the dbt commands is not efficient; it requires paying twice for the same process
- You cannot use `dbt retry` to re-run failed models, as the dbt manifest is lost between tasks
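For context, a minimal sketch of what the native dbt task looks like in a workflow definition (Jobs-API-style YAML; the IDs and cluster key are placeholders). It needs both a SQL Warehouse for the models and a cluster for the dbt CLI itself, which is where the double cost comes from:

```yaml
# Sketch only - a native Databricks dbt task; values are placeholders
tasks:
  - task_key: dbt_run
    dbt_task:
      commands:
        - dbt deps
        - dbt run
      warehouse_id: <sql-warehouse-id>  # the models execute on the SQL Warehouse
    job_cluster_key: dbt_cli_cluster    # the dbt CLI itself still needs a cluster
    libraries:
      - pypi:
          package: dbt-databricks
```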
You can also run dbt on an All Purpose cluster, but this is also not ideal, mainly because All Purpose clusters are more expensive than job clusters. Outside of that issue, there is no real problem with this method. (Correct me if I'm wrong, I never actually bothered doing this for scheduled workflows.)
Simply pointing the `dbt-databricks` adapter at a job cluster does not work, for a few reasons:
- The `dbt-databricks` adapter does not have the same session-based authentication that `dbt-spark` has, where you can simply connect to a running Spark session and run dbt commands
- Using the `dbt-databricks` adapter with a cluster requires that the cluster expose an endpoint that can be provided to dbt to authenticate and send commands to the cluster. This is available for SQL Warehouses and All Purpose clusters, but not for job clusters, for security purposes (as far as I can tell).
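To make that difference concrete, here is a rough sketch of the two profile styles (the profile and output names are made up, values are placeholders): `dbt-databricks` needs an HTTP endpoint plus a token, while the `dbt-spark` `session` method simply attaches to the Spark session the task is already running in:

```yaml
# Sketch only - illustrating the two adapter styles; names and values are placeholders
my_project:
  target: default
  outputs:
    databricks_warehouse:       # dbt-databricks: requires an endpoint the compute must expose
      type: databricks
      host: <workspace-hostname>
      http_path: /sql/1.0/warehouses/<warehouse-id>   # job clusters do not provide an endpoint like this
      token: "{{ env_var('DATABRICKS_TOKEN') }}"
      catalog: <catalog>
      schema: <schema>
    spark_session:              # dbt-spark: attaches to the current Spark session on the cluster
      type: spark
      method: session
      host: NA
      schema: <schema>
```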
## Steps

- Update your dbt project to be compatible with dbt-spark - this may not be that much work depending on your project. The below is what I had to do, but my project is not very complex, so it is quite possible that there are more caveats than I'm aware of. If you run into any issues (and ideally find solutions) please comment and I will update this document.
  - Aside from the changes to `profiles.yml`, none of the below changes prevent you from using the `dbt-databricks` adapter (and it's possible that the `profiles.yml` change is fully solvable)
  - Remove any references to `{{ target.catalog }}` in your models or configuration files
    - I had these in my `sources.yml` file (e.g. `database: {{ target.catalog }}`) - these were actually not required, so removing them was simple
    - If you need to refer to the given catalog for any kind of logic, you can use something like `target.name` or `target.schema` instead
  - For things like incremental models to work properly, you will need to add the `file_format` parameter to your models with the value `delta`
    - I found this easiest to do in my project's `dbt_project.yml` file, by adding the following:

      ```yaml
      models:
        +file_format: delta
      ```
  - Modify your `profiles.yml` file to be compatible with dbt-spark
    - You can do this in your repo as different output targets, or you can do it when running the job by overwriting the existing `profiles.yml` file - this is what I do, as I was not able to get `dbt-databricks` to work properly if I had `dbt-spark` profiles in my file
    - What you will need, however you set it up, is the below (at minimum):

      ```yaml
      <project_name>:
        outputs:
          default:
            type: spark            # this tells dbt to use dbt-spark
            method: session        # this tells dbt-spark to use the current Spark session
            host: NA               # not sure if this is necessary
            schema: <your schema>  # not entirely sure if this is necessary
        target: default
      ```
    - Note: With just this setup you will have issues with your models attempting to use the wrong catalog, as they will use whatever is defined as the default. We will handle this in the Python task prior to running dbt.
    - Note: I already had the `dbt-databricks` adapter skipping the `optimize` commands that are normally run after creating/updating models that are clustered or Z-ordered. I do not think the `dbt-spark` adapter runs these. You could likely add them as a post-hook if desired (a rough sketch is shown after this list), but with Databricks now offering predictive optimization it may not be necessary.
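If you do want to keep explicit `OPTIMIZE`/`ZORDER` runs, a minimal sketch of re-adding them as a post-hook in `dbt_project.yml` could look like the below (the project and column names here are made up; adjust to your own models):

```yaml
# Hypothetical sketch - re-adding OPTIMIZE/ZORDER as a post-hook; names are placeholders
models:
  my_project:
    marts:
      +post-hook: "OPTIMIZE {{ this }} ZORDER BY (event_date)"
```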
- We will use Python tasks to run dbt, not dbt tasks
- The Python task should handle the following:
  - Clone your dbt project repository to the Databricks cluster
  - Replace the `profiles.yml` file with a dbt-spark compatible version
    - If your `profiles.yml` already has a `spark` profile, you can simply provide the profile name to the `dbtRunner` command
  - Run dbt using `dbtRunner` to programmatically invoke dbt
- The Python task should also define the libraries it requires, which are `dbt` and `dbt-spark`
  - I have these added to the Task's `libraries` section in the Databricks workflow (see the sketch below)
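A rough sketch of what that task definition can look like (Jobs-API-style YAML; the entry point path, cluster key, and parameters are placeholders for whatever your own code expects):

```yaml
# Sketch only - a Python task on a job cluster with dbt installed via task libraries
tasks:
  - task_key: run_dbt
    job_cluster_key: dbt_job_cluster
    spark_python_task:
      python_file: /Workspace/path/to/run_dbt.py   # hypothetical entry point that runs the task class below
      parameters: ["--catalog", "main", "--git-ref", "main", "--threads", "8"]
    libraries:
      - pypi:
          package: dbt-core
      - pypi:
          package: dbt-spark
```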
The below is a simplified and not fully functional example of how my Python task is set up to run dbt on a Databricks job cluster. This is not a complete example, but should give you an idea of how to get started. It was literally cobbled together from existing code in multiple files (often with intrinsic parts left out), so don't expect to just copy/paste it and have it working.
Obviously there are things that can be done differently here. For example, if you already have a repo set up and synced to Databricks for your dbt project, you can probably skip cloning the repo and just access the existing one. If your `profiles.yml` already has a `spark` profile, you can probably skip replacing the file.

Note: The reason the below has `selector_args` and `args` as separate arguments is that `dbt retry` does not accept selector arguments, since it simply retries the failed models from the previous run. Having the arguments in separate lists allows me to have an automatic retry mechanism that skips the selector args when invoking `dbt retry`.
```python
import os
import shutil
from pathlib import Path

import yaml
from dbt.cli.main import dbtRunner  # programmatic dbt invocation (dbt-core)
from git import Repo  # GitPython


class DbtRunException(Exception):
    """Raised when a dbt invocation fails."""


class PythonDbtTask:
    # NOTE: the base class, self.args, self.spark, self.logger, and get_dbt_repo_url()
    # come from my company's helper package and are omitted here.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # this fixes the issue with the dbt-spark adapter using the default catalog
        self.spark.catalog.setCurrentCatalog(self.args.catalog)

    def launch(self):
        self.setup_dbt_repo()
        self.run_dbt("run")

    def setup_dbt_repo(self):
        """Clones the dbt repo to the current directory, changes the current directory to the new folder,
        and writes the profiles.yml file for executing with the dbt-spark adapter.
        """
        repo_url = get_dbt_repo_url()
        self.clone_repo(repo_url, self.args.git_ref)
        self.write_dbt_profiles()

    def write_dbt_profiles(self):
        """Overwrites profiles.yml with a dbt-spark `session` profile for this run."""
        data = {}
        data["config"] = {"use_colors": False, "send_anonymous_usage_stats": False}
        data["dbx_dbt_legacy"] = {
            "outputs": {
                "default": {
                    "threads": self.args.threads,
                    "type": "spark",
                    "method": "session",
                    "schema": self.args.catalog,
                    "host": "NA",
                }
            },
            "target": "default",
        }
        profiles_path = Path.cwd().joinpath("profiles.yml")
        with open(profiles_path, "w") as f:
            f.write(yaml.dump(data))

    def run_dbt(self, command: str, selector_args: list[str] | None = None, args: list[str] | None = None):
        """Invokes dbt programmatically and raises if the run was not successful."""
        selector_args = selector_args or []
        args = args or []
        # dbtRunner.invoke takes the CLI arguments without the leading "dbt"
        cli_args = [command] + selector_args + args
        res = dbtRunner().invoke(cli_args)
        if not res.success:
            self.logger.error(f"Exception: {res.exception}")
            raise DbtRunException(f"dbt {' '.join(cli_args)} failed")

    def clone_repo(self, git_url: str, branch: str, folder_name: str | None = None, change_dir: bool = True):
        """Clones a git repo.

        Args:
            git_url (str): The git url.
            branch (str): The branch to clone.
            folder_name (str | None, optional): The folder name to clone into. Defaults to None, in which
                case the folder name is the stem of the git url.
            change_dir (bool, optional): Whether to change the current directory to the cloned repo. Defaults to True.
        """
        folder_name = folder_name or Path(git_url).stem
        clone_path = Path.cwd().joinpath(folder_name)
        if clone_path.exists():
            shutil.rmtree(clone_path)
        Repo.clone_from(git_url, clone_path, branch=branch)
        if change_dir:
            os.chdir(clone_path)
```

## Tips and Tricks

- You can validate that the dbt command provided is valid by importing `cli` from `dbt.cli.main` and checking the available commands:
  ```python
  from dbt.cli.main import cli  # type: ignore

  def list_dbt_commands() -> list[str]:
      return list(cli.commands)
  ```
- It may be desirable to add a check that the required libraries exist prior to running the dbt command. I do this in my `__init__` method by attempting to import the required libraries and raising an exception if they are not found (see the sketch after this list).
- While I don't have this set up yet, it should be possible to run locally with the `dbt-databricks` adapter as long as you have a profile in `profiles.yml` or export the necessary environment variables. This would allow you to run dbt locally against a SQL Warehouse or All Purpose cluster as desired. This is the next item on my list to accomplish.
- Don't forget to set `threads` in your `profiles.yml` file if you want to run dbt in parallel. Without this it will default to running in serial.
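A minimal sketch of that import check (the method name is made up; the imports are the top-level modules provided by `dbt-core` and `dbt-spark`):

```python
# Hypothetical sketch of the import check described above
def _check_dbt_libraries(self) -> None:
    try:
        import dbt.cli.main  # noqa: F401 - provided by dbt-core
        import dbt.adapters.spark  # noqa: F401 - provided by dbt-spark
    except ImportError as exc:
        raise RuntimeError(
            "dbt-core and dbt-spark must be installed on the job cluster - "
            "add them to the task's libraries section in the workflow"
        ) from exc
```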
As noted above, the `profiles.yml` file in my repo only has `dbt-databricks` profiles. So when I'm developing locally I can still run dbt against a SQL Warehouse (or my personal All Purpose cluster, by exporting the endpoint to `DATABRICKS_HTTP_PATH`) without issue.

If you have `dbt-spark` profile(s) in your `profiles.yml` along with `dbt-databricks`, you should be able to run dbt against a SQL Warehouse by providing a `dbt-databricks` profile name when running your dbt commands.

`dbt retry` will work properly here as long as your Python code is set up properly (e.g. you're not raising an exception to the point where the task fails and then trying to run `dbt retry` in a new task). The result from the `dbtRunner.invoke` command will contain the status of the run (it will not raise an exception on its own), so you can check this and decide whether to retry the failed models.
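A minimal sketch of that retry flow, assuming dbt is invoked programmatically as in the task above (the selector is just an example):

```python
from dbt.cli.main import dbtRunner

# initial run with selector args; the dbtRunnerResult reports success rather than raising
res = dbtRunner().invoke(["run", "--select", "tag:nightly"])

if not res.success:
    # dbt retry picks up the failed models from the previous run's artifacts,
    # so no selector args are passed here
    retry_res = dbtRunner().invoke(["retry"])
    if not retry_res.success:
        raise RuntimeError("dbt run failed even after retry")
```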
`dbt seed` was the one thing I did not get working in my setup. When trying to run this command I got syntax errors - it seems that the whole seed file contents were provided in the SQL command and the command got truncated. I haven't yet looked into a solution for this, but I would like to get it working, so if you have any suggestions please let me know.
This is a bit of work but I will say that it is worth it. The main goal of this was to save money and our costs have gone down significantly since moving to this setup - while actually completing the dbt runs faster. Our SQL Warehouse consumption from dbt has been reduced to 0 and our Jobs compute has not increased by any significant amount, likely because the models run so quickly on the job cluster (we were using 2X-Small SQL Warehouses previously due to cost).