databricks_job_task resource WIP
diff --git a/compute/model.go b/compute/model.go
index 1a2b43a2..b71afb6a 100644
--- a/compute/model.go
+++ b/compute/model.go
@@ -612,6 +612,21 @@ func (j Job) ID() string {
 	return fmt.Sprintf("%d", j.JobID)
 }
 
+func (j Job) hasTask(taskKey string) bool {
+	if j.Settings == nil {
+		return false
+	}
+	if !j.Settings.isMultiTask() {
+		return false
+	}
+	for _, task := range j.Settings.Tasks {
+		if task.TaskKey == taskKey {
+			return true
+		}
+	}
+	return false
+}
+
 // RunParameters ...
 type RunParameters struct {
 	// a shortcut field to reuse this type for RunNow
diff --git a/provider/provider.go b/provider/provider.go
index 0f7b4f9e..eae77bd5 100644
--- a/provider/provider.go
+++ b/provider/provider.go
@@ -51,6 +51,7 @@ func DatabricksProvider() *schema.Provider {
 			"databricks_cluster_policy": compute.ResourceClusterPolicy(),
 			"databricks_instance_pool":  compute.ResourceInstancePool(),
 			"databricks_job":            compute.ResourceJob(),
+			"databricks_job_task":       compute.ResourceJobTask(),
 			"databricks_pipeline":       compute.ResourcePipeline(),
 			"databricks_group":          identity.ResourceGroup(),
package compute

import (
	"context"
	"fmt"

	"github.com/databrickslabs/terraform-provider-databricks/common"
	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

// ResourceJobTask manages a single task within an existing multi-task job.
func ResourceJobTask() *schema.Resource {
	// TODO: add mutex on job creation
	taskSchema := common.StructToSchema(JobTaskSettings{}, func(t map[string]*schema.Schema) map[string]*schema.Schema {
		jobSettingsSchema(&t, "")
		t["job_id"] = &schema.Schema{
			Type:     schema.TypeString,
			Required: true,
		}
		// depends_on is a Terraform reserved keyword, so expose it as `depends`
		t["depends"] = t["depends_on"]
		delete(t, "depends_on")
		return t
	})
	p := common.NewPairID("job_id", "task_key")
	return common.Resource{
		Schema: taskSchema,
		Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			jobID := d.Get("job_id").(string)
			jobsAPI := NewJobsAPI(ctx, c)
			job, err := jobsAPI.Read(jobID)
			if err != nil {
				return fmt.Errorf("cannot load job %s: %w", jobID, err)
			}
			// tasks can only be appended to a multi-task job
			if !job.Settings.isMultiTask() {
				return fmt.Errorf("job '%s' is of %s format", job.Settings.Name, job.Settings.Format)
			}
			var task JobTaskSettings
			err = common.DataToStructPointer(d, taskSchema, &task)
			if err != nil {
				return err
			}
			if job.hasTask(task.TaskKey) {
				return fmt.Errorf("job '%s' already has '%s' task", job.Settings.Name, task.TaskKey)
			}
			// read-modify-write: append the new task and push full settings back
			job.Settings.Tasks = append(job.Settings.Tasks, task)
			err = jobsAPI.Update(jobID, *job.Settings)
			if err != nil {
				return fmt.Errorf("cannot update job %s: %w", jobID, err)
			}
			p.Pack(d)
			return nil
		},
		Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			jobID, taskKey, err := p.Unpack(d)
			if err != nil {
				return err
			}
			jobsAPI := NewJobsAPI(ctx, c)
			job, err := jobsAPI.Read(jobID)
			if err != nil {
				return err
			}
			if !job.Settings.isMultiTask() {
				return fmt.Errorf("job '%s' is of %s format", job.Settings.Name, job.Settings.Format)
			}
			for _, task := range job.Settings.Tasks {
				if task.TaskKey == taskKey {
					return common.StructToData(task, taskSchema, d)
				}
			}
			return common.NotFound(fmt.Sprintf("Task %s not found", taskKey))
		},
		Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			jobID, taskKey, err := p.Unpack(d)
			if err != nil {
				return err
			}
			jobsAPI := NewJobsAPI(ctx, c)
			job, err := jobsAPI.Read(jobID)
			if err != nil {
				return err
			}
			if !job.hasTask(taskKey) {
				return common.NotFound(fmt.Sprintf("Task %s not found", taskKey))
			}
			// keep every task except the one being deleted
			newTasks := []JobTaskSettings{}
			for _, task := range job.Settings.Tasks {
				if task.TaskKey == taskKey {
					continue
				}
				newTasks = append(newTasks, task)
			}
			job.Settings.Tasks = newTasks
			return jobsAPI.Update(jobID, *job.Settings)
		},
	}.ToResource()
}
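
One way to address the TODO above, sketched here as an assumption rather than the author's plan: every Create and Delete does a read-modify-write of Settings.Tasks, so two tasks applied in parallel against the same job can silently drop each other's changes. A package-level lock around the whole cycle would serialize them; updateJobTasks is a hypothetical helper name.

// illustrative sketch, not part of the gist: serialize read-modify-write
// updates of a job's task list so concurrent applies don't race
var jobTasksMutex sync.Mutex

func updateJobTasks(jobsAPI JobsAPI, jobID string, mutate func(*JobSettings)) error {
	jobTasksMutex.Lock()
	defer jobTasksMutex.Unlock()
	job, err := jobsAPI.Read(jobID)
	if err != nil {
		return err
	}
	mutate(job.Settings)
	return jobsAPI.Update(jobID, *job.Settings)
}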
terraform {
  required_providers {
    databricks = {
      source = "databrickslabs/databricks"
    }
  }
}

provider "databricks" {
}

resource "databricks_job" "this" {
  name = "Terraform Demo (placeholder)"
  // needs an API to create a placeholder job, otherwise subsequent applies of each task would override one another's changes
  format = "MULTI_TASK"
}
resource "databricks_job_task" "a" { | |
job_id = databricks_job.this.id | |
task_key = "a" | |
new_cluster { | |
num_workers = 8 | |
spark_version = "9.1.x-scala2.12" | |
node_type_id = "Standard_F4s" | |
} | |
notebook_task { | |
notebook_path = "/Users/[email protected]/Fourth" | |
} | |
} | |
resource "databricks_job_task" "b" { | |
job_id = databricks_job.this.id | |
task_key = "b" | |
depends { | |
task_key = databricks_job_task.a.task_key | |
} | |
existing_cluster_id = "x" | |
notebook_task { | |
notebook_path = "/Users/[email protected]/Fifth" | |
} | |
} | |
output "job_url" { | |
value = "${databricks_job.this.url}/tasks" | |
} |
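
For reference, each databricks_job_task ends up in state with a composite ID packing job_id and task_key into one string via common.NewPairID. A rough sketch of what Pack/Unpack presumably do; the "|" separator is an assumption, not verified against the provider source:

// hypothetical re-implementation for illustration only
func packID(jobID, taskKey string) string {
	return jobID + "|" + taskKey // e.g. "123|a" (separator assumed)
}

func unpackID(id string) (jobID, taskKey string, err error) {
	parts := strings.SplitN(id, "|", 2)
	if len(parts) != 2 {
		return "", "", fmt.Errorf("invalid ID: %s", id)
	}
	return parts[0], parts[1], nil
}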