Skip to content

Instantly share code, notes, and snippets.

@eriknelson
Created July 26, 2017 19:22
Show Gist options
  • Save eriknelson/54908f8c46d0ccbf1268f2306f642bf2 to your computer and use it in GitHub Desktop.
Save eriknelson/54908f8c46d0ccbf1268f2306f642bf2 to your computer and use it in GitHub Desktop.
diff --git a/pkg/broker/broker.go b/pkg/broker/broker.go
index 8f5f6445..dfb2a93e 100644
--- a/pkg/broker/broker.go
+++ b/pkg/broker/broker.go
@@ -470,8 +470,22 @@ func (a AnsibleBroker) Deprovision(instanceUUID uuid.UUID, async bool) (*Deprovi
return nil, err
}
- var token string
+ // NOTE, HACK: Check to see if there are any outstanding jobs for the
+ // requested ServiceInstance. Not perfect, but we've decided it's acceptable
+ // to assume that **any** running job is a deprovision job, and we should
+ // therefore return a 202 to indicate that the job is still running. This
+ // avoids spawning a new deprovision pod.
+ jobsInProgress, err := a.dao.GetSvcInstJobsByState(
+ instanceUUID.String(), apb.StateInProgress)
+ if err != nil {
+ return nil, err
+ } else if len(jobsInProgress) != 0 {
+ // HACK: Assuming the first in-progress job holds the token; there should only be one
+ op := jobsInProgress[0].Token
+ return &DeprovisionResponse{Operation: op}, nil
+ }
+ var token string
if async {
a.log.Info("ASYNC deprovision in progress")
// asynchronously provision and return the token for the lastoperation
diff --git a/pkg/broker/deprovision_job.go b/pkg/broker/deprovision_job.go
index f4a4fea0..aef4f48c 100644
--- a/pkg/broker/deprovision_job.go
+++ b/pkg/broker/deprovision_job.go
@@ -20,6 +20,7 @@ type DeprovisionMsg struct {
JobToken string `json:"job_token"`
SpecId string `json:"spec_id"`
Error string `json:"error"`
+ Msg string `json:"msg"`
}
func (m DeprovisionMsg) Render() string {
@@ -57,5 +58,5 @@ func (p *DeprovisionJob) Run(token string, msgBuffer chan<- WorkMsg) {
// send creds
p.log.Debug("sending message to channel")
msgBuffer <- DeprovisionMsg{InstanceUUID: p.serviceInstance.Id.String(),
- JobToken: token, SpecId: p.serviceInstance.Spec.Id, Error: ""}
+ JobToken: token, SpecId: p.serviceInstance.Spec.Id, Error: "", Msg: "_deprovision"}
}
diff --git a/pkg/broker/provision_subscriber.go b/pkg/broker/provision_subscriber.go
index 30efc0db..0db05d74 100644
--- a/pkg/broker/provision_subscriber.go
+++ b/pkg/broker/provision_subscriber.go
@@ -41,9 +41,12 @@ func (p *ProvisionWorkSubscriber) Subscribe(msgBuffer <-chan WorkMsg) {
// updates one day.
p.dao.SetState(pmsg.InstanceUUID, apb.JobState{Token: pmsg.JobToken, State: apb.StateInProgress, Podname: pmsg.PodName})
} else {
- json.Unmarshal([]byte(pmsg.Msg), &extCreds)
+ if pmsg.Msg != "_deprovision" {
+ // HACK
+ json.Unmarshal([]byte(pmsg.Msg), &extCreds)
+ p.dao.SetExtractedCredentials(pmsg.InstanceUUID, extCreds)
+ }
p.dao.SetState(pmsg.InstanceUUID, apb.JobState{Token: pmsg.JobToken, State: apb.StateSucceeded, Podname: pmsg.PodName})
- p.dao.SetExtractedCredentials(pmsg.InstanceUUID, extCreds)
}
}
}()
diff --git a/pkg/dao/dao.go b/pkg/dao/dao.go
index 5c48832d..6ae6ec45 100644
--- a/pkg/dao/dao.go
+++ b/pkg/dao/dao.go
@@ -3,6 +3,7 @@ package dao
import (
"context"
"encoding/json"
+ "errors"
"fmt"
"io/ioutil"
"net/http"
@@ -259,6 +260,65 @@ func (d *Dao) FindJobStateByState(state apb.State) ([]apb.RecoverStatus, error)
return recoverstatus, nil
}
+// GetSvcInstJobsByState - Lookup all jobs of a given state for a specific
+// instance. Returns every job recorded for instanceId whose State equals
+// reqState (an empty slice when none match), or the lookup error.
+func (d *Dao) GetSvcInstJobsByState(
+	instanceId string, reqState apb.State,
+) ([]apb.JobState, error) {
+	d.log.Debug("Dao::GetSvcInstJobsByState")
+	allStates, err := d.getJobsForSvcInst(instanceId)
+	if err != nil {
+		return nil, err
+	}
+	if len(allStates) == 0 {
+		return allStates, nil
+	}
+	filtStates := []apb.JobState{}
+	for _, state := range allStates {
+		if state.State == reqState {
+			filtStates = append(filtStates, state)
+		}
+	}
+	// Debugf needs one argument per verb: the requested state, then the count.
+	d.log.Debugf("Filtered on state: [ %s ], returning %d jobs", reqState, len(filtStates))
+	return filtStates, nil
+}
+
+// getJobsForSvcInst - Load every job state stored beneath /state/<instanceId>.
+// Returns an empty slice when the key has no child nodes, and an error when
+// the lookup fails or any child value cannot be parsed as a JobState.
+func (d *Dao) getJobsForSvcInst(instanceId string) ([]apb.JobState, error) {
+	d.log.Debug("Dao::getJobsForSvcInst")
+	lookupKey := fmt.Sprintf("/state/%s", instanceId)
+	opts := &client.GetOptions{Recursive: true}
+	res, err := d.kapi.Get(context.Background(), lookupKey, opts)
+	if err != nil {
+		return nil, err
+	}
+
+	jobNodes := res.Node.Nodes
+	jobsCount := len(jobNodes)
+	if jobsCount == 0 {
+		return []apb.JobState{}, nil
+	}
+
+	// Debugf, not Debug: the message carries format verbs.
+	d.log.Debugf("Successfully loaded [ %d ] jobs objects from [ %s ]",
+		jobsCount, lookupKey)
+
+	retJobs := make([]apb.JobState, 0, jobsCount)
+	for _, node := range jobNodes {
+		js := apb.JobState{}
+		if err := apb.LoadJSON(node.Value, &js); err != nil {
+			return nil, errors.New(
+				fmt.Sprintf("An error occurred trying to parse job state of [ %s ]\n%s", node.Key, err.Error()))
+		}
+		retJobs = append(retJobs, js)
+	}
+	return retJobs, nil
+}
+
// GetServiceInstance - Retrieve specific service instance from the kvp API.
func (d *Dao) GetServiceInstance(id string) (*apb.ServiceInstance, error) {
spec := &apb.ServiceInstance{}
diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go
index aead001b..75524870 100644
--- a/pkg/handler/handler.go
+++ b/pkg/handler/handler.go
@@ -185,16 +185,20 @@ func (h handler) deprovision(w http.ResponseWriter, r *http.Request, params map[
h.log.Debug("err for deprovision - %#v", err)
}
- switch err {
- case broker.ErrorNotFound:
- writeResponse(w, http.StatusGone, broker.DeprovisionResponse{})
- return
- case broker.ErrorBindingExists:
- writeResponse(w, http.StatusBadRequest, broker.DeprovisionResponse{})
- return
+ if err != nil {
+ switch err {
+ case broker.ErrorNotFound:
+ writeResponse(w, http.StatusGone, broker.DeprovisionResponse{})
+ case broker.ErrorBindingExists:
+ writeResponse(w, http.StatusBadRequest, broker.DeprovisionResponse{})
+ }
}
- writeDefaultResponse(w, http.StatusOK, resp, err)
+ if async {
+ writeDefaultResponse(w, http.StatusAccepted, resp, err)
+ } else {
+ writeDefaultResponse(w, http.StatusOK, resp, err)
+ }
}
func (h handler) bind(w http.ResponseWriter, r *http.Request, params map[string]string) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment