Skip to content

Instantly share code, notes, and snippets.

@rickybrent
Created June 30, 2020 00:25
Show Gist options
  • Save rickybrent/11118f35b3a295242e9c9d70ef28513a to your computer and use it in GitHub Desktop.
Save rickybrent/11118f35b3a295242e9c9d70ef28513a to your computer and use it in GitHub Desktop.
# Adapted from https://menno.io/posts/pulseaudio_monitoring/ for a very different purpose.
# Requires: pacmd, xdotool, an ~/.alsoftrc set up as described below, and Minecraft switched to the new output.
from signal import signal, SIGINT
import os
import sys
import time
from Queue import Queue
from ctypes import POINTER, c_ubyte, c_void_p, c_ulong, cast
# First download https://github.com/Valodim/python-pulseaudio
from pulseaudio.lib_pulseaudio import *
# See https://stackoverflow.com/questions/44608514/getting-volume-value-from-pulseaudio for a Python 3 port.
# Creating the sink by shelling out to pacmd is messy and could be avoided, but this was easier to write.
# Name given to the null sink this script creates and then monitors.
SINK_NAME = 'minecraft_fish_out'
# Load the null sink at start-up; its device description is set to the
# same text as its name.
os.system(
    'pacmd load-module module-null-sink sink_name={0}'
    ' sink_properties=device.description="{0}"'.format(SINK_NAME))
# Will also need to mute everything but "friendly creatures", then make sure this is created:
# ~/.alsoftrc containing:
# [pulse]
# allow-moves=yes
# Sample rate requested for the peak-detect record stream.
METER_RATE = 80
# Largest peak value expected once the 128 offset has been removed.
MAX_SAMPLE_VALUE = 127
# Number of bits each sample is shifted right before being drawn.
DISPLAY_SCALE = 1
# Width of the meter in characters: the largest sample after scaling.
MAX_SPACES = MAX_SAMPLE_VALUE // (1 << DISPLAY_SCALE)
class PeakMonitor(object):
    """Iterable source of two-channel peak levels from a PulseAudio sink.

    Constructing an instance starts a PulseAudio threaded mainloop and
    connects a context named 'peak_demo'.  When the context becomes ready
    the sink list is requested; for the sink whose name equals
    ``sink_name`` a two-channel PA_SAMPLE_U8 record stream with
    PA_STREAM_PEAK_DETECT is attached to that sink's monitor source.

    Iterating the instance yields ``(a, b)`` tuples, one value per
    channel (presumably left/right -- confirm against the sink's channel
    map), blocking until the next pair has been queued.

    Args:
        sink_name: name of the sink to monitor, compared for equality
            against each reported sink's ``name``.
        rate: sample rate requested for the record stream.
    """

    def __init__(self, sink_name, rate):
        self.sink_name = sink_name
        self.rate = rate

        # Wrap callback methods in appropriate ctypefunc instances so
        # that the Pulseaudio C API can call them.  They are stored on the
        # instance so the wrappers live as long as the monitor does.
        self._context_notify_cb = pa_context_notify_cb_t(
            self.context_notify_cb)
        self._sink_info_cb = pa_sink_info_cb_t(self.sink_info_cb)
        self._stream_read_cb = pa_stream_request_cb_t(self.stream_read_cb)

        # stream_read_cb() puts peak sample pairs into this Queue instance
        self._samples = Queue()

        # Create the mainloop thread and set our context_notify_cb
        # method to be called when there's updates relating to the
        # connection to Pulseaudio
        _mainloop = pa_threaded_mainloop_new()
        _mainloop_api = pa_threaded_mainloop_get_api(_mainloop)
        context = pa_context_new(_mainloop_api, 'peak_demo')
        pa_context_set_state_callback(context, self._context_notify_cb, None)
        pa_context_connect(context, None, 0, None)
        pa_threaded_mainloop_start(_mainloop)

    def __iter__(self):
        """Yield queued sample pairs forever, blocking while none is ready."""
        while True:
            yield self._samples.get()

    def context_notify_cb(self, context, _):
        """Report context state changes; request the sink list once ready."""
        state = pa_context_get_state(context)
        if state == PA_CONTEXT_READY:
            print("Pulseaudio connection ready...")
            # Connected to Pulseaudio. Now request that sink_info_cb
            # be called with information about the available sinks.
            o = pa_context_get_sink_info_list(
                context, self._sink_info_cb, None)
            pa_operation_unref(o)
        elif state == PA_CONTEXT_FAILED:
            print("Connection failed")
        elif state == PA_CONTEXT_TERMINATED:
            print("Connection terminated")

    def sink_info_cb(self, context, sink_info_p, _, __):
        """Print each reported sink and start recording from the wanted one."""
        if not sink_info_p:
            # A null pointer carries no sink description; nothing to do.
            return

        sink_info = sink_info_p.contents
        print('-' * 60)
        print('index: %s' % (sink_info.index,))
        print('name: %s' % (sink_info.name,))
        print('description: %s' % (sink_info.description,))
        print(repr(sink_info))

        if sink_info.name == self.sink_name:
            # Found the sink we want to monitor for peak levels.
            # Tell PA to call stream_read_cb with peak samples.
            print('')
            print('setting up peak recording using %s'
                  % (sink_info.monitor_source_name,))
            print('')
            samplespec = pa_sample_spec()
            samplespec.channels = 2
            samplespec.format = PA_SAMPLE_U8
            samplespec.rate = self.rate

            stream = pa_stream_new(
                context, "peak detect demo", samplespec, None)
            pa_stream_set_read_callback(stream,
                                        self._stream_read_cb,
                                        sink_info.index)
            pa_stream_connect_record(stream,
                                     sink_info.monitor_source_name,
                                     None,
                                     PA_STREAM_PEAK_DETECT)

    @staticmethod
    def _pair_samples(data, nbytes):
        """Return ``(a, b)`` peak tuples for the first ``nbytes`` of ``data``.

        ``data`` is any integer-indexable buffer of unsigned bytes laid out
        as consecutive two-byte pairs.  A trailing unpaired byte is ignored
        so the buffer is never indexed at or beyond ``nbytes``.
        """
        # When PA_SAMPLE_U8 is used, samples values range from 128
        # to 255 because the underlying audio data is signed but
        # it doesn't make sense to return signed peaks.
        paired_bytes = nbytes - (nbytes % 2)
        return [(data[i] - 128, data[i + 1] - 128)
                for i in range(0, paired_bytes, 2)]

    def stream_read_cb(self, stream, length, index_incr):
        """Queue the peak pairs of the fragment that is ready on ``stream``.

        ``length`` only seeds the size argument; the byte count that
        pa_stream_peek() writes back is what bounds the read.
        """
        data = c_void_p()
        nbytes = c_ulong(length)
        # Kept in a named object so the fragment size written back by
        # pa_stream_peek() can be read afterwards.
        pa_stream_peek(stream, data, nbytes)

        if data.value is None:
            # NULL data: either nothing is buffered (size 0) or there is a
            # hole in the stream (size > 0), which has to be dropped to
            # move the read index past it.
            if nbytes.value:
                pa_stream_drop(stream)
            return

        samples = cast(data, POINTER(c_ubyte))
        for pair in self._pair_samples(samples, nbytes.value):
            self._samples.put(pair)
        pa_stream_drop(stream)
def cleanup(signal_received, frame):
    """Signal handler: unload the null sink module and exit with status 0.

    Args:
        signal_received: signal number passed by the signal machinery
            (unused).
        frame: current stack frame passed by the signal machinery (unused).
    """
    print('Exiting gracefully')
    # Shell pipeline that looks up the module by its sink_name argument;
    # presumably the "#" line within the 3 lines before the match carries
    # the module index -- verify against `pactl list modules` output.
    find_module_index = (
        'pactl list modules | grep sink_name=%s\\ -B 3'
        ' | grep "#" | cut -f 2 -d "#"' % SINK_NAME
    )
    os.system('pactl unload-module $(%s)' % find_module_index)
    sys.exit(0)
def main():
    """Draw a two-row level meter and right-click when both channels peak.

    Installs ``cleanup`` as the SIGINT handler, then consumes sample pairs
    from a PeakMonitor on SINK_NAME forever.  Each pair is scaled down by
    DISPLAY_SCALE bits and drawn as two bars.  When both scaled values
    exceed 2 and more than 5 seconds have passed since the last trigger,
    two xdotool button-3 clicks one second apart are launched in the
    background.
    """
    signal(SIGINT, cleanup)
    cursor_up_rows = 2
    monitor = PeakMonitor(SINK_NAME, METER_RATE)
    last_trigger = time.time()

    for level_a, level_b in monitor:
        level_a >>= DISPLAY_SCALE
        level_b >>= DISPLAY_SCALE

        row_a = ' %3d %s%s\n' % (
            level_a, '>' * level_a, ' ' * (MAX_SPACES - level_a))
        row_b = ' %3d %s%s\r' % (
            level_b, '>' * level_b, ' ' * (MAX_SPACES - level_b))

        # Written raw: each row already carries its own terminator, so no
        # separator or newline must be appended.
        sys.stdout.write(row_a)
        sys.stdout.write(row_b)
        # ANSI "cursor up" so the next pair redraws over the same rows.
        print(u"\u001b[%dA" % cursor_up_rows)

        loud_enough = level_a > 2 and level_b > 2
        if loud_enough and time.time() > (last_trigger + 5):
            print(' %3d Passed threshold ' % level_a)
            os.system("xdotool click 3 && sleep 1 && xdotool click 3 &")
            last_trigger = time.time()

        sys.stdout.flush()
# Run the meter only when executed as a script, not when imported.
if "__main__" == __name__:
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment