Skip to content

Instantly share code, notes, and snippets.

@hirokazumiyaji
Last active September 20, 2015 11:26
Show Gist options
  • Save hirokazumiyaji/90a541248d99c92febd0 to your computer and use it in GitHub Desktop.
Save hirokazumiyaji/90a541248d99c92febd0 to your computer and use it in GitHub Desktop.
AWS SQS Worker
package main
import (
	"encoding/base64"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"
)
// HandlerFunc is the callback type used to process a message payload.
// It receives the payload as raw bytes and returns a non-nil error to
// report that processing failed.
type HandlerFunc func([]byte) error
// Worker polls a single SQS queue and hands each received message to a
// handler callback.
type Worker struct {
	wg         *sync.WaitGroup // released when the polling loop exits
	connection *sqs.SQS        // client used to receive and delete messages
	queueUrl   string          // URL of the queue to poll
	handler    HandlerFunc     // callback invoked with each message payload
	signal     chan os.Signal  // delivers the OS signal that stops the loop
}
// NewWorker builds a Worker that polls queueUrl through connection and passes
// each message payload to handler.
//
// wg is the WaitGroup the worker registers with in Start. The returned worker
// is subscribed to SIGTERM; receiving that signal is what ends its polling
// loop.
func NewWorker(wg *sync.WaitGroup, connection *sqs.SQS, queueUrl string, handler HandlerFunc) *Worker {
	// Buffered by one: signal.Notify never blocks when sending, so an
	// unbuffered channel could drop a signal that arrives while the loop is
	// busy with a message.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGTERM)

	return &Worker{
		wg:         wg,
		connection: connection,
		queueUrl:   queueUrl,
		handler:    handler,
		signal:     sig,
	}
}
// Start registers the worker with its WaitGroup and launches the polling loop
// in a new goroutine. It returns immediately.
func (w *Worker) Start() {
	// Add must happen before the goroutine starts so that a concurrent
	// Wait cannot observe a zero counter.
	w.wg.Add(1)
	go func() {
		w.backendLoop()
	}()
}
// Shutdown marks the worker as finished by calling Done on its WaitGroup.
// It does not stop the polling loop itself.
func (w *Worker) Shutdown() {
	w.wg.Done()
}
// backendLoop polls the queue until a signal arrives on w.signal, then marks
// the worker as finished via Shutdown.
//
// The signal is only checked between polls, so an in-flight message is always
// processed to completion before the loop exits.
func (w *Worker) backendLoop() {
	defer w.Shutdown()

	for {
		select {
		case <-w.signal:
			signal.Stop(w.signal)
			return
		default:
			w.pollOnce()
		}
	}
}

// pollOnce receives at most one message, base64-decodes its body and passes
// the bytes to the handler. The message is deleted only when the handler
// returns nil; on any failure the error is logged and the message is left on
// the queue so SQS can deliver it again.
func (w *Worker) pollOnce() {
	// NOTE(review): "QueueAttributeName" and "MessageAttributeName" look like
	// placeholders copied from the SDK example rather than real attribute
	// names — confirm the intended values.
	// NOTE(review): a 1 second visibility timeout means a handler that runs
	// longer may see the same message redelivered before it is deleted.
	received, err := w.connection.ReceiveMessage(&sqs.ReceiveMessageInput{
		QueueUrl:              aws.String(w.queueUrl),
		AttributeNames:        []*string{aws.String("QueueAttributeName")},
		MaxNumberOfMessages:   aws.Int64(1),
		MessageAttributeNames: []*string{aws.String("MessageAttributeName")},
		VisibilityTimeout:     aws.Int64(1),
		WaitTimeSeconds:       aws.Int64(1),
	})
	if err != nil {
		fmt.Println("SQS Receive Message Error: ", err.Error())
		return
	}
	if len(received.Messages) == 0 {
		fmt.Println("SQS Not Message")
		return
	}

	msg := received.Messages[0]

	// Body is a *string; StringValue yields "" for nil instead of panicking.
	data, err := base64.StdEncoding.DecodeString(aws.StringValue(msg.Body))
	if err != nil {
		fmt.Println("SQS Message Base64 Decode Error: ", err.Error())
		return
	}

	if err := w.handler(data); err != nil {
		fmt.Println("HandlerError: ", err.Error())
		return
	}

	// The receipt handle from this receive call identifies the message to
	// delete.
	_, err = w.connection.DeleteMessage(&sqs.DeleteMessageInput{
		QueueUrl:      aws.String(w.queueUrl),
		ReceiptHandle: msg.ReceiptHandle,
	})
	if err != nil {
		fmt.Println("SQS Delete Message Error: ", err.Error())
	}
}
// NewConfig returns an AWS configuration for the ap-northeast-1 region that
// uses the credentials returned by aws.IAMCreds.
//
// NOTE(review): confirm that aws.IAMCreds exists in the SDK version this
// file builds against and that its result is accepted by WithCredentials.
func NewConfig() *aws.Config {
	return aws.NewConfig().
		WithRegion("ap-northeast-1").
		WithCredentials(aws.IAMCreds())
}
func main() {
config := NewConfig()
connection := sqs.New(config)
var wg sync.WaitGroup
w := NewWorker(
wg,
connection,
"",
func(data []byte) error {
fmt.Println(string(data))
return nil
},
)
w.Start()
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment