Created
May 24, 2019 04:17
-
-
Save danshev/0ecfe66e5c28b156e1b4bff08c77cc13 to your computer and use it in GitHub Desktop.
TeslaCam-Identify-Sets-and-Kickoff (for AWS Lambda)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
Function: TeslaCam-Identify-Sets-and-Kickoff
Runtime: Node.js 10.x
Environment: AWS Lambda
Description:
This function is meant to be triggered by a periodic CloudWatch Event.
Upon execution, it will scan an S3 bucket (containing TeslaCam files) and
identify video sets which:
a) have not been previously processed, and
b) have a "full set" of TeslaCam videos to merge
Environment Variables:
- FULL_SET = 3 (b/c TeslaCam currently outputs three streams)
- S3_BUCKET = the.name.of.your.bucket
- MERGE_LAMBDA = the-name-of-your-ffmpeg-merging-AWS-Lambda-function
- LAMBDA_REGION = your-aws-region
Assumptions:
1. Your S3 bucket is only used for TeslaCam work
2. Video files are stored on S3 in the same manner they are saved to
disk under the TeslaCam/SavedClips folder -- that is, an S3 object's
key value == '2019-04-20_20-45-36/2019-04-20_20-35-left_repeater.mp4'
3. This Lambda function has a permissions Role enabling it to:
a. Read from S3
b. Invoke other Lambdas
c. (Recommended) Write to CloudWatch Logs
*/
// AWS SDK for JavaScript; the handler uses it for the S3 and Lambda service clients.
let AWS = require('aws-sdk');
// S3 client created once at module load and shared by the handler's bucket listings.
let s3 = new AWS.S3();
exports.handler = async (event) => { | |
var myProxyResponseObj = { | |
statusCode: 500, | |
body: JSON.stringify('It worked!'), | |
}; | |
// Assuming they're stored with the prefix 'fullscreen', retrieve a list of | |
// previously-processed videos. | |
var data; | |
try { data = await s3.listObjectsV2({ Bucket: process.env.S3_BUCKET, Prefix: 'fullscreen' }).promise(); } | |
catch (err) { | |
console.log(err); | |
myProxyResponseObj.statusCode = err.statusCode; | |
myProxyResponseObj.body = JSON.stringify({ "message": err.message }); | |
} | |
// Extract the datetime values from the previously-processed list | |
var processed = []; | |
for (const obj of data.Contents) { | |
// example: fullscreen/2019-04-20_20-38.mp4 ==> 2019-04-20_20-38 | |
let identifier = obj.Key.split("/")[1].match(/(.*)(?=.mp4)/g)[0]; | |
processed.push(identifier); | |
} | |
// Retrieve a list of videos prefixed by '20' (e.g., '2019-05-20') ... the raw videos | |
try { data = await s3.listObjectsV2({ Bucket: process.env.S3_BUCKET, Prefix: '20'}).promise(); } | |
catch (err) { | |
console.log(err); | |
myProxyResponseObj.statusCode = err.statusCode; | |
myProxyResponseObj.body = JSON.stringify({ "message": err.message }); | |
} | |
// Filter the list of raw files by the list of previously-processed files | |
var toProcess = []; | |
for (const obj of data.Contents) { | |
var match = false; | |
for (const prev of processed) { | |
let res = obj.Key.indexOf(prev); | |
if (res >= 0) { | |
match = true; | |
break; | |
} | |
} | |
if (!match) toProcess.push(obj.Key); | |
} | |
// Scan the list of toProcess files; count each occurence | |
var parts = {}; | |
for (const obj of toProcess) { | |
// example obj: 2019-04-20_20-45-36/2019-04-20_20-35-left_repeater.mp4 | |
let pieces = obj.split('/'); | |
let event = pieces[0]; | |
let filename = pieces[1]; | |
let timewindow = filename.match(/^(.*[\\-])/g)[0]; | |
if (!parts[timewindow]) parts[timewindow] = { 'count': 1, 'event': event }; | |
else parts[timewindow]['count']++; | |
} | |
// Filter to those that have a "full set" of constituent files needed to make a composite. | |
var qualified = []; | |
for (const timewindow of Object.keys(parts)) { | |
if (parts[timewindow]['count'] == process.env.FULL_SET) { | |
qualified.push(parts[timewindow]['event'] +"/"+ timewindow); | |
} | |
} | |
// Parallelize processing by handing off each qualified set to its own "processing" Lambda | |
var promises = []; | |
for (const key of qualified) { | |
let lambda = new AWS.Lambda({ region: process.env.LAMBDA_REGION }); | |
let lambdaParams = { | |
FunctionName: process.env.MERGE_LAMBDA, | |
Payload: JSON.stringify({ 'event': key }, null, 2), | |
InvocationType: 'Event' | |
}; | |
promises.push(lambda.invoke(lambdaParams).promise()); | |
} | |
try { await Promise.all(promises); } | |
catch (err) { | |
console.log(err); | |
return myProxyResponseObj; | |
} | |
myProxyResponseObj.statusCode = 200; | |
return myProxyResponseObj; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment