Last active
January 14, 2023 09:40
-
-
Save RicterZ/ed3517b944a04e18c91aacfe3d9aff24 to your computer and use it in GitHub Desktop.
Bilibili TV Controller written by Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Bilibili TV Controller | |
# | |
# Requirements | |
# - qrencode_ascii | |
# - zio | |
import threading | |
import json | |
import time | |
import logging | |
import sys | |
import requests | |
import re | |
import socket | |
import struct | |
import random | |
import os | |
import hashlib | |
import qrencode_ascii | |
from urllib.parse import urlparse | |
from functools import wraps | |
from enum import Enum | |
from zio import zio, b8, b32 | |
logger = logging.getLogger() | |
SETUP_MSG = '''SETUP /projection NVA/1.0\r | |
Session: deadbeef-1337-1337-1337-deadbeef1337\r | |
Connection: Keep-Alive\r | |
\r | |
''' | |
class ColorFormatter(logging.Formatter): | |
grey = "\x1b[38;20m" | |
blue = "\x1b[34;20m" | |
yellow = "\x1b[33;20m" | |
red = "\x1b[31;20m" | |
bold_red = "\x1b[31;1m" | |
reset = "\x1b[0m" | |
format = "[%(levelname)s]: %(message)s" | |
FORMATS = { | |
logging.DEBUG: grey + format + reset, | |
logging.INFO: blue + format + reset, | |
logging.WARNING: yellow + format + reset, | |
logging.ERROR: red + format + reset, | |
logging.CRITICAL: bold_red + format + reset | |
} | |
def format(self, record): | |
log_fmt = self.FORMATS.get(record.levelno) | |
formatter = logging.Formatter(log_fmt) | |
return formatter.format(record) | |
class Flag(Enum): | |
# ping command from TV every 1 second | |
PING = b'\xe4' | |
# flag of commands sent from client | |
COMMAND = b'\xe0' | |
# flag of commands sent from TV | |
RESPONSE = b'\xc0' | |
def check_settle(playing=True): | |
def _func(func): | |
def _wrapper(func): | |
@wraps(func) | |
def wrapper(*args, **kwargs): | |
if not args[0].settle: | |
logger.info('TV not settle, please run setup first') | |
return | |
if playing and not args[0].playing: | |
logger.info('Video not playing, cannot change states') | |
return | |
return func(*args, **kwargs) | |
return wrapper | |
return _wrapper(func) | |
return _func | |
def get_epid_and_ssid(aid): | |
url = 'https://www.bilibili.com/video/av{}/'.format(aid) | |
headers = requests.head(url, allow_redirects=False).headers | |
if not 'location' in headers: | |
return 0, 0 | |
epid = headers['location'].split('/')[-1] | |
if not epid.startswith('ep'): | |
return 0, 0 | |
resp = requests.get(headers['location']).text | |
ssid = re.findall('https://www.bilibili.com/bangumi/play/ss(\d+)/', resp) | |
if ssid: | |
return epid[2:], ssid[0] | |
return 0, 0 | |
def bv_decode(code): | |
table = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF' | |
tr = {} | |
for i in range(58): | |
tr[table[i]] = i | |
s = [11, 10, 3, 8, 4, 6] | |
xor = 177451812 | |
add = 8728348608 | |
r = 0 | |
for i in range(6): | |
r += tr[code[s[i]]] * 58 ** i | |
return (r - add) ^ xor | |
def restore(): | |
pass | |
def save(obj): | |
pass | |
class BilibiliTV(): | |
io = None | |
target = None | |
discovered = [] | |
danmaku = False | |
# is video playing | |
playing = False | |
# video position | |
position = 0 | |
# video info | |
info = None | |
# video speed | |
speed = 1 | |
supported_speeds = None | |
# video quality | |
quality = None | |
supported_qualities = None | |
# video volume | |
volume = 0 | |
# setup | |
settle = False | |
dispatcher_thread = None | |
command_response = None | |
access_key = None | |
def __init__(self, addr=None): | |
self.target = addr | |
self.command_response = {} | |
def ping(self): | |
msg = Flag.RESPONSE.value | |
msg += b'\x00' | |
msg += b32(0x1337) | |
self.io.write(msg) | |
def make_command(self, command, args=None): | |
# message flag | |
while True: | |
serial = random.randint(0, 0xffffffff) | |
if serial not in self.command_response: | |
break | |
logger.debug('Make command {} with serial {} ...'.format(command, serial)) | |
msg = Flag.COMMAND.value | |
# message parts count | |
if args is None: | |
args = () | |
else: | |
# for now, only 1 argument been used | |
args = [json.dumps(args)] | |
msg += b8(len(args) + 2) | |
# message serial number | |
msg += b32(serial) | |
msg += b'\x01' | |
# command string | |
msg += b'\x07Command' | |
msg += b8(len(command)) | |
msg += command.encode() | |
# arguments | |
if not isinstance(args, (list, tuple)): | |
args = (args, ) | |
for arg in args: | |
if not isinstance(arg, bytes): | |
arg = str(arg).encode() | |
msg += b32(len(arg)) | |
msg += arg | |
return serial, msg | |
def dispatch(self): | |
while getattr(self.dispatcher_thread, 'not_stop', True): | |
flag = self.io.read(1) | |
if flag not in Flag._value2member_map_: | |
logger.error('Unknown message flag: {}'.format(repr(flag))) | |
raise Exception('Unknown message flag') | |
parts = int.from_bytes(self.io.read(1), 'big') | |
serial = int.from_bytes(self.io.read(4), 'big') | |
logger.debug('Get {} message with serial {}'.format(Flag._value2member_map_[flag].name, serial)) | |
if flag == Flag.PING.value: | |
self.settle = True | |
if parts == 0: | |
self.command_response[serial] = None | |
continue | |
# one more byte in commands from client | |
if flag == Flag.COMMAND.value: | |
_ = self.io.read(1) | |
result = [] | |
for i in range(parts): | |
# skip Command XXXX which length is only 1 byte | |
c = 1 if i <= 1 and flag != Flag.RESPONSE.value else 4 | |
length = int.from_bytes(self.io.read(c), 'big') | |
data = self.io.read(length) | |
result.append(data) | |
self.command_response[serial] = result | |
if len(result) == 1: | |
ret = json.loads(result[0]) | |
if 'volume' in ret: | |
self.volume = ret['volume'] | |
else: | |
logger.info(ret) | |
continue | |
command = '{}.{}'.format(result[0].decode(), result[1].decode()) | |
logger.debug('Received {}'.format(command)) | |
arguments = [] | |
if len(result) >= 2: | |
for arg in result[2:]: | |
arguments.append(json.loads(arg)) | |
logger.debug('argument: {}'.format(arg.decode())) | |
if command == 'Command.OnProgress': | |
self.position = arguments[0]['position'] | |
self.playing = True | |
elif command == 'Command.SpeedChanged': | |
self.speed = arguments[0]['currSpeed'] | |
self.supported_speeds = arguments[0]['supportSpeedList'] | |
logger.debug('Current speed: {}, supported speeds: {}'.format( | |
arguments[0]['currSpeed'], | |
' / '.join(map(str, arguments[0]['supportSpeedList']))) | |
) | |
elif command == 'Command.OnDanmakuSwitch': | |
self.danmaku = arguments[0]['open'] | |
logger.debug('Danmaku switch is {}'.format(arguments[0]['open'])) | |
elif command == 'Command.PLAY_SUCCESS': | |
self.playing = True | |
logger.debug('Start playing video command successfully') | |
elif command == 'Command.OnPlayState': | |
state = arguments[0]['playState'] | |
if state == 5: | |
logger.debug('Video playing paused') | |
elif state == 4: | |
logger.debug('Video playing resumed') | |
elif state == 7: | |
self.playing = False | |
logger.debug('Video playing stopped') | |
elif command == 'Command.OnQnSwitch': | |
text = [] | |
self.quality = arguments[0]['curQn'] | |
self.supported_qualities = arguments[0]['supportQnList'] | |
elif command == 'Command.OnEpisodeSwitch': | |
self.info = arguments[0] | |
logger.debug('Playing video {} (av{})'.format(self.info['title'], | |
self.info['playItem']['aid'])) | |
elif command == 'Command.Error': | |
logger.warning('Error: {}'.format(arguments[0]['errorCode'])) | |
else: | |
logger.debug('Unimplemented command {}'.format(command)) | |
def discover(self): | |
logger.info('Starting discovering Bilibili TV ...') | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.bind(('239.255.255.250', 1900)) | |
mreq = struct.pack("4sl", socket.inet_aton('239.255.255.250'), socket.INADDR_ANY) | |
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) | |
count = 0 | |
while True: | |
data = sock.recv(1024).decode() | |
for line in data.split('\n'): | |
line = line.strip() | |
if line.lower().startswith('location'): | |
link = line.split(':', 1)[1].strip() | |
resp = requests.get(link).text | |
# bilibili | |
if 'Bilibili Inc.' in resp: | |
name = re.findall('<friendlyName>(.*)</friendlyName>', resp) | |
name = 'Unknown' if not name else name[0] | |
udn = re.findall('<UDN>uuid:(.*)</UDN>', resp)[0] | |
if udn not in self.discovered: | |
self.discovered.append(udn) | |
addr = (urlparse(link).hostname, urlparse(link).port) | |
logger.info('Found Bilibili TV: {}:{} ({})'.format(*addr, name)) | |
break | |
time.sleep(1) | |
count += 1 | |
if count == 5: | |
logger.info('Discover finished') | |
break | |
def setup(self, addr=None): | |
if self.target is None and addr is None: | |
logger.error('Target address not be set') | |
return | |
self.target = addr | |
logger.info('Initialize connection ...') | |
self.io = zio((self.target), print_write=False, print_read=False) | |
self.io.write(SETUP_MSG) | |
self.io.read_until('\r\n\r\n') | |
self.dispatcher_thread = threading.Thread(target=self.dispatch) | |
self.dispatcher_thread.start() | |
c = 0 | |
while not self.settle: | |
self.ping() | |
if c >= 3: | |
logger.error('setup failed for no response') | |
return | |
c += 1 | |
time.sleep(1) | |
self.get_volume() | |
logger.info('Setup TV connection successfully') | |
@check_settle(playing=False) | |
def set_volume(self, volume): | |
logger.info('Set the volume to {}'.format(volume)) | |
if not volume.isdigit(): | |
return | |
self.volume = int(volume) | |
_, msg = self.make_command('SetVolume', {'volume': self.volume}) | |
self.io.write(msg) | |
@check_settle(playing=False) | |
def get_volume(self): | |
serial, msg = self.make_command('GetVolume') | |
self.io.write(msg) | |
while serial in self.command_response: | |
return self.volume | |
@check_settle(playing=False) | |
def play(self, aid=0, biz_id=0, room_id=0, epid=0, season_id=0, cid=0): | |
self.speed = 1 | |
self.quality = None | |
self.supported_speeds = None | |
self.supported_qualities = None | |
self.info = None | |
if epid == 0 and season_id == 0: | |
epid, season_id = get_epid_and_ssid(aid) | |
content_type = 0 if not epid else 1 | |
data = { | |
'biz_id': biz_id, | |
'roomId': room_id, | |
'oid': aid, | |
'aid': aid, | |
'epId': epid, | |
'cid': cid, | |
'seasonId': season_id, | |
'type': 0, | |
'contentType': content_type, | |
'autoNext': 'true', | |
'userDesireQn': '112', | |
'accessKey': self.access_key | |
} | |
if self.access_key is None: | |
logger.warning('access_key is not be set, cannot play high quality videos, try login') | |
logger.info('Sending play video command ...') | |
serial, msg = self.make_command('Play', data) | |
self.io.write(msg) | |
count = 0 | |
while True: | |
if self.info is not None: | |
logger.info('Playing video: {}(av{})'.format(self.info['title'], | |
self.info['playItem']['aid'])) | |
text = '' | |
current = '' | |
for k in self.info['qnDesc']['supportQnList']: | |
if self.info['qnDesc']['curQn'] == k['quality']: | |
current = k['description'] | |
text += '{}({}) / '.format(k['description'], k['quality']) | |
logger.info('Current quality: {}'.format(current)) | |
logger.info('Supported quality: {}'.format(text[:-2])) | |
break | |
if count == 3: | |
logger.error('Play video failed, please check the video ID') | |
break | |
time.sleep(1) | |
count += 1 | |
@check_settle() | |
def stop(self): | |
logger.info('Stop playing video ...') | |
self.playing = False | |
_, msg = self.make_command('Stop') | |
self.io.write(msg) | |
self.dispatcher_thread.not_stop = False | |
@check_settle() | |
def pause(self): | |
logger.info('Pause video playing ...') | |
_, msg = self.make_command('Pause') | |
self.io.write(msg) | |
@check_settle() | |
def resume(self): | |
logger.info('Resume video playing ...') | |
_, msg = self.make_command('Resume') | |
self.io.write(msg) | |
@check_settle() | |
def seek(self, t): | |
logger.info('Seek to {} sec ...'.format(t)) | |
_, msg = self.make_command('Seek', {'seekTs': t}) | |
self.io.write(msg) | |
def set_seek(self, position): | |
self.seek(position) | |
def get_seek(self): | |
return self.position | |
@check_settle() | |
def set_speed(self, speed): | |
logger.info('Set speed to {} ...'.format(speed)) | |
if float(speed) not in self.supported_speeds: | |
logger.warning('Unsupported speed specified') | |
return | |
_, msg = self.make_command('SwitchSpeed', {'speed': speed}) | |
self.io.write(msg) | |
@check_settle() | |
def get_speed(self): | |
return self.speed | |
@check_settle(playing=False) | |
def toggle_danmaku(self): | |
self.danmaku = not self.danmaku | |
logger.info('Toggle danmaku to {} ...'.format(self.danmaku)) | |
_, msg = self.make_command('SwitchDanmaku', {'open': self.danmaku}) | |
self.io.write(msg) | |
@check_settle() | |
def set_quality(self, quality): | |
if not self.playing: | |
return | |
if int(quality) not in map(lambda k: k['quality'], self.supported_qualities): | |
logger.warning('Unsupported quality specified') | |
return | |
logger.info('Set quality to {} ...'.format(quality)) | |
_, msg = self.make_command('SwitchQn', {'qn': quality}) | |
self.io.write(msg) | |
@check_settle() | |
def get_quality(self): | |
return self.quality | |
def parse_command(self, command): | |
args = command[1:] | |
command = command[0].lower() | |
if command in ('quit', 'exit'): | |
if self.dispatcher_thread: | |
self.dispatcher_thread.not_stop = False | |
raise SystemExit | |
elif command in ('pause', 'resume', 'result', 'danmaku', 'stop', 'discover'): | |
if command == 'danmaku': | |
command = 'toggle_danmaku' | |
self.__getattribute__(command)() | |
elif command in ('quality', 'speed', 'volume', 'seek'): | |
if len(args) == 0: | |
ret = self.__getattribute__('get_{}'.format(command))() | |
if ret: | |
print(ret) | |
else: | |
self.__getattribute__('set_{}'.format(command))(*args) | |
elif command == 'play': | |
if not args: | |
print('Usage: play [avXXXXXXX]') | |
return | |
if args[0].lower().startswith('bv'): | |
av = bv_decode(args[0]) | |
elif args[0].startswith('av'): | |
av = args[0][2:] | |
else: | |
av = args[0] | |
self.play(aid=av) | |
elif command == 'setup': | |
if len(args) < 2: | |
print('Usage: setup [host] [port]') | |
return | |
self.setup((args[0], int(args[1]))) | |
elif command == 'debug': | |
logger.setLevel(logging.DEBUG) | |
elif command == 'login': | |
if len(args) > 0: | |
self.login(args[0]) | |
else: | |
self.login() | |
def interactive(self): | |
while True: | |
try: | |
command = input('📺 >>> ').split(' ') | |
except (EOFError, KeyboardInterrupt): | |
self.dispatcher_thread.not_stop = False | |
break | |
self.parse_command(command) | |
def login(self, access_key=None): | |
if access_key is not None: | |
self.access_key = access_key | |
return | |
def get_sign(data): | |
s = '' | |
for k, v in data.items(): | |
s += '{}={}&'.format(k, v) | |
s = s[:-1] | |
s += '59b43e04ad6965f34319062b478f83dd' | |
return hashlib.md5(s.encode()).hexdigest() | |
base = 'https://passport.bilibili.com/x/passport-tv-login/qrcode/' | |
url = base + 'auth_code' | |
data = { | |
'appkey': '4409e2ce8ffd12b8', | |
'local_id': 0, | |
'ts': 0, | |
} | |
data['sign'] = get_sign(data) | |
ret = requests.post(url, data=data).json() | |
qrcode = qrencode_ascii.encode(ret['data']['url']) | |
logger.info('Please scan qrcode and scan via Bilibili app') | |
print(qrcode) | |
url = base + '/poll' | |
data = { | |
'appkey': '4409e2ce8ffd12b8', | |
'auth_code': ret['data']['auth_code'], | |
'local_id': 0, | |
'ts': 0, | |
} | |
data['sign'] = get_sign(data) | |
while True: | |
ret = requests.post(url, data=data).json() | |
if ret['code'] == 0: | |
self.access_key = ret['data']['access_token'] | |
logger.info('Login successfully with user {}'.format(ret['data']['mid'])) | |
logger.info('Access key: {}'.format(self.access_key)) | |
break | |
time.sleep(1) | |
if __name__ == '__main__': | |
logger.setLevel(logging.INFO) | |
handler = logging.StreamHandler() | |
handler.setFormatter(ColorFormatter()) | |
logger.addHandler(handler) | |
b = BilibiliTV() | |
# b.discover() | |
b.setup(('192.168.31.198', 9958)) | |
b.login(access_key='xxxxxxxx') | |
b.interactive() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment