Created January 4, 2019 20:53
Running notebooks in parallel on Azure Databricks
// define the name of the Azure Databricks notebook to run
val notebookToRun = ???

// define some way to generate a sequence of workloads to run
val jobArguments = ???

// define the number of workers per job
val workersPerJob = ???

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// look up the number of workers in the cluster
// (note: getExecutorMemoryStatus also counts the driver, so this is an upper bound)
val workersAvailable = sc.getExecutorMemoryStatus.size

// determine the number of jobs we can run, each with the desired worker count
val totalJobs = workersAvailable / workersPerJob

// look up the context required for the parallel run calls
val context = dbutils.notebook.getContext()

// create a threadpool for the parallel runs
implicit val executionContext = ExecutionContext.fromExecutorService(
  Executors.newFixedThreadPool(totalJobs))

try {
  val futures = jobArguments.zipWithIndex.map { case (args, i) =>
    Future {
      // ensure the thread knows about the databricks context
      dbutils.notebook.setContext(context)
      // assign each job to one of up to totalJobs separate scheduler pools
      sc.setLocalProperty("spark.scheduler.pool", s"pool${i % totalJobs}")
      // start the job in the scheduler pool
      dbutils.notebook.run(notebookToRun, timeoutSeconds = 0, args)
    }
  }
  // wait for all the jobs to finish processing
  Await.result(Future.sequence(futures), atMost = Duration.Inf)
} finally {
  // make sure to clean up the threadpool
  executionContext.shutdownNow()
}
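The three ??? placeholders above are intentionally left for the caller to fill in. As a rough illustration of what plugging them in might look like (the notebook path, argument keys, and values below are made up for this sketch, not part of the gist), one could fan ten parameter maps out across the cluster with two workers each:

// hypothetical placeholder values for the snippet above; adjust to your workspace
val notebookToRun = "./process-partition"                                // made-up notebook path
val jobArguments = (0 until 10).map(i => Map("partition" -> i.toString)) // one argument map per workload
val workersPerJob = 2                                                    // dedicate two workers to each run

Note that dbutils.notebook.run expects its arguments as a Map[String, String], and the per-thread sc.setLocalProperty("spark.scheduler.pool", ...) call only spreads the jobs across separate pools if the cluster runs with spark.scheduler.mode set to FAIR; under the default FIFO scheduler the jobs still run, just without fair sharing between pools.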
Well, I tried to change it, but for example when trying to update ctx.extraConfigs["notebook_path"] = "new_path", it complains that update is not implemented for that property, so simply changing it does not work. My idea would now be to create a completely new context and use it for dbutils.notebook.setContext(ctx), but I have no idea how to create a new instance of the ctx object (and I am even more concerned whether that would even work if, as you say, it's a singleton, which apparently we cannot update).
I haven't tried it (and haven't been too close to the Databricks world in a while) so the direct answer is: I don't know. However, I assume that it should be possible as long as the context doesn't rely on configurations specific to each future (it's a singleton and we can only have one active context per JVM). Could you try it out and report back?
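One cheap way to start trying it out (a minimal probe sketch, untested; it only assumes the context exposes extraConfigs as a Map, as the comment above suggests) is to check which concrete Map type backs the property, which should confirm whether in-place updates can ever work:

// probe the current notebook context (hypothetical, untested)
val ctx = dbutils.notebook.getContext()
println(ctx.extraConfigs)                  // dump the current extra configs
println(ctx.extraConfigs.getClass.getName) // an immutable Map here would explain the failing update

If the concrete type turns out to be an immutable Map, that would match the "update is not implemented" error reported earlier.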