Last active
December 21, 2021 15:41
-
-
Save remram44/3fe8e27894f9c0af0e1acf8506054fbf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pulsectl # pip install pulsectl==18.12.5 | |
import time | |
# Audio routing set up by this script:
#
# (other apps) ---------------------------------------------+
#                                                           |
#                                                           v
# microphone --> call --> pr-out -- mon --> loopback --> speakers
#     |                              |
#     |                              v
#     |                          loopback
#     |                              |
#     v                              v
# loopback ---------------------> pr-rec -- mon --> recorder
# Applications whose playback streams are treated as calls; compared exactly
# against each stream's 'application.name' property.
_CALL_APP_NAMES = (
    'WEBRTC VoiceEngine',  # Zoom
    'Chrome', 'Chromium', 'Google Chrome',
    'Skype',
)


def _load_null_sink(pulse, description, cleanup):
    """Load a null sink and return ``(sink, monitor_source)``.

    The module index is appended to *cleanup* right after loading, so the
    module is unloaded even if the lookups below fail.
    """
    module = pulse.module_load(
        'module-null-sink',
        args='sink_properties=device.description=%s' % description,
    )
    cleanup.append(module)
    # Short pause before looking the new sink up in the device lists.
    time.sleep(0.5)
    sink, = [
        candidate
        for candidate in pulse.sink_list()
        if candidate.owner_module == module
    ]
    monitor, = [
        candidate
        for candidate in pulse.source_list()
        if candidate.name == sink.monitor_source_name
    ]
    return sink, monitor


def _load_loopback(pulse, source, sink, cleanup):
    """Load a loopback from *source* to *sink* and register it in *cleanup*."""
    module = pulse.module_load(
        'module-loopback',
        'source=%s sink=%s latency_msec=1' % (source.name, sink.name),
    )
    cleanup.append(module)


def main():
    """Route call audio and the microphone into a recordable virtual device.

    Creates two null sinks ('pr-out' for call output, 'pr-rec' for the mix of
    call output and microphone), wires them with loopback modules, then polls
    every two seconds to:

    * move Audacity's recording stream onto the monitor of 'pr-rec', and
    * move playback streams of known call applications onto 'pr-out'.

    Runs until interrupted with Ctrl-C. Every module loaded here is unloaded
    on exit, in reverse order of creation.

    Raises:
        ValueError: if the default source/sink, or a freshly created null
            sink or its monitor, cannot be found exactly once.
    """
    with pulsectl.Pulse('pulse-record') as pulse:
        cleanup = []
        try:
            # Locate default source/sink
            server = pulse.server_info()
            mic, = [
                source
                for source in pulse.source_list()
                if source.name == server.default_source_name
            ]
            speakers, = [
                sink
                for sink in pulse.sink_list()
                if sink.name == server.default_sink_name
            ]

            # Node for call output, and node for call output + mic (recording)
            output, output_monitor = _load_null_sink(pulse, 'pr-out', cleanup)
            record, record_monitor = _load_null_sink(pulse, 'pr-rec', cleanup)

            # Send call output to speakers
            _load_loopback(pulse, output_monitor, speakers, cleanup)

            # Restore defaults to the devices that were default at startup
            pulse.sink_default_set(speakers)
            pulse.source_default_set(mic)

            # Send mic and call output to recording node
            _load_loopback(pulse, mic, record, cleanup)
            _load_loopback(pulse, output_monitor, record, cleanup)

            print("Devices created")

            # Now we wait for call to show up, to connect it to 'output'
            # Also wait for audacity, to connect it to 'record_monitor'
            audacity_found = False
            calls = set()
            while True:
                if not audacity_found:
                    for rec in pulse.source_output_list():
                        app = rec.proplist.get('application.name')
                        if not (app and 'audacity' in app):
                            continue
                        print("Audacity found, wiring it up: %s" % app)
                        try:
                            pulse.source_output_move(
                                rec.index, record_monitor.index,
                            )
                        except pulsectl.PulseOperationFailed as err:
                            # Stream may have gone away since it was listed;
                            # leave the flag unset so the next poll retries.
                            print("Could not move recorder stream: %s" % err)
                        else:
                            audacity_found = True

                # Find calls, route to 'output'
                sink_inputs = pulse.sink_input_list()
                for out in sink_inputs:
                    if out.index in calls:
                        continue
                    app = out.proplist.get('application.name')
                    if app not in _CALL_APP_NAMES:
                        continue
                    print("Call identified, recording: %s" % app)
                    try:
                        pulse.sink_input_move(out.index, output.index)
                    except pulsectl.PulseOperationFailed as err:
                        print("Could not move call stream: %s" % err)
                    # Mark as handled even on failure: a vanished stream is
                    # pruned below, and an unmovable one should not be retried
                    # and logged on every poll.
                    calls.add(out.index)

                # Remove terminated calls from list
                calls.intersection_update(out.index for out in sink_inputs)

                time.sleep(2)
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop; fall through to cleanup.
            pass
        finally:
            print("Cleaning up")
            for mod in reversed(cleanup):
                try:
                    pulse.module_unload(mod)
                except pulsectl.PulseError as err:
                    # Keep going so one failed unload does not leave the
                    # remaining modules loaded.
                    print("Failed to unload module %s: %s" % (mod, err))
# Run the recorder only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment