Last active
April 1, 2024 14:13
-
-
Save ronnyroeller/6d244c5bcee2d71c346bf39c7fa04bad to your computer and use it in GitHub Desktop.
Lambda function to provide webrtc signaling server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { ApiGatewayManagementApi, DynamoDB } from 'aws-sdk'; | |
import { scanItems } from './dynamodb'; | |
const { AWS_REGION, TOPICS_TABLE } = process.env; | |
const dynamoDb = new DynamoDB({ | |
apiVersion: '2012-08-10', | |
region: AWS_REGION, | |
}); | |
// Message structure and protocol flow taken from y-webrtc/bin/server.js | |
interface YWebRtcSubscriptionMessage { | |
type: 'subscribe' | 'unsubscribe'; | |
topics?: string[]; | |
} | |
interface YWebRtcPingMessage { | |
type: 'ping'; | |
} | |
interface YWebRtcPublishMessage { | |
type: 'publish'; | |
topic?: string; | |
[k: string]: any; | |
} | |
async function subscribe(topic: string, connectionId: string) { | |
try { | |
return await dynamoDb | |
.updateItem({ | |
TableName: TOPICS_TABLE, | |
Key: { name: { S: topic } }, | |
UpdateExpression: 'ADD receivers :r', | |
ExpressionAttributeValues: { | |
':r': { SS: [connectionId] }, | |
}, | |
}) | |
.promise(); | |
} catch (err) { | |
console.log(`Cannot update topic ${topic}: ${err.message}`); | |
} | |
} | |
async function unsubscribe(topic: string, connectionId: string) { | |
try { | |
return await dynamoDb | |
.updateItem({ | |
TableName: TOPICS_TABLE, | |
Key: { name: { S: topic } }, | |
UpdateExpression: 'DELETE receivers :r', | |
ExpressionAttributeValues: { | |
':r': { SS: [connectionId] }, | |
}, | |
}) | |
.promise(); | |
} catch (err) { | |
console.log(`Cannot update topic ${topic}: ${err.message}`); | |
} | |
} | |
async function getReceivers(topic: string) { | |
try { | |
const { Item: item } = await dynamoDb | |
.getItem({ | |
TableName: TOPICS_TABLE, | |
Key: { name: { S: topic } }, | |
}) | |
.promise(); | |
return item?.receivers ? item.receivers.SS : []; | |
} catch (err) { | |
console.log(`Cannot get topic ${topic}: ${err.message}`); | |
return []; | |
} | |
} | |
async function handleYWebRtcMessage( | |
connectionId: string, | |
message: | |
| YWebRtcSubscriptionMessage | |
| YWebRtcPublishMessage | |
| YWebRtcPingMessage, | |
send: (receiver: string, message: any) => Promise<void>, | |
) { | |
const promises = []; | |
if (message && message.type) { | |
switch (message.type) { | |
case 'subscribe': | |
(message.topics || []).forEach(topic => { | |
promises.push(subscribe(topic, connectionId)); | |
}); | |
break; | |
case 'unsubscribe': | |
(message.topics || []).forEach(topic => { | |
promises.push(unsubscribe(topic, connectionId)); | |
}); | |
break; | |
case 'publish': | |
if (message.topic) { | |
const receivers = await getReceivers(message.topic); | |
receivers.forEach(receiver => { | |
promises.push(send(receiver, message)); | |
}); | |
} | |
break; | |
case 'ping': | |
promises.push(send(connectionId, { type: 'pong' })); | |
break; | |
} | |
} | |
await Promise.all(promises); | |
} | |
function handleConnect(connectionId: string) { | |
// Nothing to do | |
console.log(`Connected: ${connectionId}`); | |
} | |
async function handleDisconnect(connectionId: string) { | |
console.log(`Disconnected: ${connectionId}`); | |
// Remove the connection from all topics | |
// This is quite expensive, as we need to go through all topics in the table | |
const promises = []; | |
for await (const item of scanItems(dynamoDb, TOPICS_TABLE)) { | |
const receivers = item.receivers?.SS ?? []; | |
if (receivers.includes(connectionId)) { | |
promises.push(unsubscribe(item.name.S, connectionId)); | |
} | |
} | |
await Promise.all(promises); | |
} | |
export async function handler( | |
event: HttpV2WebsocketEvent, | |
): Promise<HttpV2Response> { | |
if (!TOPICS_TABLE) { | |
return { statusCode: 502, body: 'Not configured' }; | |
} | |
// The AWS "simple chat" example uses event.requestContext.domainName/...stage, but that doesn't work with custom domain | |
// names. It also doesn't matter, this is anyways an internal (AWS->AWS) call. | |
const apigwManagementApi = new ApiGatewayManagementApi({ | |
apiVersion: '2018-11-29', | |
endpoint: `https://${event.requestContext.apiId}.execute-api.${AWS_REGION}.amazonaws.com/${event.requestContext.stage}`, | |
}); | |
const send = async (connectionId: string, message: any) => { | |
try { | |
await apigwManagementApi | |
.postToConnection({ | |
ConnectionId: connectionId, | |
Data: JSON.stringify(message), | |
}) | |
.promise(); | |
} catch (err) { | |
if (err.statusCode === 410) { | |
console.log(`Found stale connection, deleting ${connectionId}`); | |
await handleDisconnect(connectionId); | |
} else { | |
// Log, but otherwise ignore: There's not much we can do, really. | |
console.log(`Error when sending to ${connectionId}: ${err.message}`); | |
} | |
} | |
}; | |
try { | |
switch (event.requestContext.routeKey) { | |
case '$connect': | |
handleConnect(event.requestContext.connectionId); | |
break; | |
case '$disconnect': | |
await handleDisconnect(event.requestContext.connectionId); | |
break; | |
case '$default': | |
await handleYWebRtcMessage( | |
event.requestContext.connectionId, | |
JSON.parse(event.body), | |
send, | |
); | |
break; | |
} | |
return { statusCode: 200 }; | |
} catch (err) { | |
console.log(`Error ${event.requestContext.connectionId}`, err); | |
return { statusCode: 500, body: err.message }; | |
} | |
} |
My serverless.yml
looks like this:
org: signalling-server
app: signalling-server
service: signalling-server
frameworkVersion: '3'
plugins:
- serverless-plugin-typescript
functions:
YWebRtcSignaling:
handler: handler.handler
events:
- websocket:
route: $connect
- websocket:
route: $disconnect
- websocket:
route: $default
provider:
name: aws
runtime: nodejs14.x
environment:
TOPICS_TABLE:
Ref: YWebRtcTopicsTable
iam:
role:
statements:
- Effect: Allow
Action:
- dynamodb:Query
- dynamodb:Scan
- dynamodb:GetItem
- dynamodb:PutItem
- dynamodb:UpdateItem
- dynamodb:DeleteItem
Resource:
- Fn::GetAtt: [ YWebRtcTopicsTable, Arn ]
resources:
Resources:
YWebRtcTopicsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: y-webrtc-topics-${sls:stage}
KeySchema:
- AttributeName: name
KeyType: HASH
AttributeDefinitions:
- AttributeName: name
AttributeType: S
BillingMode: PAY_PER_REQUEST
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
scanItems is listed here: