Skip to content

Instantly share code, notes, and snippets.

@athoik
Created June 20, 2014 20:02
Show Gist options
  • Save athoik/3e19feee499a97d25768 to your computer and use it in GitHub Desktop.
Save athoik/3e19feee499a97d25768 to your computer and use it in GitHub Desktop.
e2restream
#!/usr/bin/python2.7
""" e2restream Deamon """
import os
import sys
import time
import atexit
import Queue
import hashlib
from threading import Thread, Lock
from signal import SIGTERM
import requests
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from SocketServer import ThreadingMixIn
from urllib import unquote
HOST_NAME = ""
PORT_NUMBER = 888
RELAYS = {}
MUTEX = Lock()
### http://pymotw.com/2/select/
### http://stackoverflow.com/questions/16694907/how-to-download-large-file-in-python-with-requests-py
def md5(url):
m = hashlib.md5()
m.update(url)
return m.hexdigest()
def RequestsData(url):
global RELAYS
urlhash = md5(url)
res = requests.get(url, stream=True)
for chunk in res.iter_content(chunk_size=192*1024):
if chunk: # filter out keep-alive new chunks
with MUTEX:
if not urlhash in RELAYS:
break
for q in RELAYS[urlhash]:
RELAYS[urlhash][q].put(chunk)
def Stream(wfile, url):
global RELAYS
client = wfile.fileno()
urlhash = md5(url)
if urlhash in RELAYS:
with MUTEX:
RELAYS[urlhash][client] = Queue.Queue()
else:
with MUTEX:
RELAYS[urlhash] = {}
RELAYS[urlhash][client] = Queue.Queue()
thread = Thread(target=RequestsData, args=(url,))
thread.start()
while True:
try:
data = RELAYS[urlhash][client].get(True,15)
except Queue.Empty:
with MUTEX:
del RELAYS[urlhash][client]
break
else:
try:
wfile.write(data)
except:
break
if client in RELAYS[urlhash]:
with MUTEX:
del RELAYS[urlhash][client]
if len(RELAYS[urlhash]) == 0:
del RELAYS[urlhash]
wfile.close()
class StreamHandler(BaseHTTPRequestHandler):
def do_HEAD(s):
s.send_response(200)
s.send_header("Server", "e2restream")
s.send_header("Content-type", "text/html")
s.end_headers()
def do_GET(s):
"""Respond to a GET request."""
s.send_response(200)
s.send_header("Server", "e2restream")
s.send_header("Content-type", "application/octet-stream")
s.end_headers()
url=unquote(s.path[1:])
s.log_message("URL: %s", url)
Stream(s.wfile, url)
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
"""Handle requests in a separate thread."""
def start():
httpd = ThreadedHTTPServer((HOST_NAME, PORT_NUMBER), StreamHandler)
print time.asctime(), "Server Starts - %s:%s" % (HOST_NAME, PORT_NUMBER)
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()
print time.asctime(), "Server Stops - %s:%s" % (HOST_NAME, PORT_NUMBER)
class Daemon:
"""
A generic daemon class.
Usage: subclass the Daemon class and override the run() method
"""
def __init__(self, pidfile, stdin="/dev/null", stdout="/dev/null", stderr="/dev/null"):
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.pidfile = pidfile
def daemonize(self):
"""
do the UNIX double-fork magic, see Stevens' "Advanced
Programming in the UNIX Environment" for details (ISBN 0201563177)
http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
"""
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = file(self.stdin, "r")
so = file(self.stdout, "a+")
se = file(self.stderr, "a+", 0)
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
# write pidfile
atexit.register(self.delpid)
pid = str(os.getpid())
file(self.pidfile,"w+").write("%s\n" % pid)
def delpid(self):
os.remove(self.pidfile)
def start(self):
"""
Start the daemon
"""
# Check for a pidfile to see if the daemon already runs
try:
pf = file(self.pidfile,"r")
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if pid:
message = "pidfile %s already exist. Daemon already running?\n"
sys.stderr.write(message % self.pidfile)
sys.exit(1)
# Start the daemon
self.daemonize()
self.run()
def stop(self):
"""
Stop the daemon
"""
# Get the pid from the pidfile
try:
pf = file(self.pidfile,"r")
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if not pid:
message = "pidfile %s does not exist. Daemon not running?\n"
sys.stderr.write(message % self.pidfile)
return # not an error in a restart
# Try killing the daemon process
try:
while 1:
os.kill(pid, SIGTERM)
time.sleep(0.1)
except OSError, err:
err = str(err)
if err.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print str(err)
sys.exit(1)
def restart(self):
"""
Restart the daemon
"""
self.stop()
self.start()
def run(self):
"""
You should override this method when you subclass Daemon. It will be called after the process has been
daemonized by start() or restart().
"""
class e2restreamDaemon(Daemon):
def run(self):
start()
if __name__ == "__main__":
daemon = e2restreamDaemon("/var/run/e2restream.pid")
if len(sys.argv) == 2:
if "start" == sys.argv[1]:
daemon.start()
elif "stop" == sys.argv[1]:
daemon.stop()
elif "restart" == sys.argv[1]:
daemon.restart()
elif "manualstart" == sys.argv[1]:
start()
else:
print "Unknown command"
sys.exit(2)
sys.exit(0)
else:
print "usage: %s start|stop|restart|manualstart" % sys.argv[0]
sys.exit(2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment