-
-
Save rmtuckerphx/fcbe28ef4b44f3645e22fa4fa75b8814 to your computer and use it in GitHub Desktop.
Aws Kinesis stream upload example in JS - get rtsp stream with ffmpeg and use putMedia to upload it on Amazon Kinesis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const ffmpeg = require('fluent-ffmpeg') | |
const parseUrl = require('url').parse | |
// Polyfill, modifying the global Object | |
require('es6-object-assign').polyfill() | |
global.Promise = require('es6-promise').Promise | |
const aws4 = require('aws4') | |
const axios = require('axios') | |
const CancelToken = axios.CancelToken | |
const fs = require('fs') | |
const PipeViewer = require('pv') | |
function noop(){} | |
const HANGING_TIMEOUT = 20 * 1000 | |
/* | |
const STREAM_NAME = 'test-stream' | |
const DATA_ENDPOINT = 's-1e413xxx.kinesisvideo.us-east-1.amazonaws.com' // returned by getDataEndpoint | |
const pwd = { | |
endpoint: 'kinesisvideo.us-east-1.amazonaws.com', | |
region: 'us-east-1', | |
accessKeyId: " ... ", | |
secretAccessKey: " ... ", | |
} | |
*/ | |
function sendStream({mkvFile, streamName, dataEndpoint, pwd}, onStart, onFinish){ | |
onFinish = onFinish || noop | |
const opts = { | |
service: 'kinesisvideo', | |
host: dataEndpoint, | |
path: '/putMedia', | |
method: 'POST', // NB. required! | |
// body: Buffer.from('UNSIGNED-PAYLOAD'), | |
headers: { | |
'Content-Type': 'application/json', | |
'x-amzn-stream-name': streamName, | |
"x-amzn-fragment-timecode-type": "ABSOLUTE", | |
"x-amz-content-sha256":"UNSIGNED-PAYLOAD" | |
} | |
} | |
const signed = aws4.sign(opts, pwd) | |
console.log(signed) | |
const reqUri = 'https://' + opts.host+opts.path | |
let cancel; | |
axios({ | |
method:'POST', | |
timeout: 40*1000, | |
url: reqUri, | |
headers: signed.headers, | |
data: mkvFile, | |
responseType: 'stream', | |
maxContentLength: Infinity, // required for stream! | |
cancelToken: new CancelToken(function executor(c) { | |
// An executor function receives a cancel function as a parameter | |
cancel = c | |
}) | |
}).then(function(res) { | |
onStart && onStart(cancel) | |
res.data.on('data', function(fragRes){ | |
console.log(fragRes.toString()) | |
fragRes = JSON.parse(fragRes.toString()) | |
const EventType = fragRes.EventType | |
const ErrorCode = fragRes.ErrorCode | |
if (EventType === 'ERROR') { | |
// request aborted | |
onFinish(new Error('Fragment event type: ERROR - ' + ErrorCode)) | |
} | |
}) | |
}).catch(onFinish) | |
} | |
function startUpload ({rtspUri, streamName, dataEndpoint, pwd}){ | |
// get stream | |
const ffmpegVideo = getMkvStream(rtspUri, null, function(err){ | |
// rtsp error | |
if (err) { | |
startUpload({rtspUri, streamName, dataEndpoint}) // start again | |
} | |
}) | |
const pv = PipeViewer() | |
pv.on('info', function(str){ | |
console.log('Speed: '+str.speed+' - Transferred: '+str.transferred) | |
}) | |
// start send stream | |
sendStream({mkvFile: ffmpegVideo.pipe(pv), streamName, dataEndpoint, pwd}, function started(cancel){ | |
setTimeout(function () { | |
cancel() // stop upload | |
ffmpegVideo.kill() // stop stream and raise err cb above | |
}, 40*60*1000) // 40 min | |
}, function finished(err) { | |
if (err) console.error(err) | |
}) | |
} | |
function getMkvStream(uri, onStart, onFinished){ | |
let processHanging = null | |
const isRtsp = parseUrl(uri).protocol === 'rtsp:' | |
const video = ffmpeg(uri) // also file '/media/desk/Test.mkv' | |
//.videoBitrate(500) | |
.inputOptions(isRtsp ? ['-rtsp_transport tcp', '-re'] : []) | |
.format('matroska') | |
.videoCodec('libx264').fps(15) | |
.outputOptions(['-map 0:0', '-an', '-g 15', '-vsync 1', '-profile:v baseline', | |
'-reset_timestamps 1', | |
'-quality good', | |
'-cpu-used 2', | |
'-pix_fmt yuv420p' | |
]) | |
.on('progress', function(progress) { | |
console.log('Processing:', progress) | |
clearTimeout(processHanging) | |
processHanging = setTimeout(function(){ | |
console.error('Hanging for more than 20 sec.') | |
video.kill() | |
}, HANGING_TIMEOUT) // hanging for more than x sec | |
}) | |
.on('start', function(commandLine) { | |
console.log('Spawned Ffmpeg with command: ' + commandLine) | |
onStart && onStart() | |
}) | |
.on('end', function() { | |
console.log('Finished processing') | |
onFinished && onFinished() | |
}) | |
.on('error', function(err, stdout, stderr) { | |
console.log('Cannot process video: ' + err.message) | |
clearTimeout(processHanging) | |
onFinished && onFinished(err) // eventually restart | |
}) | |
return video | |
} | |
/* | |
// save stream on file: | |
// getMkvStream('rtsp://admin:@192.168.69.103:554/live1.sdp').pipe().pipe(require('fs').createWriteStream('./test.mkv'), null, console.error) | |
// Kinesis upload USAGE: | |
startUpload({ | |
rtspUri: 'rtsp://admin:@192.168.88.9:554/live1.sdp', | |
streamName: 'demo-stream', | |
dataEndpoint: 's-4010bf70.kinesisvideo.us-west-2.amazonaws.com', | |
pwd: { | |
endpoint: 'kinesisvideo.us-west-2.amazonaws.com', | |
region: 'us-west-2', | |
accessKeyId: " ... ", | |
secretAccessKey: " ... ", | |
} | |
}) | |
// it spawns ffmpeg and upload stream | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment