Skip to content

Instantly share code, notes, and snippets.

@Petelin
Created April 9, 2019 10:20
Show Gist options
  • Save Petelin/3d257ac587b5f40eeb50fd897e71bfaa to your computer and use it in GitHub Desktop.
Save Petelin/3d257ac587b5f40eeb50fd897e71bfaa to your computer and use it in GitHub Desktop.
// Copyright (c) 2012-2019 Grabtaxi Holdings PTE LTD (GRAB), All Rights Reserved. NOTICE: All information contained herein
// is, and remains the property of GRAB. The intellectual and technical concepts contained herein are confidential, proprietary
// and controlled by GRAB and may be covered by patents, patents in process, and are protected by trade secret or copyright law.
//
// You are strictly forbidden to copy, download, store (in any medium), transmit, disseminate, adapt or change this material
// in any way unless prior written permission is obtained from GRAB. Access to the source code contained herein is hereby
// forbidden to anyone except current GRAB employees or contractors with binding Confidentiality and Non-disclosure agreements
// explicitly covering such access.
//
// The copyright notice above does not evidence any actual or intended publication or disclosure of this source code,
// which includes information that is confidential and/or proprietary, and is a trade secret, of GRAB.
//
// ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE
// CODE WITHOUT THE EXPRESS WRITTEN CONSENT OF GRAB IS STRICTLY PROHIBITED, AND IN VIOLATION OF APPLICABLE LAWS AND
// INTERNATIONAL TREATIES. THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION DOES NOT CONVEY
// OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, OR TO MANUFACTURE, USE, OR SELL ANYTHING
// THAT IT MAY DESCRIBE, IN WHOLE OR IN PART.
package main
import (
"context"
"log"
"net/http"
"strconv"
"sync"
"time"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/aws/aws-sdk-go/service/dynamodb/expression"
"github.com/aws/aws-sdk-go/service/sqs"
)
const (
TableName = "pete-food-cron-job"
KeyName = "name"
RangeName = "execute"
HTTP = "HTTP"
Stream = "STREAM"
QueryLimit int64 = 10
)
var (
ddb *dynamodb.DynamoDB
svc *sqs.SQS
queryInput *dynamodb.QueryInput
jobC chan *Job
)
type Config struct {
Retry int `json:"retry"`
}
type HttpData struct {
Url string `json:"url"`
Data interface{} `json:"data"`
Method string `json:"method"`
Headers interface{} `json:"headers"`
}
type StreamData struct {
URL string `json:"url"`
Message string `json:"message"`
}
type Job struct {
Name string `json:"name"`
Execute int `json:"execute"`
Type string `json:"type"`
Config Config `json:"config"`
Http HttpData `json:"http"`
Stream StreamData `json:"stream"`
}
func init() {
jobC = make(chan *Job, QueryLimit*2)
sess, err := session.NewSession()
if err != nil {
log.Println("init: can not create session")
panic(err)
}
ddb = dynamodb.New(sess)
svc = sqs.New(sess)
builder := expression.Builder{}.WithKeyCondition(
expression.Key(KeyName).Equal(expression.Value("default")).
And(expression.Key(RangeName).LessThanEqual(
//expression.Value(time.Now().Add(-time.Minute*10).UnixNano()/1000000),
expression.Value(time.Now().UnixNano() / 1000000))))
exp, err := builder.Build()
if err != nil {
log.Println("panic:init: create builder failed")
panic(err)
}
queryInput = &dynamodb.QueryInput{
TableName: aws.String(TableName),
KeyConditionExpression: exp.KeyCondition(),
ExpressionAttributeNames: exp.Names(),
ExpressionAttributeValues: exp.Values(),
Limit: aws.Int64(QueryLimit),
}
log.Println("init: finish init")
}
func scheduler(ctx context.Context) error {
t := time.NewTicker(time.Second * 1)
// use to notify scheduler and job handler exit
done, cancel := context.WithTimeout(context.Background(), time.Second*50)
defer cancel()
wg := &sync.WaitGroup{}
wg.Add(1)
go handler(ctx, wg, done.Done())
for {
select {
case <-t.C:
// 如果失败了, 疯狂重试
for i := 0; i < 3; i++ {
err := callDynamoDB(ctx, cancel)
if err == nil {
break
}
}
case <-done.Done():
// wait until all job finished
log.Println("scheduler: finish")
wg.Wait()
return nil
}
}
}
func handler(ctx context.Context, wg *sync.WaitGroup, done <-chan struct{}) {
for {
select {
case <-done:
log.Println("handler: finish")
wg.Done()
return
case job := <-jobC:
log.Printf("handler: get job %v", job)
if job.Type == HTTP {
//TODO: not implement
continue
} else if job.Type == Stream {
_ = sendMessageSQS(job.Execute, job.Stream)
}
}
}
}
func sendMessageSQS(id int, data StreamData) error {
req, output := svc.SendMessageRequest(&sqs.SendMessageInput{
//MessageAttributes: map[string]*sqs.MessageAttributeValue{},
MessageBody: aws.String(data.Message),
MessageDeduplicationId: aws.String(strconv.Itoa(id)),
MessageGroupId: aws.String("default"),
QueueUrl: aws.String(data.URL),
})
req.Config.WithHTTPClient(&http.Client{
Timeout: time.Millisecond * 100,
})
err := req.Send()
if err != nil {
log.Printf("Error:sendMessageSQS: send message failed, err:%v\n", err)
return err
}
if output.MessageId != nil && output.SequenceNumber != nil {
req, _ := ddb.DeleteItemRequest(&dynamodb.DeleteItemInput{
Key: map[string]*dynamodb.AttributeValue{
KeyName: {S: aws.String("default")},
RangeName: {N: aws.String(strconv.Itoa(id))},
},
TableName: aws.String(TableName),
})
req.Config.WithHTTPClient(&http.Client{
Timeout: time.Millisecond * 100,
})
err := req.Send()
if err != nil {
log.Printf("Error:sendMessageSQS: delete item failed, err:%v\n", err)
}
return err
}
return nil
}
func callDynamoDB(ctx context.Context, cancel context.CancelFunc) error {
req, resp := ddb.QueryRequest(queryInput)
req.Config.WithHTTPClient(&http.Client{
Timeout: time.Millisecond * 100,
})
err := req.Send()
if err != nil {
log.Printf("Error:DynamoDB: get err, %v\n", err)
return err
}
log.Printf("DynamoDB: count: %d\n", *resp.Count)
if len(resp.LastEvaluatedKey) > 0 {
queryInput.SetExclusiveStartKey(resp.LastEvaluatedKey)
}
if resp.LastEvaluatedKey == nil {
log.Println("callDynamoDB: there is no more item to handle")
cancel()
}
for _, i := range resp.Items {
job := &Job{}
err = dynamodbattribute.UnmarshalMap(i, job)
if err != nil {
log.Println("failed to unmarshal map", i)
continue
}
jobC <- job
}
return nil
}
func main() {
lambda.Start(scheduler)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment