Last active
October 10, 2023 07:31
-
-
Save Sg4Dylan/3b2528d27dd49473b8af56b4caf02efc to your computer and use it in GitHub Desktop.
网易云音乐远程控制
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#coding:utf-8 | |
# Author: Sg4Dylan -- <sg4dylan#gmail.com> | |
# Update: 10/13/2017 | |
''' | |
透过 Chrome Debug Protocol 读取网易云音乐当前播放状态 | |
启动播放器时附带启动参数 | |
--remote-debugging-port=8080 | |
安装依赖 | |
pip install requests cproto BeautifulSoup4 | |
''' | |
import json | |
import requests | |
from bs4 import BeautifulSoup | |
from cproto import CProto | |
def get_neteast_status(remote_host, remote_port):
    """Read the current playback state of NetEase Cloud Music over CDP.

    The player must have been started with ``--remote-debugging-port`` so
    that its embedded Chromium exposes the Chrome Debug Protocol.

    Args:
        remote_host: Host name or IP address of the debugging endpoint.
        remote_port: TCP port of the debugging endpoint.

    Returns:
        A dict with ``is_pause`` (bool), ``time_now`` and ``time_all``
        (strings taken verbatim from the player's DOM).

    Raises:
        RuntimeError: If the document returned by the player has no BODY node.
    """

    def get_node_id(children, name):
        """Depth-first search for the first node whose nodeName contains *name*."""
        for node in children:
            if name in node.get('nodeName', ''):
                return node['nodeId']
            if 'children' in node:
                found = get_node_id(node['children'], name)
                # Only stop when the subtree really contained the node;
                # otherwise keep scanning the remaining siblings.
                if found is not None:
                    return found
        return None

    def detect_if_fm():
        """Return True when the first debuggable page URL ends with 'fm/'."""
        raw_api = requests.get(
            'http://{0}:{1}/json'.format(remote_host, remote_port),
            timeout=5,
        )
        page_url = json.loads(raw_api.content.decode('UTF-8'))[0]['url']
        return page_url.endswith('fm/')

    def play_status(cp):
        """Scrape pause state and the two time labels from the page body."""
        # Get body nodeId
        raw_dom_info = cp.DOM.getDocument()
        root_children = raw_dom_info['result']['root']['children']
        body_nodeid = get_node_id(root_children, 'BODY')
        if body_nodeid is None:
            raise RuntimeError('BODY node not found in the player document')
        # Get body HTML
        body_html = cp.DOM.getOuterHTML(nodeId=body_nodeid)['result']['outerHTML']
        # Get play status
        body_soup = BeautifulSoup(body_html, "html.parser")
        target_dom = body_soup.select("[class~=btnp]")
        # On the FM page the second matching element is the one that is read.
        index_id = 1 if detect_if_fm() else 0
        return {
            'is_pause': 'btnp-play' in target_dom[index_id].get('class'),
            # str() detaches the text from the parsed tree.
            'time_now': str(body_soup.select("[class~=now]")[index_id].contents[0]),
            'time_all': str(body_soup.select("[class~=all]")[index_id].contents[0]),
        }

    # Create CProto instance and connect to a browser over CDP
    cp = CProto(host=remote_host, port=remote_port)
    try:
        return play_status(cp)
    finally:
        # Always release the connection, even when scraping fails.
        cp.close()
if __name__ == '__main__':
    # Demo run only when executed as a script; importing this module must
    # not open a CDP connection as a side effect.
    print(get_neteast_status('127.0.0.1', 8080))
    # {'is_pause': True, 'time_now': '00:11', 'time_all': '03:41'}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NeteaseMusicController - impl from fb2kcontroller | |
# version: alpha 0.7 | |
""" | |
依赖: | |
pip install cherrypy psutil requests pymediainfo | |
pip install https://github.com/AndreMiras/pycaw/archive/master.zip | |
pywin32 <- 这个自己找预编译安装包搞定 | |
pymediainfo <- 这个库需要自己放一个 MediaInfo.dll (与Python的编译架构相同) 放在环境变量目录里 | |
设定: | |
启动播放器时附带启动参数 | |
--remote-debugging-port=8088 | |
手机下载 foobar2000controller 并与电脑在同一局域网 | |
填写电脑的 IP | |
端口是 8080 | |
用户名密码空着 | |
Changelog: | |
alpha 0.7 | |
- 透过 CDP 协议读播放状态 | |
alpha 0.6 | |
- 修复一个读取歌曲信息的遗漏BUG | |
- 加入播放列表切换功能(仅支持顺序循环播放功能) | |
alpha 0.5 | |
- 完善播放列表(修复BUG) | |
alpha 0.4 | |
- 实现播放列表显示(仍有BUG) | |
alpha 0.3 | |
- 修复了私人FM状态读取 | |
alpha 0.2 | |
- 修复了本地音乐读取错误的问题 | |
alpha 0.1 | |
- 初次完成编写 | |
TODO: | |
- 实现进度条读取(完成) | |
- 实现播放状态检测(完成) | |
- 实现点击播放列表切换歌曲(咕咕一半) | |
""" | |
# 基础库 | |
import cherrypy | |
import json | |
# 键盘动作控制 | |
import win32api | |
import time | |
# 音量控制 | |
from ctypes import cast, POINTER | |
from comtypes import CLSCTX_ALL | |
from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume | |
import math | |
# 读取播放信息 | |
import os | |
import json | |
import string | |
import psutil | |
from ctypes import wintypes | |
import win32con | |
import win32gui | |
import win32process | |
from pymediainfo import MediaInfo | |
from get_play_status_from_netease_music import get_neteast_status | |
# 封面图 | |
import requests | |
import uuid | |
class FoorbarAPI(object):
    """CherryPy application that answers foobar2000controller requests.

    Each request to ``foobar2000controller`` refreshes the cached player
    state (track, position, volume, playlist) and then either returns the
    requested JSON document or sends a media key / volume change.
    """

    # Cached player state, refreshed by api_update_info().
    codec_info = ""
    song_name = ""
    song_artist = ""
    song_album = ""
    play_status = 1
    eta_time = ""
    total_time = ""
    cover_link = ""
    now_volume = ""
    playlist_now = ""
    now_foc = 0
    # HTTP headers sent when downloading cover art.
    headers = {
        'Accept-Encoding': 'gzip, deflate, sdch',
        'Accept-Language': 'en-US,en;q=0.8,zh-CN;q=0.6,zh;q=0.4',
        'DNT': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2873.0 Safari/537.36',
    }
    # Master-volume endpoint, acquired once when the class body is executed.
    devices = AudioUtilities.GetSpeakers()
    interface = devices.Activate(IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
    volume = cast(interface, POINTER(IAudioEndpointVolume))

    @cherrypy.expose
    def index(self):
        """Trivial liveness page."""
        return "OK"

    @cherrypy.expose
    def foobar2000controller(self, cmd="", param3="", param1=""):
        """Dispatch one controller request.

        Args:
            cmd: Command name sent by the client.
            param3: Name of the requested JSON document, if any.
            param1: Command argument (title format, playlist index, volume).

        Returns:
            The response body for the request; ``"OK"`` for plain commands.
        """
        self.api_update_info()
        # Connection check
        if cmd == "FormatTitles" and param1 == "":
            return "[]"
        if cmd == "FormatTitles" and param1 == "%path%":
            return "[]"
        # Queue information
        if cmd == "GetQueue" and param3 == "NoResponse":
            return ""
        # Genre / library query
        if param3 == "library.json":
            return """{
"queryInfo": [],
"query": [""]
}"""
        # Track length
        if cmd == "FormatTitles" and param1 == "%length_seconds%":
            return "[\"%s\"]" % self.total_time
        # Current track information
        if param3 == "info.json":
            return self.api_info()
        # Playlist
        if param3 == "playlist.json":
            return self.playlist_now
        # File browser
        if param3 == "browser.json":
            return """{
"path": [],
"pathcurrent": "",
"parent": "%20",
"browser": []
}"""
        # Version
        if cmd == "Version":
            return ""
        # Playlist page size
        if cmd == "PlaylistItemsPerPage":
            return ""
        # Cover art of the current track
        if "albumart" in cherrypy.url():
            return self.api_cover("covers/" + cherrypy.url().split("albumart_")[1])
        # Play / pause toggle
        if cmd == "PlayOrPause":
            self.api_startorpause()
        # Jump inside the playlist
        if cmd == "Start":
            if not param1:
                # No index given: restart the current track
                self.api_replay()
            else:
                self.api_change_now_play(int(param1))
        # Previous track
        if cmd == "StartPrevious":
            self.api_play_previous()
        # Next track
        if cmd == "StartNext":
            self.api_play_next()
        # Volume control
        if cmd == "Volume":
            if param1:
                self.api_volume_control(param1)
        return "OK"

    def api_cover(self, download_img_name):
        """Return the cover image bytes, downloading them on first use.

        The file name originates from the request URL, so it is validated
        before touching the file system, and the download goes through
        ``requests`` instead of a shell command line.

        Args:
            download_img_name: Relative cache path of the cover image.

        Returns:
            The raw image bytes.

        Raises:
            cherrypy.HTTPError: 404 for an unsafe name or when no cover URL
                is known, 502 when the download fails.
        """
        cover_dir, cover_name = os.path.split(download_img_name.replace("\\", "/"))
        name_ok = (
            bool(cover_name)
            and cover_name not in (".", "..")
            and all(ch.isalnum() or ch in "-_." for ch in cover_name)
        )
        dir_ok = (
            not os.path.isabs(cover_dir)
            and not os.path.splitdrive(cover_dir)[0]
            and ".." not in cover_dir.split("/")
        )
        if not (name_ok and dir_ok):
            raise cherrypy.HTTPError(404, "Unknown cover")
        cover_path = os.path.join(cover_dir, cover_name)
        if not os.path.isfile(cover_path):
            if not self.cover_link:
                raise cherrypy.HTTPError(404, "No cover available")
            try:
                reply = requests.get(self.cover_link, headers=self.headers, timeout=10)
                reply.raise_for_status()
            except requests.RequestException:
                raise cherrypy.HTTPError(502, "Cover download failed")
            if cover_dir:
                os.makedirs(cover_dir, exist_ok=True)
            with open(cover_path, 'wb') as target:
                target.write(reply.content)
        cherrypy.response.headers['Content-Type'] = "image/jpg"
        with open(cover_path, 'rb') as source:
            return source.read()

    def api_change_now_play(self, target_pos):
        """Step forwards or backwards until *target_pos* is the current item."""
        offset = target_pos - self.now_foc
        step = self.api_play_next if offset >= 0 else self.api_play_previous
        for _ in range(abs(offset)):
            step()
            # Short pause between the simulated key presses.
            time.sleep(0.15)

    def api_startorpause(self):
        """Send the play/pause media key (virtual key 0xB3)."""
        win32api.keybd_event(0xB3, win32api.MapVirtualKey(0xB3, 0))

    def api_replay(self):
        """Restart the current track: previous-track key, then next-track key."""
        win32api.keybd_event(0xB1, win32api.MapVirtualKey(0xB1, 0))
        time.sleep(0.05)
        win32api.keybd_event(0xB0, win32api.MapVirtualKey(0xB0, 0))

    def api_play_previous(self):
        """Send the previous-track media key (virtual key 0xB1)."""
        win32api.keybd_event(0xB1, win32api.MapVirtualKey(0xB1, 0))

    def api_play_next(self):
        """Send the next-track media key (virtual key 0xB0)."""
        win32api.keybd_event(0xB0, win32api.MapVirtualKey(0xB0, 0))

    def api_volume_control(self, value):
        """Set the master volume from the client's volume value.

        Args:
            value: Volume as sent by the client; converted with ``int``.
        """
        # log(0) is undefined, so treat non-positive values as the lowest step.
        level = max(int(value), 1)
        target_volume = 34.5 * math.log(level / 200, 10)
        self.volume.SetMasterVolumeLevel(target_volume, None)

    def api_detect_play_info(self):
        """Refresh track metadata and the current volume."""
        # Window title gives the song name; open file handles give the file.
        (window_title, file_path, self.song_name, self.song_artist,
         self.song_album, self.total_time, self.cover_link,
         self.codec_info) = ReadTitlePath().get_info()
        # Inverse of the mapping used in api_volume_control().
        self.now_volume = str(int((10 ** (self.volume.GetMasterVolumeLevel() / 34.5)) * 200))

    def api_detect_play_status(self):
        """Refresh the paused flag and the remaining time of the track."""
        status_dict = get_neteast_status('127.0.0.1', 8088)
        self.play_status = 0 if status_dict['is_pause'] else 1
        try:
            # Accept both "mm:ss" and "h:mm:ss" by folding base-60 fields.
            position = 0
            for field in str(status_dict['time_now']).split(':'):
                position = position * 60 + int(field)
            self.eta_time = str(int(self.total_time) - position)
        except ValueError:
            # Track length or position unknown (empty strings): report zero
            # instead of failing the whole request.
            self.eta_time = "0"

    def api_update_info(self):
        """Refresh every cached value used to answer requests."""
        self.api_detect_play_info()
        self.api_detect_play_status()
        self.playlist_now, self.now_foc = ConvertPlaylist(
            self.song_name, self.song_artist, self.song_album).parse_netease_json()

    def api_info(self):
        """Build the ``info.json`` document.

        Returns:
            The JSON document encoded as UTF-8 bytes.
        """
        json_dict = {"codec":"","helper2":"","isPlaying":"","isPaused":"","elapsedTime":"0","trackLength":"","volume":"100","volumeDB":"0","order":"1","albumArt":"","itemplaying":"0","page":"1","itemsPage":"60","search":"","sac":"","nowPlayingRating":""}
        # Codec description of the current track
        json_dict["codec"] = self.codec_info
        # Artist - title - album; plain hyphens inside the fields are replaced
        # so that " - " remains usable as the field separator.
        json_dict["helper2"] = self.song_artist.replace("-","–") + " - " + self.song_name.replace("-","–") + " - " + self.song_album.replace("-","–")
        # Playback state
        json_dict["isPlaying"] = "1" if self.play_status == 1 else "0"
        json_dict["isPaused"] = "1" if self.play_status == 0 else "0"
        # Remaining time
        json_dict["elapsedTime"] = self.eta_time
        # Track length
        json_dict["trackLength"] = self.total_time
        # Cover name derived from the track description (stable per track)
        json_dict["albumArt"] = "/foobar2000controller/albumart_" + str(uuid.uuid5(uuid.NAMESPACE_URL, json_dict["helper2"]))
        # Volume
        json_dict["volume"] = self.now_volume
        # Serialise
        response_json = json.dumps(json_dict, ensure_ascii=False)
        return response_json.encode('utf-8')
class ConvertPlaylist:
    """Convert the player's play queue into foobar2000controller playlist JSON."""

    json_str = ""
    # APPDATA ends in "Roaming"; dropping those 7 characters leaves the
    # AppData folder (with trailing separator). The '' default keeps the
    # class definable when APPDATA is not set.
    file_name = os.getenv('APPDATA', '')[:-7] + "Local\\Netease\\CloudMusic\\webdata\\file\\queue"
    song_name = ""
    song_artist = ""
    song_album = ""

    def __init__(self, song_name, song_artist, song_album):
        """Load the queue file and remember the currently playing track.

        Args:
            song_name: Title of the current track.
            song_artist: Artist of the current track.
            song_album: Album of the current track.
        """
        self.song_name, self.song_artist, self.song_album = song_name, song_artist, song_album
        with open(self.file_name, "rb") as target:
            self.json_str = json.loads(target.read().decode("utf-8"))

    def parse_netease_json(self):
        """Build the playlist document from the loaded queue.

        Returns:
            A tuple ``(playlist_json, focus_index)`` where ``playlist_json``
            is the serialised document and ``focus_index`` is the zero-based
            position of the current track (0 when it is not in the queue).
        """
        playlist = {'playlist':{'js':[],"t":"16:48:20","pa":"1","pp":"1","page":"0","pages":"1","foc":"1","ip":"1","ipp":"999","ic":"249"},
                    "order":"1",
                    "playlists":{"js":[{"name":"网易云音乐","count":"0"}],"ac":"1","pl":"1","page":"0","t":""}
                    }
        entries = playlist['playlist']['js']
        total_ms = 0
        song_foc = 0
        for position, item in enumerate(self.json_str):
            track = item['track']
            duration_ms = int(track['duration'])
            total_ms += duration_ms
            entry = {
                'artist': track['artists'][0]['name'],
                'album': track['album']['name'],
                'track': track['name'],
                # Durations are in milliseconds; seconds are zero-padded.
                'len': "%d:%02d" % (duration_ms // 60000, duration_ms // 1000 % 60),
                'rating': "?",
                'queued': "",
                'playCount': "1",
            }
            # Remember where the currently playing track sits in the queue
            if (entry['artist'] == self.song_artist
                    and entry['album'] == self.song_album
                    and entry['track'] == self.song_name):
                song_foc = position
            entries.append(entry)
        count_text = str(len(entries))
        playlist['playlist']['ic'] = count_text
        playlist['playlists']['js'][0]['count'] = count_text
        playlist['playlist']['foc'] = str(song_foc)
        playlist['playlist']['ip'] = str(song_foc)
        # gmtime() expects seconds, the accumulated total is milliseconds.
        playlist['playlist']['t'] = time.strftime('%H:%M:%S', time.gmtime(total_ms // 1000))
        playlist['playlists']['t'] = playlist['playlist']['t']
        return json.dumps(playlist, ensure_ascii=False), song_foc
class ReadTitlePath:
    """Collect information about the track the player is currently playing.

    The song title comes from the player's window title, the audio file
    from the process' open file handles, and the remaining metadata from
    the player's queue / history JSON files.
    """

    title = ""
    target_path = ""
    codec_info = ""
    song_name = ""
    song_artist = ""
    song_album = ""
    song_len = ""
    cover_link = ""
    playlist = ""

    def enumWindowsProc(self, hwnd, lParam):
        """EnumWindows callback: remember the title of a visible window.

        Args:
            hwnd: Handle of the window being enumerated.
            lParam: Process id to restrict the search to, or None for all.
        """
        if lParam is not None and win32process.GetWindowThreadProcessId(hwnd)[1] != lParam:
            return
        text = win32gui.GetWindowText(hwnd)
        if not text:
            return
        wStyle = win32api.GetWindowLong(hwnd, win32con.GWL_STYLE)
        if wStyle & win32con.WS_VISIBLE:
            self.title = text

    def enumProcWnds(self, pid=None):
        """Enumerate top-level windows, optionally only those owned by *pid*."""
        win32gui.EnumWindows(self.enumWindowsProc, pid)

    def getProcName(self, procName):
        """Return the pids (as strings) of all processes named *procName*."""
        pid_list = []
        for proc in psutil.process_iter():
            try:
                if proc.name() == procName:
                    # Use the pid attribute rather than parsing str(proc).
                    pid_list.append(str(proc.pid))
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Process vanished or is not inspectable; skip it.
                continue
        return pid_list

    def get_info(self):
        """Gather everything known about the current track.

        Returns:
            A tuple ``(title, path, name, artist, album, length, cover_url,
            codec_info)``.
        """
        for pid_text in self.getProcName("cloudmusic.exe"):
            pid = int(pid_text)
            self.enumProcWnds(pid)
            if self.title != '':
                self.target_path = ""
                for opened in psutil.Process(pid).open_files():
                    if "mp3" in opened.path or "aac" in opened.path or "flac" in opened.path or "ape" in opened.path:
                        self.target_path = opened.path
        self.get_detail_info()
        return (self.title, self.target_path, self.song_name, self.song_artist,
                self.song_album, self.song_len, self.cover_link, self.codec_info)

    def parse_netease_json(self):
        """Fill artist, album, cover, length and codec from ``self.playlist``.

        Every playlist item whose track name contains ``self.song_name`` is
        applied in order, so the last matching item wins.
        """
        for item in self.playlist:
            track = item['track']
            if self.song_name not in track['name']:
                continue
            self.song_artist = track['artists'][0]['name']
            self.song_album = track['album']['name']
            self.cover_link = track['album']['picUrl']
            self.song_len = str(int(int(track['duration'])/1000))
            # Codec details are best effort: a failure is reported, not raised.
            try:
                if "playFile" in item:
                    # Local file: ask MediaInfo
                    media_info = MediaInfo.parse(item['playFile'])
                    audio = media_info.tracks[1]
                    codec_t = media_info.tracks[0].codec
                    self.codec_info = codec_t.upper() + " | " + str(int(audio.bit_rate/1000)) + "kbps | " + str(int(audio.sampling_rate)) + "Hz"
                    continue
                if track.get('yunSong'):
                    cloud = track['yunSong']
                    self.codec_info = cloud['fileExt'][1:].upper() + " | " + str(int(cloud['bitrate'])) + "kbps"
                    continue
                # Pick the first quality entry that is present and non-empty;
                # same preference order as before (h, m, l, b), default hMusic.
                target_mode = next(
                    (mode for mode in ("hMusic", "mMusic", "lMusic", "bMusic") if track.get(mode)),
                    "hMusic",
                )
                stream = track[target_mode]
                codec = ""
                if "extension" in stream:
                    codec = stream['extension'].upper() + " | "
                # Integer division keeps "320kbps" instead of "320.0kbps".
                codec += str(int(stream['bitrate']) // 1000) + "kbps | "
                if "sr" in stream:
                    codec += str(stream['sr']) + "Hz"
                else:
                    codec += "44100Hz"
                self.codec_info = codec
            except Exception:
                # Unexpected item layout: show the item for diagnosis.
                print(item)

    def get_detail_info(self):
        """Derive title/artist from the window title, then load metadata."""
        # Window title has the form "<song> - <artist>"
        self.song_artist = self.title.split(" - ")[-1]
        self.song_name = self.title.split(" - " + self.song_artist)[0]
        # Play queue JSON
        playlist_path = os.getenv('APPDATA')[:-7] + "Local\\Netease\\CloudMusic\\webdata\\file\\queue"
        with open(playlist_path, 'rb') as playlist_text:
            self.playlist = json.loads(playlist_text.read().decode("utf-8"))
        self.parse_netease_json()
        # No codec found: the track is probably from FM, so try the history
        if not self.codec_info:
            playlist_path = os.getenv('APPDATA')[:-7] + "Local\\Netease\\CloudMusic\\webdata\\file\\history"
            with open(playlist_path, 'rb') as playlist_text:
                self.playlist = json.loads(playlist_text.read().decode("utf-8"))
            self.parse_netease_json()
if __name__ == '__main__':
    # Bind to every interface before starting the server.
    cherrypy.server.socket_host = '0.0.0.0'
    controller = FoorbarAPI()
    cherrypy.quickstart(controller)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment