-
-
Save cdgraff/6f85add91c094ffa7f15e4e1ccd8cd19 to your computer and use it in GitHub Desktop.
Script for live video streaming monitoring with notifications
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
'''
Script to monitor live streams and send an Amazon SNS notification if a stream is down (and possibly take restorative action).

Future:
    Black detect: ffmpeg -i out.mp4 -vf blackdetect -f null -
    Note: the following doesn't seem to fully work without looking at the debug logs.
    On live: ffmpeg -y -i rtmp://cp30129.live.edgefcs.net/live/videoops-videoops@50541 -vf blackdetect -t 10 -loglevel debug -f null -
'''
from boto import boto | |
import sys | |
import os | |
from subprocess import PIPE, Popen | |
from threading import Thread | |
import time | |
import logging | |
try: | |
from Queue import Queue, Empty | |
except ImportError: | |
from queue import Queue, Empty # python 3.x | |
# Send every record (DEBUG and above) to both encoder.log and the console.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(message)s',
    handlers=[
        logging.FileHandler("encoder.log"),
        logging.StreamHandler(),
    ],
)
logging.info("Starting")

# Amazon SNS settings. Create a topic in the Amazon Console and paste its
# ARN here, together with the credentials allowed to publish to it.
AWS_TOPIC_ARN = ""
AWS_KEY = ""
AWS_SECRET = ""

# How often to check, in seconds.
SLEEP_TIME = 30

# Absolute path of the ffprobe binary used to inspect each stream.
FFPROBE = "/usr/local/bin/ffprobe"

# Streams to check.
CHECK_STREAMS = [
    'http://wowsyd.sinclairmediatech.com/live/canberra/playlist.m3u8',
    'http://repackage.video.news.com.au/live/canberra/playlist.m3u8',
]

# True on POSIX platforms; passed as close_fds when spawning ffprobe.
ON_POSIX = 'posix' in sys.builtin_module_names
def _to_text(raw):
    """Return *raw* as text, decoding it when it is a Python 3 bytes object."""
    # On Python 2 ``bytes`` is ``str``, so the value is returned untouched.
    if isinstance(raw, bytes) and not isinstance(raw, str):
        return raw.decode("utf-8", "replace")
    return raw


def process_line(std, q):
    """Read *std* in small chunks and put every line it produces on *q*.

    The stream is read 10 characters at a time so that output is forwarded
    shortly after the child writes it.  Chunks are accumulated until a
    newline shows up; every complete line found is queued (without its
    trailing newline), and whatever follows the last newline is kept for the
    next read.  When the stream ends, a non-empty unterminated tail is queued
    as the final line.

    Args:
        std: File-like object with a ``read(size)`` method (text or bytes).
        q: Queue-like object with a ``put(item)`` method.
    """
    pending = None
    while True:
        data = std.read(10)
        if not data:
            # Empty read means end of stream.
            break
        pending = data if pending is None else pending + data
        # Split in the stream's native type; decoding happens per full line
        # so a multi-byte character cut by the chunk boundary stays intact.
        newline = b"\n" if isinstance(pending, bytes) else "\n"
        if newline not in pending:
            continue
        pieces = pending.split(newline)
        # The last piece is the (possibly empty) start of the next line.
        pending = pieces.pop()
        for piece in pieces:
            q.put(_to_text(piece))
    if pending:
        q.put(_to_text(pending))
def enqueue_output(stdout, queue):
    """Forward every line read from *stdout* to *queue*, then close the stream.

    Intended to run as a thread target.  The stream is closed even when
    reading fails, so the pipe's file descriptor is not leaked.

    Args:
        stdout: Readable stream handed to ``process_line``.
        queue: Queue that receives the lines.
    """
    try:
        process_line(stdout, queue)
    finally:
        stdout.close()
def run(q, ffcmd, thread_name):
    """Launch *ffcmd* through the shell and stream its stderr into *q*.

    Only stderr is captured; stdout is left attached to the parent's stdout.
    A daemon thread named *thread_name* reads the pipe in the background.

    Args:
        q: Queue that receives the command's stderr lines.
        ffcmd: Command line, as a single string, executed with ``shell=True``.
        thread_name: Name given to the background reader thread.

    Returns:
        The ``Popen`` object of the started process.
    """
    logging.info("Running " + ffcmd)
    child = Popen(
        ffcmd,
        shell=True,
        stdin=PIPE,
        stderr=PIPE,
        bufsize=1,
        close_fds=ON_POSIX,
    )
    reader = Thread(
        target=enqueue_output,
        name=thread_name,
        args=(child.stderr, q),
    )
    # Daemon thread: it must not keep the program alive on exit.
    reader.daemon = True
    reader.start()
    return child
def _stop_probe(proc):
    """Make sure *proc* has finished and been reaped, and close its stdin."""
    if proc.poll() is None:
        try:
            proc.kill()
        except OSError:
            # The process exited between poll() and kill(); nothing to do.
            pass
    if proc.stdin is not None:
        try:
            proc.stdin.close()
        except (IOError, OSError):
            # Pipe already broken because the child is gone.
            pass
    # Collect the exit status so no zombie process is left behind.
    proc.wait()


def probe(stream):
    """Run ffprobe against *stream* and report whether it carries video.

    ffprobe's stderr is consumed line by line.  Example of a line that
    counts as success:

        Stream #0:0: Video: h264 (Baseline), yuv420p, 640x360, 25 tbr

    Any stream index is accepted, not just ``#0:0`` / ``#0:1``.

    Args:
        stream: URL of the stream to check.

    Returns:
        True as soon as a video stream line is seen.  False when a line
        containing "error" is seen, when ffprobe exits with a non-zero
        status, or when ffprobe exits without ever reporting video.
    """
    probeq = Queue()
    logging.info("Checking " + stream)
    probeproc = run(probeq, FFPROBE + " " + stream, "probethread")
    try:
        while True:
            try:
                # Block briefly instead of spinning on an empty queue.
                line = probeq.get(timeout=0.5)
            except Empty:
                # No output for a whole interval: if the process is gone,
                # nothing more is coming and no video was reported.
                if probeproc.poll() is not None:
                    return False
                continue
            logging.info(line)
            if "Stream #" in line and ": Video" in line:
                logging.info("Found stream " + stream)
                logging.info(line)
                return True
            if "error" in line:
                return False
            # poll() is None while ffprobe is still running.
            exit_code = probeproc.poll()
            if exit_code is not None and exit_code > 0:
                return False
    finally:
        # NOTE(review): run() starts the command with shell=True, so this
        # stops the shell; confirm the ffprobe child exits with it.
        _stop_probe(probeproc)
def send_message(msg):
    """Publish *msg* to the SNS topic named by ``AWS_TOPIC_ARN``.

    Connects with ``AWS_KEY`` / ``AWS_SECRET`` and sends the message with
    the fixed subject "Problem with streaming".

    Args:
        msg: Body of the notification.
    """
    sns = boto.connect_sns(aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)
    # Parenthesised form prints the same on Python 2 and is valid on Python 3.
    print(sns)
    subj = "Problem with streaming"
    sns.publish(AWS_TOPIC_ARN, msg, subj)
def live_probe():
    """Placeholder with no implementation yet; calling it does nothing."""
    pass
# Monitor forever: probe each configured stream and alert on every failure.
# NOTE(review): indentation was lost in this copy of the file; the pause is
# assumed to happen once per full pass over CHECK_STREAMS - confirm.
while True:
    for stream in CHECK_STREAMS:
        if not probe(stream):
            logging.info("Error with " + stream + " sending alert")
            # TODO: check whether an error message has already been sent and,
            # if so, do not send another one for x mins. Also send a message
            # when the problem is cleared.
            send_message(stream)
    time.sleep(SLEEP_TIME)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment