Created
December 20, 2020 20:06
-
-
Save tanepiper/873e5332a2f98ba0c0e5bf876b68b807 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Video Transcoder Daemon | |
@author Tane Piper | |
The Transcoder Daemon is an application that runs as a background process | |
on a unix server. Using web.py, it acts as a web application listening on | |
a set port. It takes incoming /queue requests which contain POST information. | |
The simple queue request just takes a filename for input and output and some
quality information.
A complex request (yet to be implemented) takes a full range of settings in | |
order to enable more complex encoding in situations such as a file failing | |
to be read. | |
The main encoder is a 3rd party application, mencoder, and the daemon creates threads | |
to handle these requests. | |
Contents | |
0. DON'T TOUCH | |
1. Header Area | |
2. Thread and Queue Classes & Methods | |
3. Video Class | |
4. The Main Daemon class | |
5. Startup stuff | |
""" | |
""" | |
0. DON'T TOUCH | |
This section below is where we set up our imports and module information such | |
as the version and app name | |
""" | |
import logging, sys | |
import os, time, atexit | |
from signal import SIGTERM | |
import threading, Queue, time | |
import web | |
import subprocess | |
import urllib | |
import urllib2 | |
import json | |
import optparse | |
# Version is kept as a tuple of strings and joined into the dotted form.
__version_info__ = ('0', '3', '0')
__version__ = '.'.join(__version_info__)
# Human-readable application name (used in the --version banner by main()).
__str__ = "VidioWiki Video Transcoder Daemon"

# Short identifier from which the log file and pid file names are derived.
APP_ID = "vwdaemon"
APP_LOGGING = "/var/log/%s.log" % (APP_ID,)
PID_FILE = "/var/run/%s.pid" % (APP_ID,)

# This is a hack to fix the race condition in the subprocess module:
# replace its internal cleanup hook with a no-op.
subprocess._cleanup = lambda: None

# web.py routing table: URL pattern followed by the handler class name.
URLS = ("/queue", "QueueIncoming")
# Single-pass mencoder command template.
# Placeholders, in order (defaults in brackets): input file, output file,
# abr:br [56], flv vbitrate [500], srate [22050].
ENCODER_ONE_PASS = (
    "\n"
    "mencoder %s -o %s -of lavf -oac mp3lame -lameopts abr:br=%d -ovc lavc "
    "-lavcopts vcodec=flv:vbitrate=%d:mbd=2:mv0:trell:v4mv:cbp:last_pred=3 "
    "-srate %d\n"
)

# Multi-pass mencoder command template.
# Placeholders, in order (defaults in brackets): input file, output file,
# abr:br [56], flv vbitrate [500], vpass [2], srate [22050].
ENCODER_MULTI_PASS = (
    "\n"
    "mencoder %s -o %s -of lavf -oac mp3lame -lameopts abr:br=%d -ovc lavc "
    "-lavcopts vcodec=flv:vbitrate=%d:mbd=2:mv0:trell:v4mv:cbp:last_pred=3:vpass=%d "
    "-srate %d\n"
)
############################################################################### | |
""" | |
1. Header Area | |
""" | |
# Root directory under which the video content lives
BASE_PATH = '/home/tanep/vwtranscoder/'
# Directory the incoming (source) videos are read from
VIDEO_PATH = ''.join((BASE_PATH, 'videos/'))
# Directory the encoded output videos are written to
OUTPUT_PATH = ''.join((BASE_PATH, 'output/'))
# Port the application listens on (kept as a string)
PORT = "1337"
# URL that receives a POST when a video encode is complete
POST_URL = 'http://localhost'
# Maximum number of threads this app should create for encoding
MAX_THREADS = 4
# Logging verbosity: logging.DEBUG while debugging, logging.INFO in production
LOGGING_LEVEL = logging.DEBUG
"""
The values below are the default values for video encoding in one pass
"""
DEFAULT_ABR_RB = 56
DEFAULT_FLV_BITRATE = 500
DEFAULT_VPASS = 0
DEFAULT_SRATE = 22050
############################################################################### | |
# TRY NOT TO TOUCH BELOW THIS LINE # | |
############################################################################### | |
""" | |
2. Thread and Queue Classes & Methods | |
""" | |
# Work queues: task_queue feeds the encoder workers, done_queue feeds the
# workers that report finished encodes.
task_queue, done_queue = Queue.Queue(), Queue.Queue()

# Thread objects started for each queue, so they can be stopped later.
task_pool, done_pool = [], []

# Flags that travel with every queue item as [flag, video].
FLAG_INIT, FLAG_STOP = 0, -1
FLAG_STAGE_1, FLAG_STAGE_2 = 1, 2
def start_threads(in_threads=5, out_threads=3):
    """
    Start the worker threads for both queues.

    in_threads workers run process_incoming and are recorded in task_pool;
    out_threads workers run process_done and are recorded in done_pool.
    """
    plan = (
        (in_threads, process_incoming, task_pool),
        (out_threads, process_done, done_pool),
    )
    for count, worker, pool in plan:
        for _ in range(count):
            thread = threading.Thread(target=worker)
            thread.start()
            pool.append(thread)
def stop_task_threads():
    """
    Stop every thread recorded in task_pool.

    One FLAG_STOP item is queued per worker (each worker leaves its loop after
    reading one), then the pool is polled once a second until it is empty.
    """
    for _ in task_pool:
        put(task_queue, None, FLAG_STOP)
    while task_pool:
        time.sleep(1)
        # Drop every finished thread in one pass instead of one per poll.
        # Mutate in place: task_pool is the module-level list that
        # start_threads() appends to.
        task_pool[:] = [worker for worker in task_pool if worker.is_alive()]
def stop_done_threads():
    """
    Stop every thread recorded in done_pool.

    One FLAG_STOP item is queued per worker (each worker leaves its loop after
    reading one), then the pool is polled once a second until it is empty.
    """
    for _ in done_pool:
        put(done_queue, None, FLAG_STOP)
    while done_pool:
        time.sleep(1)
        # Drop every finished thread in one pass instead of one per poll.
        # Mutate in place: done_pool is the module-level list that
        # start_threads() appends to.
        done_pool[:] = [worker for worker in done_pool if worker.is_alive()]
def put(target_queue, video, flag=FLAG_INIT):
    """
    Put a [flag, video] item onto target_queue.

    video may be None: the stop helpers enqueue FLAG_STOP with no video, so
    the name used for logging is looked up defensively. Failures are logged
    rather than raised.
    """
    # A None video has no in_file; touching it directly would raise here and
    # again inside the except handler, so the stop marker was never queued.
    name = getattr(video, "in_file", None)
    try:
        logger.debug("Putting item %s in queue at stage %d" % (name, flag))
        target_queue.put([flag, video])
    except Exception:
        logger.exception("Failed to put item %s into queue" % name)
def process_incoming():
    """
    Worker loop for the task queue.

    Blocks on task_queue, hands FLAG_STAGE_1 items to do_subprocess() and
    returns once a FLAG_STOP item is read. Errors raised while handling an
    item are logged and the loop carries on with the next item.
    """
    while True:
        # Get the flag and the video from the queue
        flag, video = task_queue.get()
        if flag == FLAG_STOP:
            # Stop markers carry no video, so check before touching it.
            logger.debug("Stop flag received in incoming queue")
            break
        try:
            logger.debug("Processing %s at stage %d" % (video.in_file, flag))
            if flag == FLAG_STAGE_1:
                do_subprocess(video)
            else:
                logger.error("Unknown flag received in incoming queue")
        except Exception:
            # Keep the worker alive, but leave a trace of what went wrong.
            logger.exception("Error while processing item from incoming queue")
def process_done():
    """
    Worker loop for the done queue.

    Blocks on done_queue, hands FLAG_STAGE_2 items to report_status() and
    returns once a FLAG_STOP item is read. Errors raised while handling an
    item are logged and the loop carries on with the next item.
    """
    while True:
        # Get the flag and the video from the queue
        flag, video = done_queue.get()
        if flag == FLAG_STOP:
            # Stop markers carry no video, so check before touching it.
            logger.debug("Stop flag received in done queue")
            break
        try:
            logger.debug("Processing %s at stage %d" % (video.in_file, flag))
            if flag == FLAG_STAGE_2:
                report_status(video)
            else:
                logger.error("Unknown flag received in done queue")
        except Exception:
            # Keep the worker alive, but leave a trace of what went wrong.
            logger.exception("Error while processing item from done queue")
def do_subprocess(video, report=True,
                  video_path=VIDEO_PATH, output_path=OUTPUT_PATH):
    """
    Encode one video with the single-pass mencoder command line.

    video       -- object exposing in_file, out_file, abr_rb, flv_bitrate and
                   srate; its success attribute is set by this function.
    report      -- when true, the video is queued on done_queue with
                   FLAG_STAGE_2 afterwards, whether or not encoding worked.
    video_path  -- prefix prepended to video.in_file.
    output_path -- prefix prepended to video.out_file.

    video.success ends up True only if the encoder exits with status 0.
    Errors are logged, never raised.
    """
    # shlex.quote is the Python 3 name; Python 2 only has pipes.quote.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    video.success = False
    # Bound before the try so the error handler can always format it.
    command_line = None
    try:
        in_file = "%s%s" % (video_path, video.in_file)
        out_file = "%s%s" % (output_path, video.out_file)
        # File names originate from web requests and the command runs through
        # a shell, so quote them. The numeric settings are coerced because the
        # template formats them with %d.
        command_line = ENCODER_ONE_PASS % (quote(in_file), quote(out_file),
                                           int(video.abr_rb),
                                           int(video.flv_bitrate),
                                           int(video.srate))
        logger.debug("%s" % command_line)
        proc = subprocess.Popen(command_line,
                                shell=True,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                close_fds=True)
        # communicate() drains the pipe and waits for the process to exit.
        output = proc.communicate()[0]
        if proc.returncode == 0:
            video.success = True
        else:
            logger.error("Encoder exited with status %s for %s; last output: %r" %
                         (proc.returncode, video.in_file, output[-500:]))
    except Exception:
        logger.exception("There was an error processing the video using the following: %s" %
                         command_line)
    if report:
        put(done_queue, video, FLAG_STAGE_2)
def report_status(video):
    """
    POST the video's status (video.getObject(), url-encoded) to POST_URL.

    On failure the attempt counter is incremented, the thread sleeps five
    seconds and the video is re-queued on done_queue. Once
    video.report_attempt has reached 3 the report is abandoned and an error
    is logged instead.
    """
    if video.report_attempt >= 3:
        logger.error("Failed to report back video %s" % video.in_file)
        return
    try:
        data = urllib.urlencode(video.getObject())
        req = urllib2.Request(POST_URL, data)
        # The response body is not used; close the handle right away.
        urllib2.urlopen(req).close()
        logger.debug("Successfully reported back %s" % video.in_file)
    except Exception:
        video.report_attempt += 1
        # Record why the attempt failed before retrying.
        logger.warning("Report attempt %d for %s failed" %
                       (video.report_attempt, video.in_file), exc_info=True)
        time.sleep(5)
        put(done_queue, video, FLAG_STAGE_2)
############################################################################### | |
# 3. Web.py Classes and Methods # | |
############################################################################### | |
class QueueIncoming:
    """
    web.py handler for /queue.

    GET and POST behave the same: read the request parameters (falling back
    to the DEFAULT_* encoding settings), wrap them in a Video and queue it on
    task_queue with FLAG_STAGE_1. The reply is always the text "Received";
    failures are only logged.
    """

    def _enqueue_request(self, failure_message):
        """Queue a Video built from the request; log failure_message on error."""
        try:
            i = web.input(in_file="None", abr_rb=DEFAULT_ABR_RB,
                          flv_bitrate=DEFAULT_FLV_BITRATE, vpass=DEFAULT_VPASS,
                          srate=DEFAULT_SRATE)
            video = Video(in_file=i.in_file, abr_rb=i.abr_rb,
                          flv_bitrate=i.flv_bitrate, vpass=i.vpass,
                          srate=i.srate)
            put(task_queue, video, FLAG_STAGE_1)
        except Exception:
            logger.exception(failure_message)
        # "%s", not "%": the bare "%" format raised ValueError on every GET.
        return "%s" % "Received"

    def GET(self):
        # Shares POST's defaults; without them any GET that omitted a setting
        # failed when the missing attribute was read.
        return self._enqueue_request("Get Failed")

    def POST(self):
        return self._enqueue_request("Post Failed")
""" | |
3. Video Class | |
""" | |
class Video:
    """
    One transcoding job.

    Keyword arguments (all required): in_file, abr_rb, flv_bitrate, vpass,
    srate. The numeric settings are coerced with int() because the encoder
    command templates format them with %d; a value that is not an integer
    raises ValueError. out_file and in_ext are derived from in_file.
    """

    def __init__(self, **kwargs):
        self.in_file = kwargs['in_file']
        self.success = False
        self.report_attempt = 0
        self.abr_rb = int(kwargs['abr_rb'])
        self.flv_bitrate = int(kwargs['flv_bitrate'])
        self.vpass = int(kwargs['vpass'])
        self.srate = int(kwargs['srate'])
        self.splitInFile()

    def __str__(self):
        return "%s" % self.getObject()

    def getObject(self):
        """Return the job's state as a plain dict."""
        values = {
            "in_file": self.in_file,
            "out_file": self.out_file,
            "success": self.success,
            "report_attempt": self.report_attempt,
            "abr_rb": self.abr_rb,
            "flv_bitrate": self.flv_bitrate,
            "vpass": self.vpass,
            "srate": self.srate,
        }
        return values

    def splitInFile(self):
        """
        Derive out_file (in_file with its extension replaced by .flv) and
        in_ext (the extension without the dot, "" when there is none).
        """
        # splitext strips only the final extension, so "my.video.avi" becomes
        # "my.video.flv" rather than being cut at the first dot.
        root, ext = os.path.splitext(self.in_file)
        self.out_file = root + ".flv"
        self.in_ext = ext[1:]
""" | |
4. The Main Daemon class | |
""" | |
class Daemon(object):
    """
    The base vw_encoder daemon

    Usage: subclass the Daemon class and override the run() method
    """

    def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
        """
        pidfile               -- path of the file holding the daemon's pid
        stdin, stdout, stderr -- paths the standard streams are redirected to
                                 by daemonize()
        """
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.pidfile = pidfile

    def _read_pid(self):
        """Return the pid stored in the pidfile, or None if it is missing or unparsable."""
        try:
            with open(self.pidfile, 'r') as pf:
                return int(pf.read().strip())
        except (IOError, ValueError):
            return None

    def daemonize(self):
        """
        do the UNIX double-fork magic, see Stevens' "Advanced
        Programming in the UNIX Environment" for details (ISBN 0201563177)
        http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
        """
        try:
            pid = os.fork()
            if pid > 0:
                # exit first parent
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)

        # decouple from parent environment
        os.chdir("/")
        os.setsid()
        os.umask(0)

        # do second fork
        try:
            pid = os.fork()
            if pid > 0:
                # exit from second parent
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)

        # redirect standard file descriptors
        sys.stdout.flush()
        sys.stderr.flush()
        si = open(self.stdin, 'r')
        so = open(self.stdout, 'a+')
        # Unbuffered; opened in binary mode because only the descriptor is
        # used and unbuffered text mode is not accepted by every Python.
        se = open(self.stderr, 'ab+', 0)
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())

        # write pidfile
        atexit.register(self.delpid)
        pid = str(os.getpid())
        with open(self.pidfile, 'w+') as pf:
            pf.write("%s\n" % pid)

    def delpid(self):
        """Remove the pidfile (registered with atexit by daemonize())."""
        try:
            os.remove(self.pidfile)
        except OSError:
            # Already gone, e.g. removed by stop(); nothing left to clean up.
            pass

    def start(self):
        """
        Start the daemon
        """
        # Check for a pidfile to see if the daemon already runs
        pid = self._read_pid()
        if pid:
            message = "pidfile %s already exist. Daemon already running?\n"
            sys.stderr.write(message % self.pidfile)
            sys.exit(1)

        # Start the daemon
        self.daemonize()
        self.run()

    def stop(self):
        """
        Stop the daemon
        """
        import errno

        # Get the pid from the pidfile
        pid = self._read_pid()
        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return  # not an error in a restart

        # Try killing the daemon process: keep signalling until os.kill
        # reports that the process no longer exists.
        try:
            while 1:
                os.kill(pid, SIGTERM)
                time.sleep(0.1)
        except OSError as err:
            # Compare the error number, not the message text.
            if err.errno == errno.ESRCH:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                print(str(err))
                sys.exit(1)

    def restart(self):
        """
        Restart the daemon
        """
        self.stop()
        self.start()

    def run(self):
        """
        You should override this method when you subclass Daemon. It will be called
        after the process has been daemonized by start() or restart().
        """
""" | |
5. Startup stuff | |
""" | |
def setup_logging():
    """
    Build and return the application logger.

    The logger is named APP_ID, writes to the file APP_LOGGING with a
    "time level message" format, and is set to LOGGING_LEVEL.
    """
    line_format = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

    app_logger = logging.getLogger(APP_ID)
    file_handler = logging.FileHandler(APP_LOGGING)
    file_handler.setFormatter(line_format)

    app_logger.addHandler(file_handler)
    app_logger.setLevel(LOGGING_LEVEL)
    return app_logger


# Global logger used by the rest of the module.
logger = setup_logging()
class VideoQueue(Daemon):
    """
    This class overrides the daemon run method and allows us to execute code
    after the daemon has been created
    """

    def run(self):
        """
        In this method we create the threading and web.py app.

        If either step fails, the worker threads are stopped and the process
        exits with status 1.
        """
        try:
            start_threads(MAX_THREADS, MAX_THREADS)
            web.run(URLS, globals())
        except BaseException:
            # Deliberately catches everything, as the bare except did.
            logger.critical("Cannot start web app or threads, daemon not started",
                            exc_info=True)
            # There is no stop_threads() in this module; stop both pools with
            # the helpers that do exist.
            stop_task_threads()
            stop_done_threads()
            sys.exit(1)

    def start(self):
        logger.info("Starting Daemon")
        super(VideoQueue, self).start()

    def stop(self):
        logger.info("Stopping Daemon")
        stop_task_threads()
        stop_done_threads()
        super(VideoQueue, self).stop()
def main():
    """
    We deal with the incoming arguments on startup.

    Exactly one of --start, --stop or --restart is acted on (checked in that
    order); with none of them the parser reports an error and exits.
    """
    video_queue = VideoQueue(PID_FILE)
    p = optparse.OptionParser("usage: %prog [options] arg", version="%s %s" %
                              (__str__, __version__))
    p.add_option('-d', '--start', help="Starts the application as a Daemon",
                 action="store_true", default=False)
    p.add_option('-s', '--stop', help="Stops the application runnin as a Daemon",
                 action="store_true", default=False)
    p.add_option('-r', '--restart', help="Restarts Daemon", action="store_true",
                 default=False)
    p.add_option('-p', '--port', help="Allows you to overide the port the Daemon runs on",
                 default=PORT, type="string", dest="port")
    options, _ = p.parse_args()

    # The port is placed in sys.argv[1], presumably so web.run() picks it up
    # as its listen address -- TODO confirm against the web.py version in use.
    # Append when no argument was given; plain assignment raised IndexError.
    if len(sys.argv) > 1:
        sys.argv[1] = options.port
    else:
        sys.argv.append(options.port)

    if options.start:
        video_queue.start()
    elif options.stop:
        video_queue.stop()
    elif options.restart:
        video_queue.restart()
    else:
        # The old "in_file"/do_offline branch was dropped: no such option or
        # function exists, so it raised AttributeError before reaching here.
        p.error("Unknown command, use -h to get help")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment