Skip to content

Instantly share code, notes, and snippets.

@FrancescoCaracciolo
Created July 28, 2025 09:33
Show Gist options
  • Select an option

  • Save FrancescoCaracciolo/7e8190d2328f5a76e07d8152c67af746 to your computer and use it in GitHub Desktop.

Select an option

Save FrancescoCaracciolo/7e8190d2328f5a76e07d8152c67af746 to your computer and use it in GitHub Desktop.
VRM Extension for Nyarch Assistant
from .extensions import NewelleExtension
from urllib.parse import urlencode, urljoin
from http.server import HTTPServer, SimpleHTTPRequestHandler
from .handlers.avatar import AvatarHandler
from .handlers.tts import TTSHandler
from .handlers import HandlerDescription, ExtraSettings
import threading
import os
import subprocess
import json
from pydub import AudioSegment
from livepng import LivePNG
from gi.repository import Gtk, WebKit, GLib
from time import sleep
from .utility.strings import rgb_to_hex
class VRMAvatarExtension(NewelleExtension):
    """Extension that contributes the VRM avatar handler."""

    id = "vrmavatar"
    name = "VRM Avatar"

    def get_avatar_handlers(self) -> list[dict]:
        """Return the avatar handler descriptions provided by this extension."""
        vrm_description = HandlerDescription(
            "vrm",
            "VRM Avatar",
            "3D Avatars in VRM format",
            VRMHandler,
        )
        return [vrm_description]
class VRMHandler(AvatarHandler):
    """Avatar handler that shows a VRM model inside an embedded WebKit view.

    The viewer's web files live in ``<path>/avatars/vrm/web`` and are served
    by a small HTTP server bound to 127.0.0.1 on an OS-chosen port. The
    handler drives the page by evaluating JavaScript helpers
    (``get_expressions_json()``, ``get_motions_json()``, ``do_motion()``,
    ``set_expression()``, ``set_mouth_y()``).
    """

    key = "vrm"
    # Set by wait_emotions / wait_motions once the page has answered a query.
    _wait_js: threading.Event
    _wait_js2: threading.Event
    # Raw names reported by the page, cached after the first non-empty answer.
    _expressions_raw: list[str]
    _motions_raw: list[str]

    def __init__(self, settings, path: str):
        """Initialise caches, events and the paths of the web viewer."""
        super().__init__(settings, path)
        self._expressions_raw = []
        self._motions_raw = []
        self._wait_js = threading.Event()
        self._wait_js2 = threading.Event()
        self.webview_path = os.path.join(path, "avatars", "vrm", "web")
        self.models_dir = os.path.join(self.webview_path, "models")
        self.webview = None
        # Assigned by the web-server thread; None until then so destroy()
        # can be called safely at any time.
        self.httpd = None

    def get_available_models(self):
        """Return ``(name, path relative to models_dir)`` for every .vrm file."""
        file_list = []
        for root, _dirs, files in os.walk(self.models_dir):
            for file in files:
                if file.endswith('.vrm'):
                    # removesuffix, not rstrip: rstrip('.vrm') strips any
                    # trailing '.', 'v', 'r', 'm' characters ("arm.vrm" -> "a").
                    file_name = file.removesuffix('.vrm')
                    relative_path = os.path.relpath(os.path.join(root, file), self.models_dir)
                    file_list.append((file_name, relative_path))
        return file_list

    def model_updated(self):
        """Refresh the settings after the model changed."""
        self.settings_update()

    def get_model(self):
        """Return the stored "model" setting, or "models/model.vrm" if unset."""
        m = self.get_setting("model", False)
        return "models/model.vrm" if m is None else m

    def get_extra_settings(self) -> list:
        """Describe the handler's settings: model, lipsync fps, background color."""
        # A throwaway widget is only used to look up the theme's window color,
        # which becomes the default background color.
        widget = Gtk.Box()
        color = widget.get_style_context().lookup_color('window_bg_color')[1]
        default = rgb_to_hex(color.red, color.green, color.blue)
        return [
            {
                "key": "model",
                "title": _("VRM Model"),
                "description": _("VRM Model to use"),
                "type": "combo",
                "values": self.get_available_models(),
                "default": "models/model.vrm",
                "folder": os.path.abspath(self.models_dir),
                "refresh": lambda x: self.settings_update(),
                "update_settings": True
            },
            {
                "key": "fps",
                "title": _("Lipsync Framerate"),
                "description": _("Maximum amount of frames to generate for lipsync"),
                "type": "range",
                "min": 5,
                "max": 30,
                "default": 10.0,
                "round-digits": 0
            },
            {
                "key": "background-color",
                "title": _("Background Color"),
                "description": _("Background color of the avatar"),
                "type": "entry",
                "default": default,
            },
        ]

    def is_installed(self) -> bool:
        """Return True when the web viewer directory exists."""
        return os.path.isdir(self.webview_path)

    def install(self):
        """Clone the web viewer into ``webview_path``.

        Raises:
            subprocess.CalledProcessError: if ``git clone`` exits non-zero.
        """
        # NOTE(review): the scraped source pointed at the "github.com"
        # mirror host; restored to the canonical GitHub host - confirm.
        subprocess.check_output(["git", "clone", "https://github.com/NyarchLinux/VRM-Web-Viewer", self.webview_path])

    def __start_webserver(self):
        """Serve the viewer over HTTP and load it in the webview (blocking).

        Runs in its own thread: it returns only after ``destroy`` shuts the
        server down.
        """
        folder_path = self.webview_path

        class CustomHTTPRequestHandler(SimpleHTTPRequestHandler):
            """Serve files from the viewer folder instead of the cwd."""

            def __init__(self, *args, **kwargs):
                super().__init__(*args, directory=folder_path, **kwargs)

        webview = self.webview
        if webview is None:
            # The widget was destroyed before this thread got to run.
            return
        # Port 0 lets the OS pick a free port.
        httpd = HTTPServer(('127.0.0.1', 0), CustomHTTPRequestHandler)
        self.httpd = httpd
        model = self.get_setting("model")
        background_color = self.get_setting("background-color")
        scale = int(self.get_setting("scale", False, 100))/100
        q = urlencode({"model": "models/" + model, "bg": background_color, "scale": scale})
        # load_uri is scheduled on the GLib main loop rather than called here.
        GLib.idle_add(webview.load_uri, urljoin("http://localhost:" + str(httpd.server_address[1]), f"?{q}"))

        def update_expressions():
            # Give the page a moment to load, then prime the name caches.
            sleep(2)
            self.get_expressions()
            self.get_motions()

        # Daemon: cache priming must not keep the process alive on exit.
        threading.Thread(target=update_expressions, daemon=True).start()
        try:
            httpd.serve_forever()
        finally:
            # Release the listening socket once the serve loop has ended.
            httpd.server_close()

    def create_gtk_widget(self) -> Gtk.Widget:
        """Create the WebKit view, start the web server and return the view."""
        self.webview = WebKit.WebView()
        self.webview.connect("destroy", self.destroy)
        # Daemon: a server whose widget is never destroyed must not block exit.
        threading.Thread(target=self.__start_webserver, daemon=True).start()
        self.webview.set_hexpand(True)
        self.webview.set_vexpand(True)
        settings = self.webview.get_settings()
        settings.set_enable_webaudio(True)
        settings.set_media_playback_requires_user_gesture(False)
        self.webview.set_is_muted(False)
        self.webview.set_settings(settings)
        return self.webview

    def destroy(self, add=None):
        """Stop the web server (if it was started) and drop the webview."""
        httpd = self.httpd
        self.httpd = None
        if httpd is not None:
            httpd.shutdown()
        self.webview = None

    def _run_js(self, script: str, callback=None) -> bool:
        """Evaluate ``script`` in the webview; return False if there is none.

        NOTE(review): this is reached from worker threads (cache priming,
        lipsync animation), as in the original code - confirm that calling
        into WebKit off the main loop is acceptable here.
        """
        webview = self.webview
        if webview is None:
            return False
        # Length -1 means "NUL-terminated". The length is counted in bytes, so
        # len(script) would cut off scripts containing non-ASCII characters.
        if callback is None:
            webview.evaluate_javascript(script, -1)
        else:
            webview.evaluate_javascript(script, -1, callback=callback)
        return True

    def wait_emotions(self, object, result):
        """Async callback: store the expression list returned by the page."""
        try:
            # `object` is the webview the call was issued on; self.webview may
            # already be None if the widget was destroyed meanwhile.
            value = object.evaluate_javascript_finish(result)
            self._expressions_raw = json.loads(value.to_string()) or []
        finally:
            # Always wake the waiter, even when the evaluation failed.
            self._wait_js.set()

    def get_expressions_raw(self, allow_webview=True):
        """Return the raw expression names of the current model.

        Uses the in-memory cache first. Without a webview (or when
        ``allow_webview`` is False) it falls back to the list stored in the
        settings. Otherwise it asks the page and waits up to 3 seconds.
        Any error yields an empty list.
        """
        try:
            if self._expressions_raw:
                return self._expressions_raw
            setting_key = self.get_model() + " expressions"
            if self.webview is None or not allow_webview:
                m = self.get_setting(setting_key, False)
                return m if m is not None else []
            # Clear first so a previous answer cannot satisfy this wait.
            self._wait_js.clear()
            if self._run_js("get_expressions_json()", callback=self.wait_emotions):
                self._wait_js.wait(3)
            if self._expressions_raw:
                # Persist only real answers so a timeout does not wipe the
                # list stored by an earlier session.
                self.set_setting(setting_key, self._expressions_raw)
        except Exception:
            # Deliberate best effort: expressions are optional for the avatar.
            return []
        return self._expressions_raw

    def _resolve_raw_name(self, requested, raw_names):
        """Map a raw name or its user-defined alias to the raw name, else None."""
        if requested in raw_names:
            return requested
        for raw in raw_names:
            if not isinstance(raw, str):
                continue
            alias = self.get_setting("Expression " + raw, False)
            if alias is not None and alias == requested:
                return raw
        return None

    def _display_names(self, raw_names) -> list[str]:
        """Return the alias of each raw name, or the raw name when it has none."""
        names = []
        for raw in raw_names:
            if not isinstance(raw, str):
                # Skip None / non-string entries before building a setting key.
                continue
            alias = self.get_setting("Expression " + raw, False)
            names.append(alias if alias is not None else raw)
        return names

    def convert_motion(self, motion: str):
        """Return the raw motion name for ``motion`` (raw or alias), or None."""
        return self._resolve_raw_name(motion, self.get_motions_raw())

    def convert_expression(self, expression: str):
        """Return the raw expression name for ``expression``, or None."""
        return self._resolve_raw_name(expression, self.get_expressions_raw())

    def get_expressions(self) -> list[str]:
        """Return the expression names to display (aliases where defined)."""
        return self._display_names(self.get_expressions_raw())

    def get_motions(self) -> list[str]:
        """Return the motion names to display (aliases where defined)."""
        return self._display_names(self.get_motions_raw())

    def get_motions_groups(self):
        """Return the cached motions, querying the page (up to 3 s) if empty."""
        if self._motions_raw:
            return self._motions_raw
        # Clear first so a previous answer cannot satisfy this wait.
        self._wait_js2.clear()
        if self._run_js("get_motions_json()", callback=self.wait_motions):
            self._wait_js2.wait(3)
        return self._motions_raw

    def get_motions_raw(self, allow_webview=True):
        """Return the raw motion names of the current model.

        Without a webview (or when ``allow_webview`` is False) the list stored
        in the settings is returned; otherwise the page is queried.
        """
        setting_key = self.get_model() + " motions"
        if self.webview is None or not allow_webview:
            m = self.get_setting(setting_key, False)
            return m if m is not None else []
        motions = self.get_motions_groups()
        if motions:
            # Persist only real answers so a timeout does not wipe the list
            # stored by an earlier session.
            self.set_setting(setting_key, motions)
        return motions

    def wait_motions(self, object, result):
        """Async callback: store the motion list returned by the page."""
        try:
            value = object.evaluate_javascript_finish(result)
            self._motions_raw = json.loads(value.to_string()) or []
        finally:
            # Always wake the waiter, even when the evaluation failed.
            self._wait_js2.set()

    def do_motion(self, motion: str):
        """Play ``motion`` (raw name or alias); unknown names are ignored."""
        raw = self.convert_motion(motion)
        if raw is None:
            return
        # json.dumps yields a correctly escaped JS string literal, so quotes
        # or backslashes in a name cannot break the script.
        self._run_js("do_motion({})".format(json.dumps(raw)))

    def set_expression(self, expression: str):
        """Apply ``expression`` (raw name or alias); unknown names are ignored."""
        exp = self.convert_expression(expression)
        if exp is None:
            return
        self._run_js("set_expression({})".format(json.dumps(exp)))

    def speak(self, path: str, tts: TTSHandler, frame_rate: int):
        """Play the audio file at ``path`` while animating the mouth.

        Blocks until both the playback thread and the animation thread end.
        """
        tts.stop()
        audio = AudioSegment.from_file(path)
        sample_rate = audio.frame_rate
        audio_data = audio.get_array_of_samples()
        amplitudes = LivePNG.calculate_amplitudes(sample_rate, audio_data, frame_rate=frame_rate)
        t1 = threading.Thread(target=self._start_animation, args=(amplitudes, frame_rate))
        t2 = threading.Thread(target=tts.playsound, args=(path, ))
        t1.start()
        t2.start()
        t1.join()
        t2.join()

    def _start_animation(self, amplitudes: list[float], frame_rate=10):
        """Drive the mouth with amplitudes normalised to the peak value.

        One value is sent every ``1 / frame_rate`` seconds; the mouth is
        closed and the loop ends early when ``self.stop_request`` is set.
        """
        # default=0 covers an empty amplitude list.
        max_amplitude = max(amplitudes, default=0)
        for amplitude in amplitudes:
            if self.stop_request:
                self.set_mouth(0)
                return
            # A peak of 0 (silent audio) would otherwise divide by zero.
            self.set_mouth(amplitude / max_amplitude if max_amplitude else 0)
            sleep(1/frame_rate)

    def set_mouth(self, value):
        """Send ``value`` to the page's ``set_mouth_y`` helper."""
        self._run_js("set_mouth_y({})".format(value))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment