Created
April 9, 2019 10:20
-
-
Save Petelin/3d257ac587b5f40eeb50fd897e71bfaa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) 2012-2019 Grabtaxi Holdings PTE LTD (GRAB), All Rights Reserved. NOTICE: All information contained herein | |
// is, and remains the property of GRAB. The intellectual and technical concepts contained herein are confidential, proprietary | |
// and controlled by GRAB and may be covered by patents, patents in process, and are protected by trade secret or copyright law. | |
// | |
// You are strictly forbidden to copy, download, store (in any medium), transmit, disseminate, adapt or change this material | |
// in any way unless prior written permission is obtained from GRAB. Access to the source code contained herein is hereby | |
// forbidden to anyone except current GRAB employees or contractors with binding Confidentiality and Non-disclosure agreements | |
// explicitly covering such access. | |
// | |
// The copyright notice above does not evidence any actual or intended publication or disclosure of this source code, | |
// which includes information that is confidential and/or proprietary, and is a trade secret, of GRAB. | |
// | |
// ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE | |
// CODE WITHOUT THE EXPRESS WRITTEN CONSENT OF GRAB IS STRICTLY PROHIBITED, AND IN VIOLATION OF APPLICABLE LAWS AND | |
// INTERNATIONAL TREATIES. THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION DOES NOT CONVEY | |
// OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, OR TO MANUFACTURE, USE, OR SELL ANYTHING | |
// THAT IT MAY DESCRIBE, IN WHOLE OR IN PART. | |
package main | |
import ( | |
"context" | |
"log" | |
"net/http" | |
"strconv" | |
"sync" | |
"time" | |
"github.com/aws/aws-lambda-go/lambda" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/dynamodb" | |
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" | |
"github.com/aws/aws-sdk-go/service/dynamodb/expression" | |
"github.com/aws/aws-sdk-go/service/sqs" | |
) | |
// DynamoDB table/attribute names, job-type discriminators and query sizing
// used throughout this file.
const (
	// TableName is the DynamoDB table that is queried for jobs and from which
	// delivered jobs are deleted.
	TableName = "pete-food-cron-job"
	// KeyName is the string key attribute; this file always addresses the
	// value "default" under it.
	KeyName = "name"
	// RangeName is the numeric key attribute that is compared against the
	// current time expressed in milliseconds.
	RangeName = "execute"
	// HTTP is the Job.Type value for HTTP jobs (handling is not implemented).
	HTTP = "HTTP"
	// Stream is the Job.Type value for jobs that are delivered to SQS.
	Stream = "STREAM"
	// QueryLimit is the Limit passed to each DynamoDB query; the job channel
	// is created with twice this capacity.
	QueryLimit int64 = 10
)
var ( | |
ddb *dynamodb.DynamoDB | |
svc *sqs.SQS | |
queryInput *dynamodb.QueryInput | |
jobC chan *Job | |
) | |
// Config holds per-job execution settings.
type Config struct {
	// Retry is decoded from the "retry" attribute. Nothing in this file reads
	// it yet.
	Retry int `json:"retry"`
}

// HttpData describes the request of an HTTP job. HTTP jobs are decoded but not
// executed by this file.
type HttpData struct {
	Url     string      `json:"url"`
	Data    interface{} `json:"data"`
	Method  string      `json:"method"`
	Headers interface{} `json:"headers"`
}

// StreamData describes a STREAM job: Message is sent as the body of an SQS
// message to the queue whose URL is URL.
type StreamData struct {
	URL     string `json:"url"`
	Message string `json:"message"`
}

// Job is one scheduled item as stored in the DynamoDB table.
type Job struct {
	// Name is decoded from the "name" attribute.
	Name string `json:"name"`
	// Execute is decoded from the "execute" attribute. It is also used as the
	// SQS deduplication id and as the range key when the item is deleted.
	Execute int `json:"execute"`
	// Type selects the handler; see the HTTP and Stream constants.
	Type string `json:"type"`
	// Config carries the job's execution settings.
	Config Config `json:"config"`
	// Http is the payload for HTTP jobs.
	Http HttpData `json:"http"`
	// Stream is the payload for STREAM jobs.
	Stream StreamData `json:"stream"`
}
func init() { | |
jobC = make(chan *Job, QueryLimit*2) | |
sess, err := session.NewSession() | |
if err != nil { | |
log.Println("init: can not create session") | |
panic(err) | |
} | |
ddb = dynamodb.New(sess) | |
svc = sqs.New(sess) | |
builder := expression.Builder{}.WithKeyCondition( | |
expression.Key(KeyName).Equal(expression.Value("default")). | |
And(expression.Key(RangeName).LessThanEqual( | |
//expression.Value(time.Now().Add(-time.Minute*10).UnixNano()/1000000), | |
expression.Value(time.Now().UnixNano() / 1000000)))) | |
exp, err := builder.Build() | |
if err != nil { | |
log.Println("panic:init: create builder failed") | |
panic(err) | |
} | |
queryInput = &dynamodb.QueryInput{ | |
TableName: aws.String(TableName), | |
KeyConditionExpression: exp.KeyCondition(), | |
ExpressionAttributeNames: exp.Names(), | |
ExpressionAttributeValues: exp.Values(), | |
Limit: aws.Int64(QueryLimit), | |
} | |
log.Println("init: finish init") | |
} | |
func scheduler(ctx context.Context) error { | |
t := time.NewTicker(time.Second * 1) | |
// use to notify scheduler and job handler exit | |
done, cancel := context.WithTimeout(context.Background(), time.Second*50) | |
defer cancel() | |
wg := &sync.WaitGroup{} | |
wg.Add(1) | |
go handler(ctx, wg, done.Done()) | |
for { | |
select { | |
case <-t.C: | |
// 如果失败了, 疯狂重试 | |
for i := 0; i < 3; i++ { | |
err := callDynamoDB(ctx, cancel) | |
if err == nil { | |
break | |
} | |
} | |
case <-done.Done(): | |
// wait until all job finished | |
log.Println("scheduler: finish") | |
wg.Wait() | |
return nil | |
} | |
} | |
} | |
func handler(ctx context.Context, wg *sync.WaitGroup, done <-chan struct{}) { | |
for { | |
select { | |
case <-done: | |
log.Println("handler: finish") | |
wg.Done() | |
return | |
case job := <-jobC: | |
log.Printf("handler: get job %v", job) | |
if job.Type == HTTP { | |
//TODO: not implement | |
continue | |
} else if job.Type == Stream { | |
_ = sendMessageSQS(job.Execute, job.Stream) | |
} | |
} | |
} | |
} | |
func sendMessageSQS(id int, data StreamData) error { | |
req, output := svc.SendMessageRequest(&sqs.SendMessageInput{ | |
//MessageAttributes: map[string]*sqs.MessageAttributeValue{}, | |
MessageBody: aws.String(data.Message), | |
MessageDeduplicationId: aws.String(strconv.Itoa(id)), | |
MessageGroupId: aws.String("default"), | |
QueueUrl: aws.String(data.URL), | |
}) | |
req.Config.WithHTTPClient(&http.Client{ | |
Timeout: time.Millisecond * 100, | |
}) | |
err := req.Send() | |
if err != nil { | |
log.Printf("Error:sendMessageSQS: send message failed, err:%v\n", err) | |
return err | |
} | |
if output.MessageId != nil && output.SequenceNumber != nil { | |
req, _ := ddb.DeleteItemRequest(&dynamodb.DeleteItemInput{ | |
Key: map[string]*dynamodb.AttributeValue{ | |
KeyName: {S: aws.String("default")}, | |
RangeName: {N: aws.String(strconv.Itoa(id))}, | |
}, | |
TableName: aws.String(TableName), | |
}) | |
req.Config.WithHTTPClient(&http.Client{ | |
Timeout: time.Millisecond * 100, | |
}) | |
err := req.Send() | |
if err != nil { | |
log.Printf("Error:sendMessageSQS: delete item failed, err:%v\n", err) | |
} | |
return err | |
} | |
return nil | |
} | |
func callDynamoDB(ctx context.Context, cancel context.CancelFunc) error { | |
req, resp := ddb.QueryRequest(queryInput) | |
req.Config.WithHTTPClient(&http.Client{ | |
Timeout: time.Millisecond * 100, | |
}) | |
err := req.Send() | |
if err != nil { | |
log.Printf("Error:DynamoDB: get err, %v\n", err) | |
return err | |
} | |
log.Printf("DynamoDB: count: %d\n", *resp.Count) | |
if len(resp.LastEvaluatedKey) > 0 { | |
queryInput.SetExclusiveStartKey(resp.LastEvaluatedKey) | |
} | |
if resp.LastEvaluatedKey == nil { | |
log.Println("callDynamoDB: there is no more item to handle") | |
cancel() | |
} | |
for _, i := range resp.Items { | |
job := &Job{} | |
err = dynamodbattribute.UnmarshalMap(i, job) | |
if err != nil { | |
log.Println("failed to unmarshal map", i) | |
continue | |
} | |
jobC <- job | |
} | |
return nil | |
} | |
func main() { | |
lambda.Start(scheduler) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment