Created
August 11, 2017 15:07
-
-
Save aprice/0f8472524bc4df9dff18dbf0925adbe5 to your computer and use it in GitHub Desktop.
busboy cleans up after Chef nodes terminated in EC2 by listening for CloudWatch termination events on an SQS queue.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"os"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/go-chef/chef"
)
// Config holds the service settings. main decodes it from config.json, so
// the JSON keys are matched against the field names below.
type Config struct {
	// ChefURL is the base URL to the Chef server, as in knife.rb. It *MUST*
	// end in a slash!
	ChefURL string
	// ChefKeyPath is the path to the Chef key on local disk.
	ChefKeyPath string
	// ChefName is the user name associated with the key.
	ChefName string
	// QueueRegion is the AWS region where the SQS queue lives.
	QueueRegion string
	// QueueURL is the URL of the SQS queue to read termination events from.
	QueueURL string
}
/* CloudWatch event pattern:
{
  "source": [
    "aws.ec2"
  ],
  "detail-type": [
    "EC2 Instance State-change Notification"
  ],
  "detail": {
    "state": [
      "terminated"
    ]
  }
}
Target = dedicated SQS queue
*/
func main() { | |
configFile, err := os.Open("config.json") | |
if err != nil { | |
log.Fatal("Failed to open config.json: ", err) | |
} | |
var config Config | |
err = json.NewDecoder(configFile).Decode(&config) | |
if err != nil { | |
log.Fatal("Failed to parse config.json: ", err) | |
} | |
// Init SQS client | |
sess, err := session.NewSessionWithOptions(session.Options{ | |
Config: aws.Config{ | |
Region: aws.String(config.QueueRegion), | |
}, | |
}) | |
if err != nil { | |
log.Fatal("Failed to get AWS session: ", err) | |
} | |
svc := sqs.New(sess) | |
// Init Chef client | |
key, err := ioutil.ReadFile(config.ChefKeyPath) | |
if err != nil { | |
log.Fatal("Couldn't read Chef key: ", err) | |
} | |
client, err := chef.NewClient(&chef.Config{ | |
Name: config.ChefName, | |
Key: string(key), | |
BaseURL: config.ChefURL, | |
}) | |
if err != nil { | |
log.Fatal("Issue setting up client: ", err) | |
} | |
// Poll for messages | |
for { | |
result, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{ | |
AttributeNames: []*string{ | |
aws.String(sqs.MessageSystemAttributeNameSentTimestamp), | |
}, | |
MessageAttributeNames: []*string{ | |
aws.String(sqs.QueueAttributeNameAll), | |
}, | |
QueueUrl: aws.String(config.QueueURL), | |
MaxNumberOfMessages: aws.Int64(1), | |
VisibilityTimeout: aws.Int64(0), | |
WaitTimeSeconds: aws.Int64(20), | |
}) | |
if err != nil { | |
log.Fatal("Error retrieving message: ", err) | |
} | |
if len(result.Messages) == 0 { | |
time.Sleep(1 * time.Second) | |
continue | |
} | |
var notif ec2InstanceStateChangeNotification | |
err = json.Unmarshal([]byte(*result.Messages[0].Body), ¬if) | |
if err != nil { | |
log.Fatal("Failed to decode message: ", err, *result.Messages[0].Body) | |
} | |
qstr := "ec2_instance_id:" + notif.Detail.InstanceID | |
log.Print("Query: ", qstr) | |
nodeQuery, err := client.Search.NewQuery("node", qstr) | |
if err != nil { | |
log.Fatal("Failed to build query: ", err) | |
} | |
nodeQuery.SortBy = "name asc" | |
// Can't just use search.Exec/Do/whatever because it doesn't actually | |
// decode the repsonse into a type, you just get a map[string]interface{} | |
// back which is really stupid. | |
fullPath := fmt.Sprintf("search/%s", nodeQuery) | |
searchPayload := struct { | |
Total int | |
Start int | |
Rows []chef.Node | |
}{} | |
req, err := client.NewRequest("GET", fullPath, nil) | |
if err != nil { | |
log.Fatal("Failed to build request: ", err) | |
} | |
// Don't know why this returns the response when it's already decoded | |
// the payload, nor why it doesn't empty & close the body itself | |
res, err := client.Do(req, &searchPayload) | |
if res != nil { | |
defer func() { | |
io.Copy(ioutil.Discard, res.Body) | |
res.Body.Close() | |
}() | |
} | |
if err != nil { | |
log.Fatal("Failed to query nodes: ", err) | |
} else if searchPayload.Total == 0 || len(searchPayload.Rows) == 0 { | |
log.Print("Instance not found: ", notif.Detail.InstanceID) | |
} else { | |
node := searchPayload.Rows[0] | |
err = client.Nodes.Delete(node.Name) | |
if err != nil { | |
log.Fatal("Failed to delete node: ", err) | |
} | |
log.Print("Chef node deleted: ", node.Name) | |
err = client.Clients.Delete(node.Name) | |
if err != nil { | |
log.Fatal("Failed to delete client: ", err) | |
} | |
log.Print("Chef client deleted: ", node.Name) | |
} | |
_, err = svc.DeleteMessage(&sqs.DeleteMessageInput{ | |
QueueUrl: aws.String(config.QueueURL), | |
ReceiptHandle: result.Messages[0].ReceiptHandle, | |
}) | |
if err != nil { | |
log.Fatal("Delete Error:", err) | |
} | |
} | |
} | |
// ec2InstanceStateChangeNotification is the shape main decodes each SQS
// message body into. Only Detail.InstanceID is read by the polling loop; the
// other fields are decoded but not otherwise used in this file.
type ec2InstanceStateChangeNotification struct {
	Region    string
	Resources []string
	Detail    struct {
		// InstanceID is populated from the "instance-id" JSON key.
		InstanceID string `json:"instance-id"`
		State      string
	}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment