functions:
crawl:
handler: index.handler
events:
- schedule: rate(2 hours)
- schedule: cron(0 12 * * ? *)
As you can see, it supports both a rate DSL format and a standard CRON format (the example above triggers the function every 2 hours as well as every day at 12:00 UTC).
WARNINGS:
- The smallest unit is 1 minute.
- The rate function only accepts the following units: minute, minutes, hour, hours, day, days (as you can see, you have to use the singular version when the value is equal to 1).
The following lambda handler sends a hello world message to the OTHER_FUNC lambda. The value passed to FunctionName is supposed to be the other function's ARN. What matters in this example is how that ARN ends up in the process.env.OTHER_FUNC environment variable; that is explained in the serverless.yml below.
index.js
const AWS = require('aws-sdk')
const lambda = new AWS.Lambda({apiVersion: '2015-03-31'})
exports.handler = function(event, context, callback) {
	// 'Payload' (not 'body') is the property the Lambda 'invoke' API expects. Add InvocationType:'Event' to invoke asynchronously instead of waiting for the response.
	lambda.invoke({ FunctionName:process.env.OTHER_FUNC, Payload:JSON.stringify({ hello:'world' }) }, (err, data) => {
		if (err)
			return callback(err)
		return callback(null, 'records processed')
})
}
serverless.yml
service: lambda_invoking_another_lambda
custom:
stage: ${opt:stage, 'dev'}
plugins:
- serverless-iam-roles-per-function
provider:
name: aws
runtime: nodejs10.x
profile: neap
stage: ${self:custom.stage}
region: ap-southeast-2
functions:
mainFunc:
handler: mainFunc.handler
events:
- http:
path: /lambda_01
method: ANY
iamRoleStatementsInherit: true
iamRoleStatements:
- Effect: 'Allow'
Action:
- lambda:InvokeFunction
Resource:
Fn::GetAtt:
- OtherFuncLambdaFunction
- Arn
environment:
OTHER_FUNC:
Fn::GetAtt:
- OtherFuncLambdaFunction
- Arn
otherFunc:
handler: otherFunc.handler
As you can see, there is a hidden convention: the CloudFormation resource name generated for each function is <function name starting with a capital letter>LambdaFunction. In our case, the CloudFormation resource for our otherFunc function is named OtherFuncLambdaFunction.
The principle is quite simple. Deploy an API Gateway in Websocket mode and an AWS Lambda that is triggered each time a client connects or disconnects. Each time a connection is made, a new connectionId
is issued for that specific client. The Lambda's responsibility is to store that connectionId somewhere so that other clients can POST messages back to that client using a simple HTTP POST to the API Gateway (done via the Node.js AWS SDK). This recipe contains three parts:
- API Gateway and Lambda setup
- Client's code to connect to the API Gateway Websocket
- Server code that can POST messages back to the client using its associated connectionId
serverless.yml:
service: websocket-test
plugins:
- serverless-iam-roles-per-function
provider:
name: aws
runtime: nodejs10.x
region: ap-southeast-2
websocketsApiName: websocket-test-api
websocketsApiRouteSelectionExpression: $request.body.action # custom routes are selected based on the 'action' property of the message body
functions:
connectionHandler:
handler: index.handler
events:
- websocket:
route: $connect
- websocket:
route: $disconnect
- websocket:
route: onMessage
index.js:
const { app } = require('@neap/funky')
app.all('/', (req,res) => {
const payload = req.params._awsParams || { message: 'No AWS data' }
console.log(JSON.stringify(payload, null, ' '))
return res.status(200).send('done')
})
eval(app.listen({ port:3000, host:'aws' }))
where the payload structure is similar to this:
{
"headers": {
"Host": "abcd12345.execute-api.ap-southeast-2.amazonaws.com",
"x-api-key": "",
"X-Forwarded-For": "",
"x-restapi": ""
},
"multiValueHeaders": {
"Host": [
"abcd12345.execute-api.ap-southeast-2.amazonaws.com"
],
"x-api-key": [
""
],
"X-Forwarded-For": [
""
],
"x-restapi": [
""
]
},
"requestContext": {
"routeKey": "$disconnect",
"messageId": null,
"eventType": "DISCONNECT",
"extendedRequestId": "DBI0XHL_ywMFatg=",
"requestTime": "11/Nov/2019:23:28:02 +0000",
"messageDirection": "IN",
"stage": "dev",
"connectedAt": 1573514872512,
"requestTimeEpoch": 1573514882208,
"identity": {
"cognitoIdentityPoolId": null,
"cognitoIdentityId": null,
"principalOrgId": null,
"cognitoAuthenticationType": null,
"userArn": null,
"userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36",
"accountId": null,
"caller": null,
"sourceIp": "49.195.105.136",
"accessKey": null,
"cognitoAuthenticationProvider": null,
"user": null
},
"requestId": "DBI0XHL_ywMFatg=",
"domainName": "abcd12345.execute-api.ap-southeast-2.amazonaws.com",
"connectionId": "DBIy2dg0SwMCERQ=",
"apiId": "abcd12345"
},
"isBase64Encoded": false
}
Use this payload to determine:
- The connectionId
- The type, i.e., $connect vs $disconnect
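For example, here is a minimal sketch (based on the @neap/funky handler above; the logging and the DynamoDB suggestion are purely illustrative) that extracts those two values from requestContext:

const { app } = require('@neap/funky')

app.all('/', (req,res) => {
	const payload = req.params._awsParams || {}
	// Both values live under 'requestContext' (cf. the sample payload above).
	const { connectionId, routeKey } = payload.requestContext || {}
	if (routeKey == '$connect')
		console.log(`New client connected with connectionId ${connectionId}. Store it somewhere (e.g., DynamoDB).`)
	else if (routeKey == '$disconnect')
		console.log(`Client ${connectionId} disconnected. Clean up the stored connectionId.`)
	return res.status(200).send('done')
})

eval(app.listen({ port:3000, host:'aws' }))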
<script type="text/javascript">
let socket = new WebSocket("wss://abcd12345.execute-api.ap-southeast-2.amazonaws.com/dev")
socket.onopen = function(e) {
alert("[open] Connection established")
alert("Sending to server")
// The following request will fire back 'socket.onmessage' below. The value
// of the 'event.data' will be similar to '{"message": "Forbidden", "connectionId":"DBNjmfOmSwMCJiA=", "requestId":"DBNlXFDjSwMF3FA="}'
socket.send("My name is John")
}
/**
* Callback function fired when the server sends a message.
*
* @param {String} event.data Server's message
* @return {Void}
*/
socket.onmessage = function(event) {
alert(`[message] Data received from server: ${event.data}`)
}
socket.onclose = function(event) {
if (event.wasClean) {
alert(`[close] Connection closed cleanly, code=${event.code} reason=${event.reason}`)
} else {
// e.g. server process killed or network down
// event.code is usually 1006 in this case
alert('[close] Connection died')
}
}
socket.onerror = function(error) {
alert(`[error] ${error.message}`)
}
</script>
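Note that, because the route selection expression configured above is $request.body.action, a message only reaches the custom onMessage route if it is a JSON string containing an action property. A minimal sketch, reusing the socket from the script above (the data property is arbitrary):

// Routed to the 'onMessage' handler defined in the serverless.yml above.
socket.send(JSON.stringify({ action: 'onMessage', data: 'My name is John' }))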
const { co } = require('core-async')
let api_gateways = {}
const getApiGateway = endpoint => {
if (!endpoint)
throw new Error('Missing required argument \'endpoint\'.')
let api_gw = api_gateways[endpoint]
if (!api_gw) {
const AWS = require('aws-sdk')
// apply the patch
require('./patch_apigateway.js')
api_gw = new AWS.ApiGatewayManagementApi({ apiVersion: '2018-11-29', endpoint })
api_gateways[endpoint] = api_gw
}
return api_gw
}
/**
* Posts a message to an AWS API Gateway.
*
* @param {String} endpoint API Gateway endpoint (without the protocol).
* @param {String} connectionId Websocket connection ID.
* @param {Object} data Any payload.
* @yield {Object} Empty object
*/
const post = ({ endpoint, connectionId, data }) => co(function *(){
if (!connectionId)
throw new Error('Missing required argument \'connectionId\'.')
if (!data)
return
const api_gw = getApiGateway(endpoint)
const t = typeof(data)
const payload = t == 'string' || (data instanceof Buffer)
? data
: (data instanceof Date)
? data.toISOString()
: t == 'object'
? JSON.stringify(data) : `${data}`
return yield api_gw.postToConnection({ ConnectionId: connectionId, Data: payload }).promise()
})
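For example, using the domainName, stage and connectionId lifted from the sample payload above (in practice the connectionId comes from wherever it was stored at $connect time), the server can push a message back to that client with a sketch like this:

const { co } = require('core-async')

co(function *() {
	yield post({
		endpoint: 'abcd12345.execute-api.ap-southeast-2.amazonaws.com/dev', // '<domainName>/<stage>' from requestContext
		connectionId: 'DBIy2dg0SwMCERQ=',
		data: { hello: 'world' }
	})
	console.log('Message pushed back to the client')
})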
serverless.yml
service: sqs-trigger-demo
plugins:
- serverless-iam-roles-per-function
provider:
name: aws
runtime: nodejs10.x
functions:
sendMessageToSQS:
handler: index.handler
timeout: 300
events:
- http:
path: /send
method: POST
iamRoleStatementsInherit: true
iamRoleStatements:
- Effect: 'Allow'
Action:
- sqs:SendMessage
Resource:
Fn::GetAtt:
- YourQueue
- Arn
environment:
YOUR_QUEUE: !Ref YourQueue
processSQS:
handler: index.handler
events:
- sqs:
arn:
Fn::GetAtt:
- YourQueue
- Arn
batchSize: 1
resources:
Resources:
YourQueue:
Type: AWS::SQS::Queue
NOTICE: The batchSize property in the SQS event is optional. If you don't specify it, the default is 10 (which is also the max). This means that a single lambda invocation can receive up to 10 messages at the same time.
processSQS/index.js:
exports.handler = function(event, context, callback) {
const records = event.Records
console.log(JSON.stringify(records, null, ' '))
return callback(null, 'records processed')
}
or, if you're using @neap/funky:
const { app } = require('@neap/funky')
app.all('/', (req,res) => {
const payload = req.params._awsParams
console.log(payload)
return res.status(200).send('done')
})
eval(app.listen({ port:3000, host:'aws' }))
where the payload is similar to this:
{
Records: [{
messageId: '20b6436d-d8d2-415d-970e-0068e90f5df6',
receiptHandle: 'AQEBw2N73wSb+3srF3QvjuTzkd/tQGmRqxJvcCinTadcE....',
body: '{"id":13482,"device_uuid":"70B3D570500046FE"}',
attributes: [Object],
messageAttributes: {},
md5OfBody: '75b13cf93658767850fe30d5ecb4a25b',
eventSource: 'aws:sqs',
eventSourceARN: 'arn:this-is-the-arn-of-the-resource-that-triggered-this-lambda',
awsRegion: 'ap-southeast-2'
}]
}
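Since body is just the string that was passed to sqs.sendMessage, the handler can get the original object back with JSON.parse. A minimal sketch, assuming the message was sent as JSON like in the sample above:

exports.handler = function(event, context, callback) {
	// With 'batchSize: 1' there is a single record, but mapping over the array also works for bigger batches.
	const messages = event.Records.map(record => JSON.parse(record.body))
	console.log(messages)
	return callback(null, 'records processed')
}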
The reason and consequences of this setup are not trivial. To know more about it, please refer to this document: https://gist.github.com/nicolasdao/2693912322fab8b4be19cca8920c603e#lambda-connected-to-a-vpc
To connect a Lambda to a VPC, you must provide two additional pieces of configuration:
- The VPC subnet IDs the lambda must have access to.
- At least one VPC security group (the simplest one allows all inbound traffic in).
If either of those two settings is omitted, the lambda is not associated with that VPC.
service: your-func-name
provider:
name: aws
runtime: nodejs10.x
memorySize: 512
region: ap-southeast-2
functions:
firstFunc:
handler: index.handler
vpc:
securityGroupIds:
- sg-1234
subnetIds:
- subnet-1234
- subnet-4567
events:
- http:
path: /
method: ANY
- http:
path: /{any+}
method: ANY
To tag, you need the serverless-plugin-resource-tagging plugin.
npm i serverless-plugin-resource-tagging -D
plugins:
- serverless-plugin-resource-tagging
provider:
name: XXX
stackTags:
Tag1: "Tag1 value"
Tag2: "Tag2 value"
The trickiest part is to add a new inbound rule in the DB's security group to allow the Lambda to query the DB.
service: some-poc
custom:
stage: ${opt:stage, 'dev'}
plugins:
- serverless-iam-roles-per-function
- serverless-plugin-resource-tagging
provider:
name: aws
stackTags:
project: some-poc
runtime: nodejs10.x
memorySize: 512
profile: your-profile
stage: ${self:custom.stage}
region: ap-southeast-2
functions:
main:
handler: index.handler
timeout: 30
vpc:
securityGroupIds:
- sg-1234-lambda
subnetIds:
- subnet-1234-a-private-subnet
- subnet-1234-a-private-subnet
events:
- http:
path: /
method: ANY
- http:
path: /{any+}
method: ANY
iamRoleStatementsInherit: true
resources:
Resources:
MSSQLInboundRule:
Type: AWS::EC2::SecurityGroupIngress
Properties:
Description: Some PoC - Access from lambda to MSSQL
IpProtocol: tcp
FromPort: 1433
ToPort: 1433
SourceSecurityGroupId: sg-1234-lambda
GroupId: sg-1234-db
index.js
exports.handler = function(event, context, callback) {
const records = event.Records
console.log(JSON.stringify(records, null, ' '))
return callback(null, 'records processed')
}
Where a Record is similar to this:
{
"eventID": "771198431cedb052fc164d24a48f62c7",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "ap-southeast-2",
"dynamodb": {
"ApproximateCreationDateTime": 1567139975,
"Keys": {
"device_id": {
"N": "1"
},
"timestamp": {
"S": "2019-08-30T04:39:35.405Z"
}
},
"NewImage": {
"device_id": {
"N": "1"
},
"value": {
"S": "2"
},
"timestamp": {
"S": "2019-08-30T04:39:35.405Z"
}
},
"SequenceNumber": "548300000000007249139847",
"SizeBytes": 94,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:ap-southeast-2:123456221:table/number_dev/stream/2019-08-30T04:35:12.392"
}
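The values inside NewImage use the DynamoDB attribute-value format ({"N": "1"}, {"S": "..."}). A minimal sketch (assuming the aws-sdk v2 bundled in the Lambda runtime) of converting them back to plain JavaScript objects:

const AWS = require('aws-sdk')

exports.handler = function(event, context, callback) {
	const items = (event.Records || [])
		.filter(record => record.dynamodb && record.dynamodb.NewImage)
		// Converts { device_id: { N: '1' }, value: { S: '2' }, ... } into { device_id: 1, value: '2', ... }
		.map(record => AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage))
	console.log(JSON.stringify(items, null, ' '))
	return callback(null, 'records processed')
}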
serverless.yml
service: lambda_with_dynamodb_stream
custom:
stage: ${opt:stage, 'dev'}
table:
number: number_${self:custom.stage}
provider:
name: aws
runtime: nodejs10.x
profile: neap
stage: ${self:custom.stage}
region: ap-southeast-2
iamRoleStatements:
- Effect: 'Allow'
Action:
- dynamodb:DescribeStream
- dynamodb:GetRecords
- dynamodb:GetShardIterator
- dynamodb:ListStreams
Resource:
Fn::GetAtt:
- NumberTable
- Arn
functions:
stream:
handler: index.handler
events:
- stream:
type: dynamodb
arn:
Fn::GetAtt:
- NumberTable
- StreamArn
resources:
Resources:
NumberTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: ${self:custom.table.number}
AttributeDefinitions:
- AttributeName: device_id
AttributeType: N
- AttributeName: timestamp
AttributeType: S
KeySchema:
- AttributeName: device_id
KeyType: HASH
- AttributeName: timestamp
KeyType: RANGE
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
StreamSpecification:
StreamViewType: NEW_IMAGE
Where:
- ProvisionedThroughput could have been replaced with BillingMode: PAY_PER_REQUEST if you do not want to deal with provisioning DynamoDB.
- StreamSpecification is what toggles the DynamoDB stream, and StreamViewType can take 4 different values:
  - KEYS_ONLY - Only the key attributes of the modified item are written to the stream.
  - NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream.
  - OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream.
  - NEW_AND_OLD_IMAGES - Both the new and the old item images of the item are written to the stream.
Provision the SQS queue as follows in the serverless.yml:
service: simpleingest
custom:
stage: ${opt:stage, 'dev'}
plugins:
- serverless-iam-roles-per-function
provider:
name: aws
runtime: nodejs10.x
profile: neap
stage: ${self:custom.stage}
region: ap-southeast-2
functions:
dequeue:
handler: lambdas/dequeue.handler
events:
- schedule: rate(1 minute)
iamRoleStatements:
- Effect: 'Allow'
Action:
- sqs:ReceiveMessage
- sqs:DeleteMessage
Resource:
Fn::GetAtt:
- YourQueue
- Arn
environment:
YOUR_QUEUE: !Ref YourQueue
resources:
Resources:
YourQueue:
Type: AWS::SQS::Queue
As you can see, the environment variable YOUR_QUEUE is set to !Ref YourQueue, where !Ref returns the queue's URL. We'll need this URL to use the AWS SDK.
const AWS = require('aws-sdk')
const sqs = new AWS.SQS({apiVersion: '2012-11-05'})
sqs.sendMessage({ QueueUrl:process.env.YOUR_QUEUE, MessageBody:JSON.stringify({ hello: 'world' }) }, (err,data) => {
if (err)
console.log(`Error - ${err.stack}`)
else
console.log(data)
})
// MaxNumberOfMessages: Max is 10. 10 does not mean you'll get 10, it means you'll get at most 10.
sqs.receiveMessage({ QueueUrl:process.env.YOUR_QUEUE, MaxNumberOfMessages:10 }, (err,data) => {
if (err)
console.log(`Error - ${err.stack}`)
else {
console.log(data.Messages[0].Body) // Body is a string containing your message.
console.log(data.Messages[0].ReceiptHandle) // used for deletion.
}
})
// 'ReceiptHandle' must be the value read from 'data.Messages[0].ReceiptHandle' in the 'receiveMessage' callback above.
sqs.deleteMessage({ QueueUrl:process.env.YOUR_QUEUE, ReceiptHandle:receiptHandle }, (err,data) => {
	if (err)
		console.log(`Error - ${err.stack}`)
	else
		console.log('Message deleted')
})
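Putting it together, a minimal sketch of what the scheduled lambdas/dequeue.handler could look like using the promise-based versions of those same SDK calls (the processing step is just a placeholder):

const AWS = require('aws-sdk')
const sqs = new AWS.SQS({ apiVersion: '2012-11-05' })

exports.handler = async () => {
	const { Messages=[] } = await sqs.receiveMessage({ QueueUrl: process.env.YOUR_QUEUE, MaxNumberOfMessages: 10 }).promise()
	for (const message of Messages) {
		// Placeholder for the real processing logic (assumes the body was sent as JSON, as in the sendMessage example above).
		console.log(JSON.parse(message.Body))
		// Delete the message once it has been successfully processed, otherwise it becomes visible again after the visibility timeout.
		await sqs.deleteMessage({ QueueUrl: process.env.YOUR_QUEUE, ReceiptHandle: message.ReceiptHandle }).promise()
	}
	return 'records processed'
}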
This section was written based on this article: Lambda Concurrency Limits and SQS Triggers Don’t Mix Well (Sometimes)
- Set the queue’s visibility timeout to at least 6 times the timeout that you configure on your function. The extra time allows for Lambda to retry if your function execution is throttled while your function is processing a previous batch.
- Set the maxReceiveCount on the queue’s redrive policy to at least 5. This will help avoid sending messages to the dead-letter queue due to throttling.
- Configure the dead-letter queue to retain failed messages long enough that you can move them back later to be reprocessed.
If the Lambda's timeout is 30 seconds (which is the max for a Lambda configured to respond to API Gateway requests), then the VisibilityTimeout should be at least 180 seconds (6 × 30); the example below uses 600 to leave a comfortable margin.
serverless.yml
service: sqs-trigger-demo
plugins:
- serverless-iam-roles-per-function
provider:
name: aws
runtime: nodejs10.x
functions:
sendMessageToSQS:
handler: index.handler
timeout: 300
events:
- http:
path: /send
method: POST
iamRoleStatementsInherit: true
iamRoleStatements:
- Effect: 'Allow'
Action:
- sqs:SendMessage
Resource:
Fn::GetAtt:
- YourQueue
- Arn
environment:
YOUR_QUEUE: !Ref YourQueue
processSQS:
handler: index.handler
events:
- sqs:
arn:
Fn::GetAtt:
- YourQueue
- Arn
batchSize: 1
resources:
Resources:
YourQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: your-main-queue-name
VisibilityTimeout: 600
RedrivePolicy:
deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn
maxReceiveCount: 10
DeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: my-dead-letter-queue-name
Under the resources.Resources property of your serverless.yml, add the following:
YourTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: ${self:custom.your-topic-name}
The TopicName is optional.
WARNING: To configure access to an SNS topic, you need to use its ARN. Usually, this is achieved with the Fn::GetAtt intrinsic function, but for an SNS topic, you have to use !Ref:
iamRoleStatements:
  - Effect: 'Allow'
    Action:
      - sns:Publish
    Resource: !Ref YourTopic
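With that sns:Publish permission in place, the lambda can publish to the topic via the AWS SDK. A minimal sketch (it assumes a YOUR_TOPIC environment variable set to !Ref YourTopic, i.e., the topic's ARN):

const AWS = require('aws-sdk')
const sns = new AWS.SNS({ apiVersion: '2010-03-31' })

sns.publish({
	TopicArn: process.env.YOUR_TOPIC, // For SNS, !Ref returns the ARN, which is exactly what 'TopicArn' expects.
	Subject: 'Hello Candy',
	Message: JSON.stringify({ hello: 'Candy' })
}, (err, data) => {
	if (err)
		console.log(`Error - ${err.stack}`)
	else
		console.log(data) // Contains the 'MessageId' of the published message.
})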
exports.handler = function(event, context, callback) {
const records = event.Records
console.log(JSON.stringify(records, null, ' '))
return callback(null, 'records processed')
}
where Records is an array with items similar to the following:
{
"EventSource": "aws:sns",
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:ap-southeast-2:56757897",
"Sns": {
"Type": "Notification",
"MessageId": "2f38-10a-1ba-af0-79313b1c",
"TopicArn": "arn:aws:sns:ap-southeast-2",
"Subject": "Hello Candy",
"Message": "{\n Hello: 'Candy'\n}",
"Timestamp": "2018-10-15T04:18:19.761Z",
"SignatureVersion": "1",
"Signature": "jC6KCrv6U8nU2kd01PWGGBUcur9z/NWPJ9ND7EH83uI/9Oi5RyQoZS5DA2auYOYnNMtrDFB1cfP+pjnbS==",
"SigningCertUrl": "https://sns.ap-southeast-2.amazonaws.com/SimpleNotificationService-6aad65c2f99.pem",
"UnsubscribeUrl": "https://sns.ap-southeast-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:",
"MessageAttributes": {
"hello": {
"Type": "String",
"Value": "world"
}
}
}
}