Skip to content

Instantly share code, notes, and snippets.

@Sg4Dylan
Last active October 10, 2023 07:31
Show Gist options
  • Save Sg4Dylan/3b2528d27dd49473b8af56b4caf02efc to your computer and use it in GitHub Desktop.
Save Sg4Dylan/3b2528d27dd49473b8af56b4caf02efc to your computer and use it in GitHub Desktop.
网易云音乐远程控制
#!/usr/bin/env python
#coding:utf-8
# Author: Sg4Dylan -- <sg4dylan#gmail.com>
# Update: 10/13/2017
'''
透过 Chrome Debug Protocol 读取网易云音乐当前播放状态
启动播放器时附带启动参数
--remote-debugging-port=8080
安装依赖
pip install requests cproto BeautifulSoup4
'''
import json
import requests
from bs4 import BeautifulSoup
from cproto import CProto
def get_neteast_status(remote_host, remote_port):
    """Read the current playback state of NetEase Cloud Music over CDP.

    The player must have been started with ``--remote-debugging-port`` so that
    its page DOM can be fetched through the Chrome DevTools Protocol.

    Args:
        remote_host: Host name or IP of the debugging endpoint.
        remote_port: Port of the debugging endpoint.

    Returns:
        dict with ``is_pause`` (bool), ``time_now`` and ``time_all``
        (the text of the player's ``now`` / ``all`` time elements).

    Raises:
        RuntimeError: if the document contains no BODY node.
    """

    def get_node_id(children, name):
        """Depth-first search for the first node whose nodeName contains *name*."""
        for node in children:
            if name in node.get('nodeName', ''):
                return node['nodeId']
            if 'children' in node:
                found = get_node_id(node['children'], name)
                # Only stop when the subtree actually held a match; otherwise
                # keep scanning the remaining siblings (e.g. BODY after HEAD).
                if found is not None:
                    return found
        return None

    def detect_if_fm():
        """Return True when the first debug target's URL ends with 'fm/'."""
        raw_api = requests.get(
            'http://{0}:{1}/json'.format(remote_host, remote_port),
            timeout=5,  # never hang the caller if the player is not answering
        )
        page_url = json.loads(raw_api.content.decode('UTF-8'))[0]['url']
        return page_url.endswith('fm/')

    def play_status(cp):
        """Scrape pause state and time labels from the page body."""
        # Get body nodeId
        raw_dom_info = cp.DOM.getDocument()
        root_children = raw_dom_info['result']['root']['children']
        body_nodeid = get_node_id(root_children, 'BODY')
        if body_nodeid is None:
            raise RuntimeError('BODY node not found in the player document')
        # Get body HTML
        body_html = cp.DOM.getOuterHTML(nodeId=body_nodeid)['result']['outerHTML']
        body_soup = BeautifulSoup(body_html, "html.parser")
        # On the FM page the second matching element is the one in use
        index_id = 1 if detect_if_fm() else 0
        play_button = body_soup.select("[class~=btnp]")[index_id]
        return {
            'is_pause': 'btnp-play' in play_button.get('class'),
            'time_now': str(body_soup.select("[class~=now]")[index_id].contents[0]),
            'time_all': str(body_soup.select("[class~=all]")[index_id].contents[0]),
        }

    # Create CProto instance and connect to a browser over CDP
    cp = CProto(host=remote_host, port=remote_port)
    try:
        return play_status(cp)
    finally:
        # Release the debugging connection even when scraping fails
        cp.close()
if __name__ == '__main__':
    # Probe the player only when run as a script: get_neteast_status is also
    # imported by the controller script, and an import must not open a
    # debugging connection as a side effect.
    print(get_neteast_status('127.0.0.1', 8080))
    # {'is_pause': True, 'time_now': '00:11', 'time_all': '03:41'}
# NeteaseMusicController - impl from fb2kcontroller
# version: alpha 0.7
"""
依赖:
pip install cherrypy psutil requests pymediainfo
pip install https://github.com/AndreMiras/pycaw/archive/master.zip
pywin32 <- 这个自己找预编译安装包搞定
pymediainfo <- 这个库需要自己放一个 MediaInfo.dll (与Python的编译架构相同) 放在环境变量目录里
设定:
启动播放器时附带启动参数
--remote-debugging-port=8088
手机下载 foobar2000controller 并与电脑在同一局域网
填写电脑的 IP
端口是 8080
用户名密码空着
Changelog:
alpha 0.7
- 透过 CDP 协议读播放状态
alpha 0.6
- 修复一个读取歌曲信息的遗漏BUG
- 加入播放列表切换功能(仅支持顺序循环播放功能)
alpha 0.5
- 完善播放列表(修复BUG)
alpha 0.4
- 实现播放列表显示(仍有BUG)
alpha 0.3
- 修复了私人FM状态读取
alpha 0.2
- 修复了本地音乐读取错误的问题
alpha 0.1
- 初次完成编写
TODO:
- 实现进度条读取(完成)
- 实现播放状态检测(完成)
- 实现点击播放列表切换歌曲(咕咕一半)
"""
# 基础库
import cherrypy
import json
# 键盘动作控制
import win32api
import time
# 音量控制
from ctypes import cast, POINTER
from comtypes import CLSCTX_ALL
from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume
import math
# 读取播放信息
import os
import json
import string
import psutil
from ctypes import wintypes
import win32con
import win32gui
import win32process
from pymediainfo import MediaInfo
from get_play_status_from_netease_music import get_neteast_status
# 封面图
import requests
import uuid
class FoorbarAPI(object):
    """CherryPy handler that emulates the foobar2000controller HTTP API.

    Every request to ``foobar2000controller`` first refreshes the cached
    player state (song metadata, play status, playlist) and then either
    answers a query or sends a media key / volume change to this machine.
    """

    # Cached player state, refreshed by api_update_info()
    codec_info = ""
    song_name = ""
    song_artist = ""
    song_album = ""
    play_status = 1  # 1 while playing, 0 while paused
    eta_time = ""
    total_time = ""
    cover_link = ""
    now_volume = ""
    playlist_now = ""
    now_foc = 0
    headers = {
        'Accept-Encoding': 'gzip, deflate, sdch',
        'Accept-Language': 'en-US,en;q=0.8,zh-CN;q=0.6,zh;q=0.4',
        'DNT': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2873.0 Safari/537.36',
    }
    # Default speaker endpoint, used for reading and setting the master volume
    devices = AudioUtilities.GetSpeakers()
    interface = devices.Activate(IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
    volume = cast(interface, POINTER(IAudioEndpointVolume))

    # Windows virtual-key codes of the media keys sent to the player
    _VK_MEDIA_NEXT_TRACK = 0xB0
    _VK_MEDIA_PREV_TRACK = 0xB1
    _VK_MEDIA_PLAY_PAUSE = 0xB3

    # Fixed placeholder payloads for features this emulation does not provide
    _LIBRARY_JSON = (
        '{\n'
        '"queryInfo": [],\n'
        '"query": [""]\n'
        '}'
    )
    _BROWSER_JSON = (
        '{\n'
        '"path": [],\n'
        '"pathcurrent": "",\n'
        '"parent": "%20",\n'
        '"browser": []\n'
        '}'
    )

    @cherrypy.expose
    def index(self):
        """Liveness endpoint."""
        return "OK"

    @cherrypy.expose
    def foobar2000controller(self, cmd="", param3="", param1=""):
        """Dispatch one foobar2000controller request.

        Queries return their payload; control commands act on the player and
        return ``"OK"``. Malformed numeric parameters yield HTTP 400 and a
        malformed cover id yields HTTP 404.
        """
        self.api_update_info()

        # Connection check
        if cmd == "FormatTitles" and param1 in ("", "%path%"):
            return "[]"
        # Queue information
        if cmd == "GetQueue" and param3 == "NoResponse":
            return ""
        # Library / genre information
        if param3 == "library.json":
            return self._LIBRARY_JSON
        # Track length
        if cmd == "FormatTitles" and param1 == "%length_seconds%":
            return "[\"%s\"]" % self.total_time
        # Current player info
        if param3 == "info.json":
            return self.api_info()
        # Playlist
        if param3 == "playlist.json":
            return self.playlist_now
        # File browser
        if param3 == "browser.json":
            return self._BROWSER_JSON
        # Version / playlist page size
        if cmd in ("Version", "PlaylistItemsPerPage"):
            return ""
        # Album cover
        request_url = cherrypy.url()
        if "albumart" in request_url:
            cover_id = request_url.partition("albumart_")[2]
            # The id comes straight from the URL: refuse anything that could
            # escape the covers directory.
            if (not cover_id or cover_id in (".", "..")
                    or any(ch in cover_id for ch in '/\\:')):
                raise cherrypy.HTTPError(404)
            return self.api_cover("covers/" + cover_id)

        # Control commands
        if cmd == "PlayOrPause":
            self.api_startorpause()
        elif cmd == "Start":
            if not param1:
                # No target given: replay the current track
                self.api_replay()
            else:
                self.api_change_now_play(self._int_param(param1))
        elif cmd == "StartPrevious":
            self.api_play_previous()
        elif cmd == "StartNext":
            self.api_play_next()
        elif cmd == "Volume" and param1:
            self.api_volume_control(self._int_param(param1))
        return "OK"

    @staticmethod
    def _int_param(raw):
        """Parse a numeric request parameter, answering HTTP 400 when invalid."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            raise cherrypy.HTTPError(400, "Expected an integer parameter")

    def _tap_media_key(self, vk_code):
        """Press and release one media key."""
        scan_code = win32api.MapVirtualKey(vk_code, 0)
        win32api.keybd_event(vk_code, scan_code, 0, 0)
        # Release the key so the system does not keep treating it as held
        win32api.keybd_event(vk_code, scan_code, win32con.KEYEVENTF_KEYUP, 0)

    def api_cover(self, download_img_name):
        """Return an open file with the cover image, downloading it on first use.

        Args:
            download_img_name: Local cache path of the cover image.

        Raises:
            cherrypy.HTTPError: 404 when no cover URL is known, 502 when the
                download fails.
        """
        if not os.path.isfile(download_img_name):
            if not self.cover_link:
                raise cherrypy.HTTPError(404)
            cover_dir = os.path.dirname(download_img_name)
            if cover_dir:
                os.makedirs(cover_dir, exist_ok=True)
            # Download in-process instead of building a shell command from
            # request-derived text.
            try:
                response = requests.get(
                    self.cover_link,
                    headers={'User-Agent': self.headers['User-Agent']},
                    timeout=10,
                )
                response.raise_for_status()
            except requests.RequestException:
                raise cherrypy.HTTPError(502, "Cover download failed")
            with open(download_img_name, 'wb') as cover_file:
                cover_file.write(response.content)
        cherrypy.response.headers['Content-Type'] = "image/jpeg"
        return open(download_img_name, 'rb')

    def api_change_now_play(self, target_pos):
        """Step next/previous until the playlist position equals *target_pos*."""
        offset = target_pos - self.now_foc
        step = self.api_play_next if offset >= 0 else self.api_play_previous
        for _ in range(abs(offset)):
            step()
            time.sleep(0.15)

    def api_startorpause(self):
        """Toggle play/pause."""
        self._tap_media_key(self._VK_MEDIA_PLAY_PAUSE)

    def api_replay(self):
        """Restart the current track by going back one track, then forward."""
        self._tap_media_key(self._VK_MEDIA_PREV_TRACK)
        time.sleep(0.05)
        self._tap_media_key(self._VK_MEDIA_NEXT_TRACK)

    def api_play_previous(self):
        """Skip to the previous track."""
        self._tap_media_key(self._VK_MEDIA_PREV_TRACK)

    def api_play_next(self):
        """Skip to the next track."""
        self._tap_media_key(self._VK_MEDIA_NEXT_TRACK)

    def api_volume_control(self, value):
        """Set the master volume from the controller's slider value.

        The slider value is mapped to decibels with ``34.5 * log10(value/200)``
        (so 200 maps to 0 dB) and clamped to the range the endpoint reports.
        """
        level = int(value)
        min_db, max_db, _step = self.volume.GetVolumeRange()
        if level <= 0:
            # log10 is undefined at 0; use the quietest supported level
            target_volume = min_db
        else:
            target_volume = 34.5 * math.log10(level / 200.0)
            target_volume = max(min_db, min(max_db, target_volume))
        self.volume.SetMasterVolumeLevel(target_volume, None)

    def api_detect_play_info(self):
        """Refresh song metadata and the current volume."""
        # Window title gives the song name; open file handles give the file
        (_window_title, _file_path, self.song_name, self.song_artist,
         self.song_album, self.total_time, self.cover_link,
         self.codec_info) = ReadTitlePath().get_info()
        # Inverse of the mapping used in api_volume_control
        current_db = self.volume.GetMasterVolumeLevel()
        self.now_volume = str(int((10 ** (current_db / 34.5)) * 200))

    def api_detect_play_status(self):
        """Refresh play/pause state and the remaining time in seconds."""
        status_dict = get_neteast_status('127.0.0.1', 8088)
        self.play_status = 0 if status_dict['is_pause'] else 1
        try:
            elapsed = 0
            for part in status_dict['time_now'].split(':'):
                elapsed = elapsed * 60 + int(part)
            self.eta_time = str(max(0, int(self.total_time) - elapsed))
        except ValueError:
            # Unknown track length or an empty/odd time label
            self.eta_time = "0"

    def api_update_info(self):
        """Refresh every cached field used to answer requests."""
        self.api_detect_play_info()
        self.api_detect_play_status()
        self.playlist_now, self.now_foc = ConvertPlaylist(
            self.song_name, self.song_artist, self.song_album
        ).parse_netease_json()

    def api_info(self):
        """Build the ``info.json`` payload as UTF-8 encoded JSON bytes."""
        # Artist - title - album; plain hyphens inside a field are swapped for
        # en dashes so they cannot be confused with the " - " separator.
        fields = (self.song_artist, self.song_name, self.song_album)
        helper = " - ".join(field.replace("-", "–") for field in fields)
        json_dict = {
            "codec": self.codec_info,
            "helper2": helper,
            "isPlaying": "1" if self.play_status == 1 else "0",
            "isPaused": "1" if self.play_status == 0 else "0",
            # NOTE(review): eta_time holds the remaining time, not the
            # elapsed time - confirm this is what the client expects.
            "elapsedTime": self.eta_time,
            "trackLength": self.total_time,
            "volume": self.now_volume,
            "volumeDB": "0",
            "order": "1",
            # Cover name derived from the title, so it changes with the song
            "albumArt": "/foobar2000controller/albumart_"
                        + str(uuid.uuid5(uuid.NAMESPACE_URL, helper)),
            "itemplaying": "0",
            "page": "1",
            "itemsPage": "60",
            "search": "",
            "sac": "",
            "nowPlayingRating": "",
        }
        response_json = json.dumps(json_dict, ensure_ascii=False)
        return response_json.encode('utf-8')
class ConvertPlaylist:
    """Convert the NetEase play queue file into foobar2000controller playlist JSON.

    The queue file is read and parsed on construction; the song given to the
    constructor is used to locate the currently playing entry.
    """

    json_str = ""  # parsed queue (list of items), despite the name
    # %LOCALAPPDATA%\Netease\CloudMusic\webdata\file\queue, derived from
    # %APPDATA%'s parent when LOCALAPPDATA is not set.
    file_name = os.path.join(
        os.getenv('LOCALAPPDATA')
        or os.path.join(os.path.dirname(os.getenv('APPDATA') or ''), 'Local'),
        'Netease', 'CloudMusic', 'webdata', 'file', 'queue',
    )
    song_name = ""
    song_artist = ""
    song_album = ""

    def __init__(self, song_name, song_artist, song_album):
        """Remember the current song and load the queue file.

        Raises:
            OSError: if the queue file cannot be read.
            ValueError: if it is not valid UTF-8 encoded JSON.
        """
        self.song_name = song_name
        self.song_artist = song_artist
        self.song_album = song_album
        with open(self.file_name, "rb") as target:
            self.json_str = json.loads(target.read().decode("utf-8"))

    def parse_netease_json(self):
        """Build the playlist payload.

        Returns:
            tuple ``(playlist_json, song_foc)`` where ``playlist_json`` is the
            JSON text and ``song_foc`` is the zero-based index of the last
            queue entry matching the current artist, album and title (0 when
            nothing matches).
        """
        entries = []
        total_ms = 0
        song_foc = 0
        for index, item in enumerate(self.json_str):
            track = item['track']
            artists = track['artists']
            duration_ms = track['duration']
            total_ms += duration_ms
            minutes, seconds = divmod(int(duration_ms // 1000), 60)
            entry = {
                # Tolerate tracks without any artist instead of crashing
                'artist': artists[0]['name'] if artists else "",
                'album': track['album']['name'],
                'track': track['name'],
                # Zero-pad seconds so 3 min 5 s reads "3:05"
                'len': "%d:%02d" % (minutes, seconds),
                'rating': "?",
                'queued': "",
                'playCount': "1",
            }
            # Mark the current playing position
            if (entry['artist'] == self.song_artist
                    and entry['album'] == self.song_album
                    and entry['track'] == self.song_name):
                song_foc = index
            entries.append(entry)

        # Durations are in milliseconds; format the total as HH:MM:SS
        hours, remainder = divmod(int(total_ms // 1000), 3600)
        total_text = "%02d:%02d:%02d" % ((hours,) + divmod(remainder, 60))
        count_text = str(len(entries))
        playlist = {
            'playlist': {
                'js': entries,
                "t": total_text,
                "pa": "1",
                "pp": "1",
                "page": "0",
                "pages": "1",
                "foc": str(song_foc),
                "ip": str(song_foc),
                "ipp": "999",
                "ic": count_text,
            },
            "order": "1",
            "playlists": {
                "js": [{"name": "网易云音乐", "count": count_text}],
                "ac": "1",
                "pl": "1",
                "page": "0",
                "t": total_text,
            },
        }
        return json.dumps(playlist, ensure_ascii=False), song_foc
class ReadTitlePath:
    """Collect metadata of the song NetEase Cloud Music is currently playing.

    The song title comes from the player's visible window title; the remaining
    details are looked up in the player's ``queue`` / ``history`` files.
    """

    title = ""
    target_path = ""
    codec_info = ""
    song_name = ""
    song_artist = ""
    song_album = ""
    song_len = ""
    cover_link = ""
    playlist = ""

    # Per-track quality entries, preferred first
    _QUALITY_KEYS = ("hMusic", "mMusic", "lMusic", "bMusic")
    # Substrings that mark an open file as the audio file being played
    _AUDIO_MARKERS = ("mp3", "aac", "flac", "ape")

    def enumWindowsProc(self, hwnd, lParam):
        """EnumWindows callback: remember the title of a visible, titled window.

        When *lParam* is not None, only windows owned by that process id count.
        """
        if lParam is not None and win32process.GetWindowThreadProcessId(hwnd)[1] != lParam:
            return
        text = win32gui.GetWindowText(hwnd)
        if not text:
            return
        if win32api.GetWindowLong(hwnd, win32con.GWL_STYLE) & win32con.WS_VISIBLE:
            self.title = text

    def enumProcWnds(self, pid=None):
        """Scan all top-level windows, optionally restricted to process *pid*."""
        win32gui.EnumWindows(self.enumWindowsProc, pid)

    def getProcName(self, procName):
        """Return the pids (as strings) of all processes named *procName*."""
        pid_list = []
        for proc in psutil.process_iter():
            try:
                if proc.name() == procName:
                    pid_list.append(str(proc.pid))
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process exited or is not inspectable; skip it
                continue
        return pid_list

    def get_info(self):
        """Gather all song details.

        Returns:
            tuple ``(title, target_path, song_name, song_artist, song_album,
            song_len, cover_link, codec_info)``; fields that could not be
            determined keep their empty-string default.
        """
        for pid_text in self.getProcName("cloudmusic.exe"):
            pid = int(pid_text)
            self.enumProcWnds(pid)
            if self.title != '':
                self.target_path = ""
                try:
                    open_files = psutil.Process(pid).open_files()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    open_files = []
                for opened in open_files:
                    if any(marker in opened.path for marker in self._AUDIO_MARKERS):
                        self.target_path = opened.path
                self.get_detail_info()
        return (self.title, self.target_path, self.song_name, self.song_artist,
                self.song_album, self.song_len, self.cover_link, self.codec_info)

    def _codec_from_item(self, item):
        """Describe the codec of one playlist item, or return None if unknown."""
        track = item['track']
        if "playFile" in item:
            # Local file: inspect it directly
            media_info = MediaInfo.parse(item['playFile'])
            general, audio = media_info.tracks[0], media_info.tracks[1]
            # NOTE(review): falls back to `format` assuming newer MediaInfo
            # builds no longer report `codec` - confirm against the DLL in use.
            codec_name = general.codec or general.format
            return (codec_name.upper() + " | "
                    + str(int(audio.bit_rate / 1000)) + "kbps | "
                    + str(int(audio.sampling_rate)) + "Hz")
        if track.get('yunSong'):
            cloud = track['yunSong']
            return (cloud['fileExt'][1:].upper() + " | "
                    + str(int(cloud['bitrate'])) + "kbps")
        quality = next(
            (track[key] for key in self._QUALITY_KEYS if track.get(key)), None)
        if quality is None:
            return None
        parts = []
        if "extension" in quality:
            parts.append(quality['extension'].upper())
        # Integer division keeps the value free of a trailing ".0"
        parts.append("%dkbps" % (int(quality['bitrate']) // 1000))
        parts.append("%sHz" % quality.get('sr', 44100))
        return " | ".join(parts)

    def parse_netease_json(self):
        """Fill artist, album, cover, length and codec from ``self.playlist``.

        Every item whose track name contains ``self.song_name`` is applied in
        order, so the last matching item wins.
        """
        for item in self.playlist:
            track = item['track']
            if self.song_name not in track['name']:
                continue
            self.song_artist = track['artists'][0]['name']
            self.song_album = track['album']['name']
            self.cover_link = track['album']['picUrl']
            self.song_len = str(int(track['duration']) // 1000)
            # Codec details are best effort: report the item and carry on
            try:
                codec = self._codec_from_item(item)
            except Exception:
                print(item)
                continue
            if codec is not None:
                self.codec_info = codec

    def _load_playlist(self, file_name):
        """Load one of the player's JSON list files; return [] if unavailable."""
        base_dir = os.getenv('LOCALAPPDATA')
        if not base_dir:
            roaming = os.getenv('APPDATA')
            if not roaming:
                return []
            base_dir = os.path.join(os.path.dirname(roaming), 'Local')
        path = os.path.join(
            base_dir, 'Netease', 'CloudMusic', 'webdata', 'file', file_name)
        try:
            with open(path, 'rb') as playlist_file:
                return json.loads(playlist_file.read().decode("utf-8"))
        except (OSError, ValueError):
            # Missing, unreadable or malformed file: behave as an empty list
            return []

    def get_detail_info(self):
        """Derive song name/artist from the window title, then look up details."""
        # The title reads "<song> - <artist>"; split at the last separator so
        # song names that themselves contain " - " stay intact.
        name, separator, artist = self.title.rpartition(" - ")
        self.song_artist = artist
        self.song_name = name if separator else self.title
        # Regular play queue
        self.playlist = self._load_playlist("queue")
        self.parse_netease_json()
        # Nothing found in the queue (expected for FM): try the history file
        if not self.codec_info:
            self.playlist = self._load_playlist("history")
            self.parse_netease_json()
if __name__ == '__main__':
    # Build the handler first, then expose the server on every interface so
    # that other devices on the network can reach it.
    controller_app = FoorbarAPI()
    cherrypy.server.socket_host = '0.0.0.0'
    cherrypy.quickstart(controller_app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment