Last active
April 23, 2019 03:17
-
-
Save drscotthawley/7c8337582f55191989d96db277be3c7d to your computer and use it in GitHub Desktop.
Sends incoming OSC messages (subject to some filter) to Wekinator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
'''
osc2wek.py
Author: Scott Hawley
This listens for incoming OSC messages and forwards them to Wekinator.
Steps to get running (in Terminal):
0. First you need Mercurial ("hg"). It might be installed by default.
1. Use hg to clone the grail osc code:
hg clone https://bitbucket.org/grailapp/osc
If this gives you an error, run 'ln -s /usr/local/bin/python /usr/local/opt/python/bin/python2.7'
2. cd osc
3. python setup.py install
4. cd ..
5. ./osc2wek.py --ip (your computer's network ip, probably not 127.0.0.1)
When I run it with TouchOSC for the x-y controls in the 3rd panel, I run
./osc2wek.py --ip 169.254.156.214 -f /3/xy
NOTE: I find it takes a few seconds before Wekinator starts showing a green light on "OSC in"
Old PyOSC only worked with Python 2, so this uses code from https://osc.readthedocs.io/en/latest/index.html
'''
import argparse | |
import math | |
from osc import OSCServer,OSCMessage, OSCClient | |
import time | |
def add_to_dict(msg, variables, locked=False):
    """Store every argument of an OSC message under its own unique key.

    Each argument is filed under the key "<msg.address>:<index>", where
    <index> is the argument's position in ``msg.args``.  ``variables`` is
    updated in place; nothing is returned.

    When ``locked`` is true, only keys already present in ``variables`` are
    refreshed and arguments that would create a new key are skipped.
    """
    for position, value in enumerate(msg.args):
        slot = ':'.join((msg.address, str(position)))
        if locked and slot not in variables:
            continue  # locked: never introduce a new variable
        variables[slot] = value
class RerouteServer(OSCServer):
    """Routes incoming OSC messages whose address contains a filter string
    on to Wekinator.

    Every argument of every accepted message is remembered in
    ``self.variables`` (see ``add_to_dict``).  Each time an accepted message
    arrives, all remembered values are sent as a single OSC message to
    ``out_address`` through the client given at construction.

    New variables are only accepted during the first ``lock_time`` seconds
    (15 by default) after the server is created; after that, unseen inputs
    are "locked out".  This avoids accidentally changing the set of inputs
    if an extra stray input shows up later.
    """

    def __init__(self, client, address='127.0.0.1', inport=8000, filter='',
                 out_address='/wek/inputs', lock_time=15):
        """Set up the rerouting state and start listening.

        client      -- object whose ``send(msg)`` forwards the outgoing message
        address     -- address to listen on
        inport      -- port to listen on
        filter      -- substring an incoming OSC address must contain
                       ('' accepts every address)
        out_address -- OSC address of the outgoing message
        lock_time   -- seconds during which new variables may still be added
        """
        self.client = client
        self.variables = {}
        self.start = time.time()  # reference point for the lock-out window
        self.lock_time = lock_time
        self.filter = filter
        self.out_address = out_address
        super(RerouteServer, self).__init__(address=address, port=inport)

    def handle(self, address, message, msg_time):
        """Process one incoming OSC message or bundle.

        For every contained message whose address includes ``self.filter``,
        the stored variables are updated and the full set of stored values
        is sent to ``self.out_address``.
        """
        if not (hasattr(message, 'is_bundle') and message.is_bundle()):
            message = [message]  # treat a single message as a one-item list
        for msg in message:
            if self.filter not in msg.address:
                # Ignore only this message; later messages in the same
                # bundle may still match the filter.
                continue
            # only allow a certain number of seconds to 'record' new inputs
            locked = time.time() - self.start > self.lock_time
            add_to_dict(msg, self.variables, locked=locked)
            print(f"Received {msg.address}, sending {len(self.variables)} variables to {self.out_address}")
            # create an OSC message consisting of all variables, & send it
            out_msg = OSCMessage(address=self.out_address)
            for value in self.variables.values():
                out_msg.add(value)
            # Use the client this server was built with, not a module global.
            self.client.send(out_msg)
if __name__ == "__main__":
    # Command-line options: where to listen, where to forward, what to accept.
    cli = argparse.ArgumentParser()
    cli.add_argument("--ip", default="127.0.0.1", help="The ip to listen on")
    cli.add_argument("--inport", type=int, default=8000, help="The port to listen on")
    cli.add_argument("--outport", type=int, default=6448, help="The port to send to")
    cli.add_argument("-f", "--filter", default='', help="Listen for OSC messages containing this string")
    opts = cli.parse_args()

    # Outgoing messages go to the same ip, on the output port.
    # The module-level name `client` is kept as-is: other code in this file
    # may refer to it.
    client = OSCClient(opts.ip, opts.outport)
    server = RerouteServer(
        client,
        address=opts.ip,
        inport=opts.inport,
        filter=opts.filter,
    )
    print("Listening on {}".format(server.server_address))
    server.serve_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment