Created
June 30, 2020 00:25
-
-
Save rickybrent/11118f35b3a295242e9c9d70ef28513a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Adapted from https://menno.io/posts/pulseaudio_monitoring/ for a very different purpose.
# Requires: pacmd, xdotool, an ~/.alsoftrc set up as described below, and Minecraft switched to the new output sink.
from signal import signal, SIGINT | |
import os | |
import sys | |
import time | |
from Queue import Queue | |
from ctypes import POINTER, c_ubyte, c_void_p, c_ulong, cast | |
# First download https://github.com/Valodim/python-pulseaudio
from pulseaudio.lib_pulseaudio import * | |
# See https://stackoverflow.com/questions/44608514/getting-volume-value-from-pulseaudio for a Python 3 port.
# Shelling out to pacmd is messy and could be avoided, but it was easier to write.
# Name given to the PulseAudio null sink created below; the same name is
# used later to find the sink to monitor and to unload it on exit.
SINK_NAME = 'minecraft_fish_out'

# Create the null sink at import time by shelling out to pacmd. The sink's
# description is set to the sink name as well.
os.system("".join([
    "pacmd load-module module-null-sink sink_name=",
    SINK_NAME,
    " sink_properties=device.description=\"",
    SINK_NAME,
    "\"",
]))
# You will also need to mute everything but "friendly creatures", and make
# sure that ~/.alsoftrc exists and contains:
# [pulse]
# allow-moves=yes
# Sample rate requested for the peak-detect record stream (passed to
# PeakMonitor as its `rate`).
METER_RATE = 80

# Peak samples are reported after subtracting 128 from an unsigned byte,
# so 127 is the largest value a sample can take.
MAX_SAMPLE_VALUE = 127

# Samples are right-shifted by this many bits before being drawn.
DISPLAY_SCALE = 1

# Width, in characters, of a full-scale meter bar.
MAX_SPACES = MAX_SAMPLE_VALUE >> DISPLAY_SCALE
class PeakMonitor(object):
    """Stream two-channel peak levels from a PulseAudio sink's monitor source.

    Constructing an instance starts a threaded PulseAudio mainloop and opens
    a connection. Once the connection is ready, every sink is enumerated and,
    when the one named ``sink_name`` is seen, a peak-detect record stream is
    attached to its monitor source. Iterating the instance yields
    ``(first_channel, second_channel)`` tuples of peak values, blocking until
    the next pair is available.
    """

    def __init__(self, sink_name, rate):
        """Connect to PulseAudio and start the mainloop thread.

        sink_name -- name of the sink whose monitor source is recorded.
        rate -- sample rate requested for the peak-detect stream.
        """
        self.sink_name = sink_name
        self.rate = rate

        # Wrap callback methods in appropriate ctypefunc instances so
        # that the Pulseaudio C API can call them. They are stored on the
        # instance so they stay referenced while the C side may call them.
        self._context_notify_cb = pa_context_notify_cb_t(
            self.context_notify_cb)
        self._sink_info_cb = pa_sink_info_cb_t(self.sink_info_cb)
        self._stream_read_cb = pa_stream_request_cb_t(self.stream_read_cb)

        # stream_read_cb() puts peak samples into this Queue instance
        self._samples = Queue()

        # Create the mainloop thread and set our context_notify_cb
        # method to be called when there's updates relating to the
        # connection to Pulseaudio
        _mainloop = pa_threaded_mainloop_new()
        _mainloop_api = pa_threaded_mainloop_get_api(_mainloop)
        context = pa_context_new(_mainloop_api, 'peak_demo')
        pa_context_set_state_callback(context, self._context_notify_cb, None)
        pa_context_connect(context, None, 0, None)
        pa_threaded_mainloop_start(_mainloop)

    def __iter__(self):
        """Yield queued peak pairs forever, blocking while the queue is empty."""
        while True:
            yield self._samples.get()

    def context_notify_cb(self, context, _):
        """React to connection state changes reported by PulseAudio."""
        state = pa_context_get_state(context)
        if state == PA_CONTEXT_READY:
            print("Pulseaudio connection ready...")
            # Connected to Pulseaudio. Now request that sink_info_cb
            # be called with information about the available sinks.
            o = pa_context_get_sink_info_list(
                context, self._sink_info_cb, None)
            pa_operation_unref(o)
        elif state == PA_CONTEXT_FAILED:
            print("Connection failed")
        elif state == PA_CONTEXT_TERMINATED:
            print("Connection terminated")

    def sink_info_cb(self, context, sink_info_p, _, __):
        """Log each sink and start peak recording on the one we were asked for."""
        if not sink_info_p:
            # A NULL pointer carries no sink information; nothing to do.
            return

        sink_info = sink_info_p.contents
        print('-' * 60)
        print('index: %s' % (sink_info.index,))
        print('name: %s' % (sink_info.name,))
        print('description: %s' % (sink_info.description,))
        print(repr(sink_info))

        if sink_info.name == self.sink_name:
            # Found the sink we want to monitor for peak levels.
            # Tell PA to call stream_read_cb with peak samples.
            print('setting up peak recording using %s'
                  % (sink_info.monitor_source_name,))
            samplespec = pa_sample_spec()
            samplespec.channels = 2
            samplespec.format = PA_SAMPLE_U8
            samplespec.rate = self.rate

            pa_stream = pa_stream_new(
                context, "peak detect demo", samplespec, None)
            pa_stream_set_read_callback(pa_stream,
                                        self._stream_read_cb,
                                        sink_info.index)
            pa_stream_connect_record(pa_stream,
                                     sink_info.monitor_source_name,
                                     None,
                                     PA_STREAM_PEAK_DETECT)

    @staticmethod
    def _peak_pairs(data, length):
        """Return the (first, second) channel peak pairs found in ``data``.

        data -- indexable sequence of unsigned byte samples, two per frame.
        length -- number of valid entries in ``data``.

        A trailing unpaired byte (odd ``length``) is ignored so the second
        element of a pair is never read from beyond ``length``.
        """
        usable = length - (length % 2)
        # When PA_SAMPLE_U8 is used, samples values range from 128
        # to 255 because the underlying audio data is signed but
        # it doesn't make sense to return signed peaks.
        return [(data[i] - 128, data[i + 1] - 128)
                for i in range(0, usable, 2)]

    def stream_read_cb(self, stream, length, index_incr):
        """Move the peak samples of the current fragment into the queue.

        stream -- the record stream that has data available.
        length -- byte count announced by PulseAudio for this callback.
        index_incr -- user data registered with the callback (unused).
        """
        data = c_void_p()
        nbytes = c_ulong(length)
        if pa_stream_peek(stream, data, nbytes) < 0:
            # Peek failed: there is no fragment to read or to drop.
            return

        available = nbytes.value
        if not data.value:
            # NULL data with a non-zero size is a hole in the stream and
            # still has to be dropped; NULL data with size 0 means the
            # buffer is empty and there is nothing to drop.
            if available:
                pa_stream_drop(stream)
            return

        samples = cast(data, POINTER(c_ubyte))
        for pair in self._peak_pairs(samples, available):
            self._samples.put(pair)
        pa_stream_drop(stream)
def cleanup(signal_received, frame):
    """Signal handler: unload the null sink module, then exit with status 0.

    signal_received -- number of the signal that fired (unused).
    frame -- interrupted stack frame (unused).
    """
    print('Exiting gracefully')
    # Look up the module number from `pactl list modules`: take the three
    # lines before the line mentioning our sink_name, keep the one with "#",
    # and cut out the number after it. The grep pattern is quoted so that its
    # trailing space stays part of the pattern and `-B 3` is parsed as an
    # option; with an escaped space instead, grep treats `3` as a file name
    # and never reads the piped module list.
    os.system('pactl unload-module $(pactl list modules | grep -B 3 "sink_name=' +
              SINK_NAME + ' " | grep "#" | cut -f 2 -d "#")')
    sys.exit(0)
def main():
    """Draw a two-line level meter and click when both channels exceed 2.

    Installs ``cleanup`` as the SIGINT handler, then consumes peak pairs from
    a PeakMonitor on SINK_NAME forever. Each pair is scaled by DISPLAY_SCALE
    and drawn as two bars. When both scaled values are above 2 and more than
    5 seconds have passed since the last trigger, xdotool is asked to click
    mouse button 3 twice, one second apart, in the background.
    """
    signal(SIGINT, cleanup)
    rows_up = 2
    emit = sys.stdout.write

    peaks = PeakMonitor(SINK_NAME, METER_RATE)
    previous_trigger = time.time()

    for first_raw, second_raw in peaks:
        level_a = first_raw >> DISPLAY_SCALE
        level_b = second_raw >> DISPLAY_SCALE

        # One meter row per channel; the second row ends with a carriage
        # return so the cursor goes back to the start of that row.
        emit(' %3d %s%s\n' % (
            level_a, '>' * level_a, ' ' * (MAX_SPACES - level_a)))
        emit(' %3d %s%s\r' % (
            level_b, '>' * level_b, ' ' * (MAX_SPACES - level_b)))
        # Move the cursor back up so the next pair overwrites these rows.
        print(u"\u001b[%dA" % rows_up)

        both_loud = level_a > 2 and level_b > 2
        if both_loud and time.time() > (previous_trigger + 5):
            print(' %3d Passed threshold ' % level_a)
            os.system("xdotool click 3 && sleep 1 && xdotool click 3 &")
            previous_trigger = time.time()

        sys.stdout.flush()


# Run the monitor only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment