Created
July 26, 2017 19:22
-
-
Save eriknelson/54908f8c46d0ccbf1268f2306f642bf2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/pkg/broker/broker.go b/pkg/broker/broker.go | |
index 8f5f6445..dfb2a93e 100644 | |
--- a/pkg/broker/broker.go | |
+++ b/pkg/broker/broker.go | |
@@ -470,8 +470,22 @@ func (a AnsibleBroker) Deprovision(instanceUUID uuid.UUID, async bool) (*Deprovi | |
return nil, err | |
} | |
- var token string | |
+ // NOTE, HACK: Check to see if there are any outstanding jobs for the | |
+ // requested ServiceInstance. Not perfect, but we've decided its acceptable | |
+ // to assume that **any** running jobs are a deprovision job, and we should | |
+ // therefore return a 202 to indicate that the job is still running. Avoid | |
+ // Spawning a new deprovision pod. | |
+ jobsInProgress, err := a.dao.GetSvcInstJobsByState( | |
+ instanceUUID.String(), apb.StateInProgress) | |
+ if err != nil { | |
+ return nil, err | |
+ } else if len(jobsInProgress) != 0 { | |
+ // HACK: Assuming this is the token, there should only be one | |
+ op := jobsInProgress[0].Token | |
+ return &DeprovisionResponse{Operation: op}, nil | |
+ } | |
+ var token string | |
if async { | |
a.log.Info("ASYNC deprovision in progress") | |
// asynchronously provision and return the token for the lastoperation | |
diff --git a/pkg/broker/deprovision_job.go b/pkg/broker/deprovision_job.go | |
index f4a4fea0..aef4f48c 100644 | |
--- a/pkg/broker/deprovision_job.go | |
+++ b/pkg/broker/deprovision_job.go | |
@@ -20,6 +20,7 @@ type DeprovisionMsg struct { | |
JobToken string `json:"job_token"` | |
SpecId string `json:"spec_id"` | |
Error string `json:"error"` | |
+ Msg string `json:"msg"` | |
} | |
func (m DeprovisionMsg) Render() string { | |
@@ -57,5 +58,5 @@ func (p *DeprovisionJob) Run(token string, msgBuffer chan<- WorkMsg) { | |
// send creds | |
p.log.Debug("sending message to channel") | |
msgBuffer <- DeprovisionMsg{InstanceUUID: p.serviceInstance.Id.String(), | |
- JobToken: token, SpecId: p.serviceInstance.Spec.Id, Error: ""} | |
+ JobToken: token, SpecId: p.serviceInstance.Spec.Id, Error: "", Msg: "_deprovision"} | |
} | |
diff --git a/pkg/broker/provision_subscriber.go b/pkg/broker/provision_subscriber.go | |
index 30efc0db..0db05d74 100644 | |
--- a/pkg/broker/provision_subscriber.go | |
+++ b/pkg/broker/provision_subscriber.go | |
@@ -41,9 +41,12 @@ func (p *ProvisionWorkSubscriber) Subscribe(msgBuffer <-chan WorkMsg) { | |
// updates one day. | |
p.dao.SetState(pmsg.InstanceUUID, apb.JobState{Token: pmsg.JobToken, State: apb.StateInProgress, Podname: pmsg.PodName}) | |
} else { | |
- json.Unmarshal([]byte(pmsg.Msg), &extCreds) | |
+ if pmsg.Msg != "_deprovision" { | |
+ // HACK | |
+ json.Unmarshal([]byte(pmsg.Msg), &extCreds) | |
+ p.dao.SetExtractedCredentials(pmsg.InstanceUUID, extCreds) | |
+ } | |
p.dao.SetState(pmsg.InstanceUUID, apb.JobState{Token: pmsg.JobToken, State: apb.StateSucceeded, Podname: pmsg.PodName}) | |
- p.dao.SetExtractedCredentials(pmsg.InstanceUUID, extCreds) | |
} | |
} | |
}() | |
diff --git a/pkg/dao/dao.go b/pkg/dao/dao.go | |
index 5c48832d..6ae6ec45 100644 | |
--- a/pkg/dao/dao.go | |
+++ b/pkg/dao/dao.go | |
@@ -3,6 +3,7 @@ package dao | |
import ( | |
"context" | |
"encoding/json" | |
+ "errors" | |
"fmt" | |
"io/ioutil" | |
"net/http" | |
@@ -259,6 +260,65 @@ func (d *Dao) FindJobStateByState(state apb.State) ([]apb.RecoverStatus, error) | |
return recoverstatus, nil | |
} | |
+// GetSvcInstJobsByState - Lookup all jobs of a given state for a specific instance | |
+func (d *Dao) GetSvcInstJobsByState( | |
+ instanceId string, reqState apb.State, | |
+) ([]apb.JobState, error) { | |
+ d.log.Debug("Dao::GetSvcInstJobsByState") | |
+ allStates, err := d.getJobsForSvcInst(instanceId) | |
+ | |
+ if err != nil { | |
+ return nil, err | |
+ } else if len(allStates) == 0 { | |
+ return allStates, nil | |
+ } | |
+ | |
+ filtStates := []apb.JobState{} | |
+ for _, state := range allStates { | |
+ if state.State == reqState { | |
+ filtStates = append(filtStates, state) | |
+ } | |
+ } | |
+ | |
+ d.log.Debugf("Filtered on state: [ %s ], returning %d jobs", len(filtStates)) | |
+ | |
+ return filtStates, nil | |
+} | |
+ | |
+func (d *Dao) getJobsForSvcInst(instanceId string) ([]apb.JobState, error) { | |
+ d.log.Debug("Dao::getJobsForSvcInst") | |
+ | |
+ var res *client.Response | |
+ var err error | |
+ | |
+ lookupKey := fmt.Sprintf("/state/%s", instanceId) | |
+ opts := &client.GetOptions{Recursive: true} | |
+ if res, err = d.kapi.Get(context.Background(), lookupKey, opts); err != nil { | |
+ return nil, err | |
+ } | |
+ | |
+ jobNodes := res.Node.Nodes | |
+ jobsCount := len(jobNodes) | |
+ if jobsCount == 0 { | |
+ return []apb.JobState{}, nil | |
+ } | |
+ | |
+ d.log.Debug("Successfully loaded [ %d ] jobs objects from [ %s ]", | |
+ jobsCount, lookupKey) | |
+ | |
+ retJobs := []apb.JobState{} | |
+ for _, node := range jobNodes { | |
+ js := apb.JobState{} | |
+ err := apb.LoadJSON(node.Value, &js) | |
+ if err != nil { | |
+ return nil, errors.New( | |
+ fmt.Sprintf("An error occurred trying to parse job state of [ %s ]\n%s", node.Key, err.Error())) | |
+ } | |
+ retJobs = append(retJobs, js) | |
+ } | |
+ return retJobs, nil | |
+} | |
+ | |
// GetServiceInstance - Retrieve specific service instance from the kvp API. | |
func (d *Dao) GetServiceInstance(id string) (*apb.ServiceInstance, error) { | |
spec := &apb.ServiceInstance{} | |
diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go | |
index aead001b..75524870 100644 | |
--- a/pkg/handler/handler.go | |
+++ b/pkg/handler/handler.go | |
@@ -185,16 +185,20 @@ func (h handler) deprovision(w http.ResponseWriter, r *http.Request, params map[ | |
h.log.Debug("err for deprovision - %#v", err) | |
} | |
- switch err { | |
- case broker.ErrorNotFound: | |
- writeResponse(w, http.StatusGone, broker.DeprovisionResponse{}) | |
- return | |
- case broker.ErrorBindingExists: | |
- writeResponse(w, http.StatusBadRequest, broker.DeprovisionResponse{}) | |
- return | |
+ if err != nil { | |
+ switch err { | |
+ case broker.ErrorNotFound: | |
+ writeResponse(w, http.StatusGone, broker.DeprovisionResponse{}) | |
+ case broker.ErrorBindingExists: | |
+ writeResponse(w, http.StatusBadRequest, broker.DeprovisionResponse{}) | |
+ } | |
} | |
- writeDefaultResponse(w, http.StatusOK, resp, err) | |
+ if async { | |
+ writeDefaultResponse(w, http.StatusAccepted, resp, err) | |
+ } else { | |
+ writeDefaultResponse(w, http.StatusOK, resp, err) | |
+ } | |
} | |
func (h handler) bind(w http.ResponseWriter, r *http.Request, params map[string]string) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment