Created
June 10, 2018 23:54
-
-
Save th0ma5w/d7e69454a24dcadda89a226252421705 to your computer and use it in GitHub Desktop.
Single Page Web Application and REST API for TzumiMagicTV
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# th0ma5w at github
#
# requires:
# http://archive.openwrt.org/attitude_adjustment/12.09/ar71xx/generic/packages/zlib_1.2.7-1_ar71xx.ipk
# http://archive.openwrt.org/attitude_adjustment/12.09/ar71xx/generic/packages/python-mini_2.7.3-1_ar71xx.ipk
#
# usage:
# python ./tzumi_server.py
#
import socket | |
# Control connection to the service listening on localhost port 6000
# (presumably the device's tuner daemon -- confirm on the target device).
# A 10 second timeout bounds every later connect/send/recv on this socket.
tvs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tvs.settimeout(10)
tvs.connect(('127.0.0.1', 6000))
# sendall() keeps writing until the whole login message is out; plain send()
# may stop after a partial write.
tvs.sendall('<msg type="login_req"><params protocol="TCP" port="8000"/></msg>')
# Most recent raw reply read from the control connection (single recv).
tv_response = tvs.recv(4096)
def _xml_attr_escape(value):
    """Return str(value) with the XML attribute metacharacters escaped."""
    text = str(value)
    # '&' must be replaced first so the entities added below are not re-escaped.
    for raw, entity in (('&', '&amp;'), ('<', '&lt;'),
                        ('>', '&gt;'), ('"', '&quot;')):
        text = text.replace(raw, entity)
    return text


def tune(freq, prog):
    """Send a ``tune_req`` message over ``tvs`` and return the raw reply.

    freq -- value placed in the ``freq`` attribute of the request.
    prog -- value placed in the ``programNumber`` attribute.

    Both values come from the request URL, so they are escaped before being
    embedded in the XML message; a stray quote can no longer break out of the
    attribute.  The reply (one recv of up to 4096 bytes) is also stored in the
    module-level ``tv_response``.
    """
    global tv_response
    request = (
        '<msg type="tune_req"><params tv_type="ATSC" freq="%s" '
        'bandwidth="8000" plp="0" programNumber="%s"/></msg>'
        % (_xml_attr_escape(freq), _xml_attr_escape(prog))
    )
    # sendall() guarantees the complete message is written.
    tvs.sendall(request)
    tv_response = tvs.recv(4096)
    return tv_response
def check():
    """Send a ``Check_Lock_req`` message over ``tvs`` and return the raw reply.

    The reply (one recv of up to 4096 bytes) is also stored in the
    module-level ``tv_response``.
    """
    global tv_response
    # sendall() guarantees the complete message is written.
    tvs.sendall('<msg type="Check_Lock_req"></msg>')
    tv_response = tvs.recv(4096)
    return tv_response
# Maximum number of bytes requested from a client socket per recv() call.
MAX_PACKET = 32768

# Listening socket for the built-in HTTP server: all interfaces, port 80.
server_sock = socket.socket(
    socket.AF_INET,
    socket.SOCK_STREAM,
    socket.IPPROTO_TCP)
# Allow the port to be re-bound right after a restart instead of failing with
# "Address already in use" while old connections linger in TIME_WAIT.
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind(('0.0.0.0', 80))
server_sock.listen(1)
def recv_all(sock):
    """Read everything currently available on ``sock`` and return it joined.

    The socket timeout is temporarily lowered to 10 ms; reading stops at the
    first recv() that times out, or as soon as the peer closes the connection
    (recv() returns an empty string).  The previous timeout is always restored.

    Without the end-of-stream check a closed connection would make recv()
    return '' immediately forever and the loop would never terminate.
    """
    prev_timeout = sock.gettimeout()
    chunks = []
    try:
        sock.settimeout(0.01)
        while True:
            try:
                chunk = sock.recv(MAX_PACKET)
            except socket.timeout:
                # Nothing more arrived within the short window.
                break
            if not chunk:
                # Peer closed its side; no further data will ever arrive.
                break
            chunks.append(chunk)
    finally:
        sock.settimeout(prev_timeout)
    return ''.join(chunks)
def normalize_line_endings(s):
    """Return ``s`` with every line terminated by a single '\\n'.

    The text is split with ``splitlines()`` and each resulting line gets
    '\\n' appended, so '\\r\\n' and '\\r' endings become '\\n' and a final
    unterminated line gains a newline.  An empty string stays empty.
    """
    return ''.join((line + '\n') for line in s.splitlines())
# Opening/closing tag pairs used by the *_wrap helpers to assemble responses.
html_header = "<html>"
html_footer = "</html>"
head_header = "<head>"
head_footer = "</head>"
body_header = "<body>"
body_footer = "</body>"
xml_header = "<xml>"
xml_footer = "</xml>"

# Browser-side snippet showing how the <xml>-wrapped replies can be parsed:
#p=new DOMParser();
#d=p.parseFromString('<xml><msg>hello</msg></xml>','application/xml');
#console.log(d.getElementsByTagName('msg')[0].textContent)

# Style sheet placed in the page <head>; currently an empty <style> element.
css_styles = """
<style>
</style>
"""
# Client-side script embedded at the end of the page body.  It builds the
# channel radio buttons, sends /tune/<freq>/<prog> requests and polls /check
# once per second, updating the element with id "status".
#
# Fixes relative to the earlier table: channels 30, 35 and 39 carried typos
# (559000, 559000, 613000); every channel from 14 upward is 473000 plus
# 6000 per channel, giving 569000, 599000 and 623000.  The TUNING flag is now
# always cleared when a request finishes, so a failed request or an
# unparsable reply can no longer stop the status poll for good.
javascript_source = """
<script>
dom_parser = new DOMParser();
var TUNING = false;
var TUNED = false;
tune_url = function(freq,prog) { return "/tune/" + freq + "/" + prog; }
check_url = "/check"
channels = {
    2:57000,
    3:63000,
    4:69000,
    5:79000,
    6:85000,
    7:177000,
    8:183000,
    9:189000,
    10:195000,
    11:201000,
    12:207000,
    13:213000,
    14:473000,
    15:479000,
    16:485000,
    17:491000,
    18:497000,
    19:503000,
    20:509000,
    21:515000,
    22:521000,
    23:527000,
    24:533000,
    25:539000,
    26:545000,
    27:551000,
    28:557000,
    29:563000,
    30:569000,
    31:575000,
    32:581000,
    33:587000,
    34:593000,
    35:599000,
    36:605000,
    37:611000,
    38:617000,
    39:623000,
    40:629000,
    41:635000,
    42:641000,
    43:647000,
    44:653000,
    45:659000,
    46:665000,
    47:671000,
    48:677000,
    49:683000,
    50:689000,
    51:695000
}
var channeldiv = document.getElementById('channeldiv');
update_freq = function(f){
    freq=document.getElementById('frequency').value=f;
    return true;
}
var channel_block_count = 0;
for (c in channels){
    existing = channeldiv.innerHTML;
    newhtml = existing + '<input type="radio" id="c'+c+'" name="channel" value="'+channels[c]+'" onclick="update_freq('+channels[c]+');">' + c + ' ';
    channel_block_count = channel_block_count + 1;
    if (channel_block_count % 10 === 0) {
        newhtml = newhtml + "<br />";
    }
    channeldiv.innerHTML = newhtml;
}
set_tv_tuned = function() {document.getElementById('status').innerHTML="Tuned"};
set_tv_untuned = function() {document.getElementById('status').innerHTML="Untuned"};
process_check_response = function(xml_response){
    // The request is finished either way; clear the flag first so polling
    // resumes even when the reply below cannot be interpreted.
    TUNING = false;
    d=dom_parser.parseFromString(xml_response,'application/xml');
    ack_info=d.getElementsByTagName('ack_info')[0];
    if (!ack_info) {
        return;
    }
    ret=ack_info.getAttribute('ret');
    if (ret==="0") {
        TUNED=true;
        console.log("TUNED");
        set_tv_tuned();
    }
    if (ret==="-1") {
        TUNED=false;
        console.log("UNTUNED");
        set_tv_untuned();
    }
}
send_request = function(url){
    var r = new XMLHttpRequest();
    r.open('GET', url, true);
    r.onreadystatechange=function(){
        if (r.readyState!=4){
            return;
        }
        if (r.status==200){
            process_check_response(r.responseText);
        } else {
            // Failed request: clear the flag so check_tv keeps polling.
            TUNING = false;
        }
    };
    r.send();
}
check_tv = function(){
    if (! TUNING){
        send_request(check_url);
    }
}
tune_tv_f = function(freq,prog) {
    TUNING = true;
    send_request(tune_url(freq,prog));
}
tune_tv = function() {
    TUNING = true;
    freq=document.getElementById('frequency').value;
    prog=document.getElementById('program').value;
    send_request(tune_url(freq,prog));
}
let check_timer = setInterval(check_tv, 1000);
</script>
"""
def html_head():
    """Return the page <head> element containing ``css_styles``."""
    return head_header + css_styles + head_footer
def body_wrap(s):
    """Return ``s`` enclosed in the <body> opening and closing tags."""
    return body_header + s + body_footer
def html_wrap(s):
    """Return a minimal HTML document (no <head>) whose body is ``s``."""
    return html_header + body_wrap(s) + html_footer
def html_app_wrap(s):
    """Return the full application page for body markup ``s``.

    The document consists of the <head> from ``html_head()`` followed by a
    body holding ``s`` and then ``javascript_source``; the script comes after
    the markup so the elements it looks up already exist when it runs.
    """
    return html_header + html_head() + body_wrap(s + javascript_source) + html_footer
def xml_wrap(s):
    """Return ``s`` enclosed in a single <xml> root element."""
    return xml_header + s + xml_footer
# Response header sets returned by the route handlers, one per content type.
# The character set is declared with the standard "charset" parameter;
# "encoding=utf8" is not a recognised Content-Type parameter and is ignored
# by browsers.
html_headers = {
    'Content-Type': 'text/html; charset=utf-8',
}
xml_headers = {
    'Content-Type': 'application/xml; charset=utf-8',
}
json_headers = {
    'Content-Type': 'application/json; charset=utf-8',
}
playlist_headers = {
    'Content-Type': 'application/vnd.apple.mpegurl; charset=utf-8',
}
def tune_response(r):
    """Handle ``/tune/<freq>/<program>``: tune and return the reply as XML.

    r -- request object; only ``r.uri`` is read.

    Returns ``(xml_headers, body)`` where body is the reply from ``tune()``
    wrapped by ``xml_wrap()``.  Any query string is dropped before the path
    is split so it cannot end up inside the program value.  An empty program
    segment (``/tune/<freq>/``) is passed through unchanged.

    Raises ValueError when the path has fewer than two segments after
    ``/tune/``.
    """
    path = r.uri.split('?', 1)[0]
    parts = path.split('/')
    # parts == ['', 'tune', freq, prog, ...]
    if len(parts) < 4:
        raise ValueError('expected /tune/<freq>/<program>, got %r' % (r.uri,))
    freq = parts[2]
    prog = parts[3]
    response = tune(freq, prog)
    return (xml_headers, xml_wrap(response))
def check_response(r):
    """Handle ``/check``: return the reply from ``check()`` as XML.

    r -- request object (unused).

    Returns ``(xml_headers, body)`` with the reply wrapped by ``xml_wrap()``.
    """
    response = check()
    return (xml_headers, xml_wrap(response))
def _html_escape(value):
    """Return str(value) with HTML metacharacters replaced by entities."""
    text = str(value)
    # '&' must be replaced first so the entities added below are not re-escaped.
    for raw, entity in (('&', '&amp;'), ('<', '&lt;'),
                        ('>', '&gt;'), ('"', '&quot;')):
        text = text.replace(raw, entity)
    return text


def make_response_debug(r):
    """Return an HTML page that echoes the request, for unmatched routes.

    r -- request object; ``uri``, ``method``, ``proto``, ``body`` and
         ``headers`` (a dict) are read.

    Returns ``(html_headers, body)``.  Everything taken from the request is
    HTML-escaped, because it is client-controlled and would otherwise be
    rendered as markup by the browser.
    """
    br = "<br/>"
    response_body = [
        "URI: " + _html_escape(r.uri) + br,
        "Method: " + _html_escape(r.method) + br,
        "Proto: " + _html_escape(r.proto) + br,
        "Body: " + _html_escape(r.body) + br,
        br,
        "<ul>"
    ]
    # items() works on both Python 2 and Python 3 dicts.
    for k, v in r.headers.items():
        response_body.append(
            '<li><b>%s</b> : %s</li>' % (_html_escape(k), _html_escape(v)))
    response_body.append('</ul>')
    return (html_headers, html_wrap(''.join(response_body)))
def test_xml(r): | |
global tv_response | |
tvs.send('<msg type="login_req"><params protocol="TCP" port="8000"/></msg>') | |
tv_response = tvs.recv(4096) | |
#fix response | |
tv_response = tv_response.replace('"<','"/><') | |
tv_response = tv_response.replace('"device_type','" device_type') | |
return (xml_headers, xml_wrap(tv_response)) | |
def web_app(r):
    """Handle ``/``: return the single-page tuner application.

    r -- request object (unused).

    Returns ``(html_headers, page)``.  The markup defines the elements the
    embedded script looks up by id: ``status``, ``frequency``, ``program``
    and ``channeldiv``.  The status <span> is closed with </span>; it used to
    be closed with a mismatched </status> tag.
    """
    return (html_headers, html_app_wrap("""
<b>TV Tuner</b><br/>
<table>
<tr>
<td>Status:</td>
<td><span id="status"></span></td>
</tr>
<tr>
<td>Frequency:</td>
<td><input id="frequency" type="text"></input></td>
</tr>
<tr>
<td>Program:</td>
<td><input id="program" type="text"></input></td>
</tr>
</table>
<button type="button" onClick="tune_tv()">Tune</button><br/>
<div id="channeldiv"></div>
"""))
# URI prefix -> handler.  Each handler takes the Request object and returns a
# (headers, body) tuple.  Keys are matched as prefixes of the request URI, so
# "/" also matches every URI that starts with a slash.
routes = {
    '/tune/': tune_response,
    '/check': check_response,
    '/xml': test_xml,
    "/": web_app
}
class Request(object):
    """Plain container for the parts of one parsed HTTP request.

    All fields default to None.  They can be passed as keyword arguments or
    assigned after construction; ``Request()`` with no arguments still works.
    """

    # Class-level defaults, kept so the attributes also exist on the class.
    headers = None  # dict of request header name -> value
    body = None     # request body text
    method = None   # first field of the request line
    uri = None      # second field of the request line
    proto = None    # third field of the request line

    def __init__(self, headers=None, body=None, method=None, uri=None,
                 proto=None):
        self.headers = headers
        self.body = body
        self.method = method
        self.uri = uri
        self.proto = proto

    def __repr__(self):
        return 'Request(method=%r, uri=%r, proto=%r)' % (
            self.method, self.uri, self.proto)
def resolve_route(headers, body, method, uri, proto):
    """Dispatch a parsed request to its handler and return ``(headers, body)``.

    A ``Request`` is populated from the arguments and handed to the handler
    whose ``routes`` key is a prefix of ``uri``.  Prefixes are tried longest
    first: the "/" entry is a prefix of every other key, and iterating the
    dict in its arbitrary order could let it win over "/tune/" or "/check".
    When no prefix matches, ``make_response_debug`` builds the response.
    """
    r = Request()
    r.headers, r.body, r.method, r.uri, r.proto = headers, body, method, uri, proto
    for prefix in sorted(routes, key=len, reverse=True):
        if uri.startswith(prefix):
            return routes[prefix](r)
    return make_response_debug(r)
# Main accept loop: serve exactly one HTTP request per connection, one
# connection at a time.  NOTE: the script targets Python 2.7, where str is a
# byte string and can be passed to the socket directly.
while True:
    client_sock, client_addr = server_sock.accept()
    try:
        response_status = '200'
        response_status_text = 'OK'
        try:
            request = normalize_line_endings(recv_all(client_sock))
            request_head, request_body = request.split('\n\n', 1)
            request_head = request_head.splitlines()
            request_headline = request_head[0]
            request_headers = dict(x.split(': ', 1) for x in request_head[1:])
            # Three fields are unpacked, so split at most twice.
            request_method, request_uri, request_proto = request_headline.split(
                ' ', 2)
            response_headers, response_body = resolve_route(
                request_headers,
                request_body,
                request_method,
                request_uri,
                request_proto)
        except Exception:
            # A malformed request or a failing handler gets an explicit error
            # status instead of a silently dropped connection.
            response_status = '500'
            response_status_text = 'Internal Server Error'
            response_headers, response_body = {}, ''
        # Copy first: handlers return shared module-level header dicts that
        # must not be modified per request.
        response_headers = dict(response_headers)
        response_headers.update({
            'Content-Length': len(response_body),
            'Connection': 'close'
        })
        response_headers_raw = ''.join('%s: %s\r\n' % (k, v) for k, v in
                                       response_headers.items())
        response_proto = 'HTTP/1.1'
        # The status line needs its own line terminator; without it the first
        # header is fused onto the reason phrase and lost.
        response_status_line = '%s %s %s\r\n' % (
            response_proto,
            response_status,
            response_status_text)
        # One sendall() writes the whole response; send() may stop part-way.
        client_sock.sendall(
            response_status_line + response_headers_raw + '\r\n' + response_body)
    except socket.error:
        # The client went away while the response was being written.
        pass
    finally:
        client_sock.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment