Last active
June 1, 2023 01:17
-
-
Save cucmberium/8ccaa808e2452c73dc582b79402ee14b to your computer and use it in GitHub Desktop.
Playing Be-Music Script (BMS) with Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import tempfile | |
import time | |
import uuid | |
from functools import cmp_to_key | |
import pygame | |
import tqdm | |
class BMSInfo: | |
NORMAL_BAR_LENGTH = 65536 | |
BAR_LENGTH_PER_BPM = NORMAL_BAR_LENGTH / 240 | |
def __init__(self, path): | |
self.path = path | |
self.header = {} | |
self.wav = {} | |
self.stop = {} | |
self.bpm = {} | |
self.bar_data = [{"bar": i, "length": 1.0} for i in range(1000)] | |
self.objects = [] | |
self.bpm_objects = [] | |
self.load() | |
@staticmethod | |
def detect_encoding(path): | |
for encoding in ["utf-8", "cp932", "shift-jis"]: | |
try: | |
open(path, encoding=encoding).read() | |
return encoding | |
except: | |
pass | |
return None | |
@staticmethod | |
def objects_compare(x, y): | |
if x["score_time"] != y["score_time"]: | |
return x["score_time"] - y["score_time"] | |
else: | |
return x["channel"] - y["channel"] | |
def load(self): | |
encoding = BMSInfo.detect_encoding(self.path) | |
if encoding is None: | |
raise ValueError("cannot detect bms file encoding") | |
lines = [line.rstrip().upper() for line in open(self.path, encoding=encoding) if line.startswith("#")] | |
for line in lines: | |
if line.startswith("#RANK "): | |
self.header["rank"] = int(line.replace("#RANK ", "").strip()) | |
elif line.startswith("#TOTAL "): | |
self.header["total"] = float(line.replace("#TOTAL ", "").strip()) | |
elif line.startswith("#TITLE "): | |
self.header["title"] = line.replace("#TITLE ", "").strip() | |
elif line.startswith("#BPM "): | |
self.header["bpm"] = float(line.replace("#BPM ", "").strip()) | |
elif re.match(r"#WAV[0-9A-Z]{2}", line): | |
self.wav[line[4:6]] = line[7:].strip() | |
elif re.match(r"#BPM[0-9A-Z]{2}", line): | |
self.bpm[line[4:6]] = float(line[7:].strip()) | |
elif re.match(r"#STOP[0-9A-Z]{2}", line): | |
self.stop[line[5:7]] = float(line[8:].strip()) | |
elif line[1:6].isnumeric(): | |
bar = int(line[1:4]) | |
channel = int(line[4:6]) | |
if channel != 2: # bar length | |
continue | |
self.bar_data[bar]["length"] = float(line.split(":")[-1]) | |
if "bpm" not in self.header: | |
raise ValueError("no bpm info found") | |
score_time = 0 | |
for bar in range(len(self.bar_data)): | |
self.bar_data[bar]["score_time"] = score_time | |
score_time += int(BMSInfo.NORMAL_BAR_LENGTH * self.bar_data[bar]["length"]) | |
for line in lines: | |
if line[1:6].isnumeric(): | |
bar = int(line[1:4]) | |
channel = int(line[4:6]) | |
data = line.split(":")[-1] | |
self.load_object(bar, channel, data) | |
self.bpm_objects = sorted(self.bpm_objects, key=cmp_to_key(BMSInfo.objects_compare)) | |
self.bpm_objects.insert( | |
0, | |
{ | |
"channel": 3, | |
"bar": 0, | |
"bpm": self.header["bpm"], | |
"stop": 0, | |
"score_time": 0, | |
}, | |
) | |
self.initialize_bpm_objects() | |
self.objects = sorted(self.objects, key=cmp_to_key(BMSInfo.objects_compare)) | |
self.initialize_objects() | |
def initialize_bpm_objects(self): | |
bpm = self.bpm_objects[0]["bpm"] | |
elapsed_score_time = 0 | |
elapsed_real_time = 0.0 | |
i = 0 | |
while i < len(self.bpm_objects): | |
elapsed_real_time += (self.bpm_objects[i]["score_time"] - elapsed_score_time) / ( | |
bpm * BMSInfo.BAR_LENGTH_PER_BPM | |
) | |
self.bpm_objects[i]["real_time"] = elapsed_real_time | |
if self.bpm_objects[i]["channel"] == 9: | |
elapsed_real_time += (240 * self.bpm_objects[i]["stop"]) / (bpm * 192) | |
self.bpm_objects[i]["bpm"] = bpm | |
obj = { | |
"channel": 9, | |
"bar": self.bpm_objects[i]["bar"], | |
"bpm": bpm, | |
"stop": 0, | |
"score_time": self.bpm_objects[i]["score_time"], | |
"real_time": elapsed_real_time, | |
} | |
i += 1 | |
self.bpm_objects.insert(i, obj) | |
elapsed_score_time = self.bpm_objects[i]["score_time"] | |
bpm = self.bpm_objects[i]["bpm"] | |
i += 1 | |
def initialize_objects(self): | |
i = 0 | |
j = 0 | |
while i < len(self.objects): | |
while ( | |
j < len(self.bpm_objects) - 1 and self.objects[i]["score_time"] > self.bpm_objects[j + 1]["score_time"] | |
): | |
j += 1 | |
current_bpm_object = self.bpm_objects[j] | |
real_time = current_bpm_object["real_time"] + ( | |
self.objects[i]["score_time"] - current_bpm_object["score_time"] | |
) / (current_bpm_object["bpm"] * BMSInfo.BAR_LENGTH_PER_BPM) | |
self.objects[i]["real_time"] = real_time | |
i += 1 | |
def load_object(self, bar, channel, data): | |
if channel == 2: # bar length | |
return | |
elif channel == 3: # bpm change | |
commands = [data[i : i + 2] for i in range(0, len(data), 2)] | |
for e, command in enumerate(commands): | |
if command in {"00", "0"}: | |
continue | |
obj = { | |
"channel": channel, | |
"bar": bar, | |
"bpm": int(command, 16), | |
"stop": 0, | |
"score_time": self.bar_data[bar]["score_time"] | |
+ int(BMSInfo.NORMAL_BAR_LENGTH * self.bar_data[bar]["length"] * e / len(commands)), | |
} | |
self.bpm_objects.append(obj) | |
elif channel == 8: # ex bpm change | |
commands = [data[i : i + 2] for i in range(0, len(data), 2)] | |
for e, command in enumerate(commands): | |
if command in {"00", "0"}: | |
continue | |
obj = { | |
"channel": channel, | |
"bar": bar, | |
"bpm": self.bpm[command], | |
"stop": 0, | |
"score_time": self.bar_data[bar]["score_time"] | |
+ int(BMSInfo.NORMAL_BAR_LENGTH * self.bar_data[bar]["length"] * e / len(commands)), | |
} | |
self.bpm_objects.append(obj) | |
elif channel == 9: # stop sequence | |
commands = [data[i : i + 2] for i in range(0, len(data), 2)] | |
for e, command in enumerate(commands): | |
if command in {"00", "0"}: | |
continue | |
obj = { | |
"channel": channel, | |
"bar": bar, | |
"bpm": 0, | |
"stop": self.stop[command], | |
"score_time": self.bar_data[bar]["score_time"] | |
+ int(BMSInfo.NORMAL_BAR_LENGTH * self.bar_data[bar]["length"] * e / len(commands)), | |
} | |
self.bpm_objects.append(obj) | |
else: | |
commands = [data[i : i + 2] for i in range(0, len(data), 2)] | |
for e, command in enumerate(commands): | |
if command in {"00", "0"}: | |
continue | |
obj = { | |
"channel": channel, | |
"bar": bar, | |
"data": command, | |
"score_time": self.bar_data[bar]["score_time"] | |
+ int(BMSInfo.NORMAL_BAR_LENGTH * self.bar_data[bar]["length"] * e / len(commands)), | |
} | |
self.objects.append(obj) | |
def main(): | |
pygame.mixer.init() | |
pygame.mixer.set_num_channels(64) | |
bms_path = r"./[zyosouzai]sakuzyo_destr0yer_ogg./7INSANE.bml" | |
bms_dir = os.path.dirname(bms_path) | |
bms = BMSInfo(bms_path) | |
bms_sound_cache = {} | |
with tempfile.TemporaryDirectory() as tmpdir: | |
for key, path in tqdm.tqdm(bms.wav.items()): | |
path = path.lower().replace(".ogg", ".wav") | |
if os.path.exists(os.path.join(bms_dir, path)): | |
bms_sound_cache[key] = pygame.mixer.Sound(os.path.join(bms_dir, path)) | |
elif os.path.exists(os.path.join(bms_dir, path.replace(".wav", ".ogg"))): | |
try: | |
import ffmpeg | |
except ImportError: | |
raise ValueError("ffmpeg is required for loading .ogg file") | |
p = os.path.join(tmpdir, f"{uuid.uuid4()}.wav") | |
ffmpeg.input(os.path.join(bms_dir, path.replace(".wav", ".ogg"))).output(p).run(quiet=True) | |
bms_sound_cache[key] = pygame.mixer.Sound(p) | |
bgm_channels = {1} | |
player1_note_channels = {11, 12, 13, 14, 15, 18, 19, 16} | |
player2_note_channels = {21, 22, 23, 24, 25, 28, 29, 26} | |
player1_longnote_channels = {51, 52, 53, 54, 55, 58, 59, 56} | |
player2_longnote_channels = {61, 62, 63, 64, 65, 68, 69, 66} | |
target_channels = ( | |
bgm_channels | |
| player1_note_channels | |
| player2_note_channels | |
| player1_longnote_channels | |
| player2_longnote_channels | |
) | |
length = max( | |
[ | |
obj["real_time"] + bms_sound_cache[obj["data"]].get_length() | |
for obj in bms.objects | |
if obj["channel"] in target_channels and obj["data"] in bms_sound_cache | |
] | |
) | |
longnote_info = {channel: False for channel in (player1_longnote_channels | player2_longnote_channels)} | |
played_obj_count = 0 | |
start_time = time.perf_counter() | |
progress_bar = tqdm.tqdm(total=int(length) + 1, bar_format="{percentage:3.0f}%|{bar}| {n}s/{total}s") | |
while True: | |
current_time = time.perf_counter() | |
real_time = current_time - start_time | |
for obj in bms.objects[played_obj_count:]: | |
if obj["real_time"] > real_time: | |
break | |
played_obj_count += 1 | |
if obj["channel"] not in target_channels: | |
continue | |
if obj["data"] not in bms_sound_cache: | |
continue | |
if obj["channel"] in longnote_info: | |
if not longnote_info[obj["channel"]]: | |
bms_sound_cache[obj["data"]].play() | |
longnote_info[obj["channel"]] = not longnote_info[obj["channel"]] | |
else: | |
bms_sound_cache[obj["data"]].play() | |
if real_time >= length: | |
break | |
progress_bar.n = int(real_time) | |
progress_bar.refresh() | |
time.sleep(0.001) | |
progress_bar.n = int(length) + 1 | |
progress_bar.refresh() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment