Created
June 7, 2025 21:07
-
-
Save jmpinit/fe7188b9e99d928449ae82c50ac1ad24 to your computer and use it in GitHub Desktop.
A Python 3 library to send data to the Houdini Pipe In CHOP.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import socket
import struct
ESCAPE_CHARACTER = 170  # Reset/escape character from docs


class PipeInCHOP:
    '''
    Class to handle communication with the Houdini Pipe In CHOP. It
    can communicate either via a named pipe (FIFO) or by setting up
    a TCP server and waiting for an incoming connection from the
    Pipe In CHOP.

    Every command is sent as a reset sequence followed by 8-byte
    big-endian tokens. Inside a token, any byte equal to
    ESCAPE_CHARACTER is written twice. Each public command is packed
    completely before anything is written and then sent with a single
    write, so a value that cannot be packed never leaves a
    half-written command on the wire.

    The class can be used as a context manager: entering calls
    open_connection() and leaving calls close_connection().
    '''

    # (ESC, 0) repeated four times; written as-is, never escaped.
    _RESET_SEQUENCE = bytes([ESCAPE_CHARACTER, 0]) * 4

    def __init__(self, path_or_host, port=None, fifo_permissions=0o666):
        '''
        If port is None, path_or_host is treated as a FIFO path.
        Otherwise, path_or_host is treated as the host/IP address
        to bind to, and port is a TCP port for listening.

        Args:
            path_or_host: FIFO path, or host/IP address to bind to.
            port: TCP port to listen on, or None for FIFO mode.
            fifo_permissions: permission bits given to a FIFO that
                open_connection() has to create.
        '''
        self.path_or_host = path_or_host
        self.port = port
        self.fifo_permissions = fifo_permissions
        # For FIFO mode
        self._file_obj = None
        # For TCP server mode
        self._server_socket = None
        self._conn_socket = None  # the connected client socket

    def __enter__(self):
        '''
        Opens the connection and returns this object.
        '''
        self.open_connection()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        '''
        Closes the connection; exceptions from the with-body propagate.
        '''
        self.close_connection()
        return False

    # -------------------------------------------------------------------------
    # CONNECTION

    def open_connection(self):
        '''
        Opens the FIFO or starts the TCP server and accepts one
        incoming connection from the Pipe In CHOP.

        Anything left open by an earlier call is closed first so that
        re-opening does not leak file descriptors.
        '''
        self.close_connection()
        if self.port is None:
            self._open_fifo()
        else:
            self._open_tcp_server()

    def _open_fifo(self):
        '''
        Creates the FIFO if the path does not exist yet and opens it
        for binary writing.
        '''
        try:
            os.mkfifo(self.path_or_host, self.fifo_permissions)
        except FileExistsError:
            # Something already lives at this path; reuse it untouched.
            pass
        else:
            # mkfifo() masks the mode with the process umask. Setting the
            # mode explicitly yields exactly fifo_permissions without
            # temporarily changing the process-wide umask.
            os.chmod(self.path_or_host, self.fifo_permissions)
        # Open the FIFO for binary writing.
        # Blocks until there is a reader (the Pipe In CHOP).
        self._file_obj = open(self.path_or_host, 'wb')

    def _open_tcp_server(self):
        '''
        Binds a listening socket and blocks until one client connects.
        '''
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # So we can re-bind quickly if we stop/start
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind((self.path_or_host, self.port))
            server.listen(1)
            print(f'Listening on {self.path_or_host}:{self.port} ...')
            conn, addr = server.accept()
        except BaseException:
            # Includes KeyboardInterrupt while waiting in accept():
            # never leave a half-configured listening socket behind.
            server.close()
            raise
        self._server_socket = server
        self._conn_socket = conn
        print(f'Connection accepted from {addr}')

    def close_connection(self):
        '''
        Closes the FIFO or the server socket and/or active client connection.

        All handles are forgotten before closing and every close is
        attempted even if an earlier one raises (closing the FIFO
        flushes it, which can fail once the reader has gone away).
        '''
        file_obj = self._file_obj
        conn = self._conn_socket
        server = self._server_socket
        self._file_obj = None
        self._conn_socket = None
        self._server_socket = None
        try:
            # FIFO
            if file_obj is not None:
                file_obj.close()
        finally:
            # TCP
            try:
                if conn is not None:
                    conn.close()
            finally:
                if server is not None:
                    server.close()

    # -------------------------------------------------------------------------
    # LOW-LEVEL WRITING

    @staticmethod
    def _escape(raw):
        '''
        Returns raw with every ESCAPE_CHARACTER byte doubled.
        '''
        esc = bytes([ESCAPE_CHARACTER])
        return bytes(raw).replace(esc, esc + esc)

    def _write_raw(self, data):
        '''
        Writes data unchanged to whichever connection is open.

        Raises:
            RuntimeError: if neither the FIFO nor a client socket is open.
        '''
        if self._file_obj is not None:
            self._file_obj.write(data)
        elif self._conn_socket is not None:
            self._conn_socket.sendall(data)
        else:
            raise RuntimeError('No open connection to write bytes.')

    def _write_unescaped_byte(self, b):
        '''
        Writes a single raw byte b (0 <= b <= 255).
        '''
        self._write_raw(bytes([b]))

    def _write_escaped_byte(self, b):
        '''
        Writes a single byte, escaping if it matches the special
        ESCAPE_CHARACTER. The Pipe In CHOP uses an escape scheme
        where if the byte == 170, it must be duplicated.
        '''
        self._write_raw(self._escape(bytes([b])))

    def _write_escaped_8bytes(self, raw_bytes):
        '''
        Given 8 raw bytes, writes them with the escape scheme.
        (Pipe In CHOP expects 8-byte big-endian tokens, each escaped
        if necessary.)
        '''
        self._write_raw(self._escape(raw_bytes))

    def _flush(self):
        '''
        Flush the file if possible; for sockets there's typically no
        manual flush, but we keep this for consistency.
        '''
        if self._file_obj is not None:
            self._file_obj.flush()

    def _send_command(self, raw_tokens):
        '''
        Sends one complete command: the reset sequence followed by
        raw_tokens (escaped here), as a single write, then flushes.
        '''
        self._write_raw(self._RESET_SEQUENCE + self._escape(raw_tokens))
        self._flush()

    def send_reset(self):
        '''
        Sends a reset sequence. According to the docs, "When this byte
        is received, followed by a byte with a value of zero, the current
        parsing is reset." The recommended reset is four times in order
        to fill an 8-byte token boundary.
        '''
        self._write_raw(self._RESET_SEQUENCE)
        self._flush()

    # -------------------------------------------------------------------------
    # TOKEN ENCODING

    def _pack_int64_be(self, val):
        '''
        Packs a Python integer into 8-byte big-endian format.
        '''
        return struct.pack('>q', val)

    def _pack_float64_be(self, val):
        '''
        Packs a Python float into 8-byte big-endian format.
        '''
        return struct.pack('>d', val)

    def _pack_string(self, text):
        '''
        Returns the unescaped tokens for a string: an int64 count of
        8-byte chunks followed by the UTF-8 bytes zero-padded to that
        many chunks.
        '''
        data = text.encode('utf-8')
        chunk_count = (len(data) + 7) // 8
        return self._pack_int64_be(chunk_count) + data.ljust(chunk_count * 8, b'\x00')

    def _write_token_int64(self, val):
        '''
        Writes an int64 token (8-byte big-endian) with escaping.
        '''
        self._write_escaped_8bytes(self._pack_int64_be(val))

    def _write_token_float64(self, val):
        '''
        Writes a float64 token (8-byte big-endian) with escaping.
        '''
        self._write_escaped_8bytes(self._pack_float64_be(val))

    # -------------------------------------------------------------------------
    # COMMANDS

    def send_current_values(self, values):
        '''
        Command Type #1: Current Values
        Sends the latest sample data for each channel. Pipe In CHOP
        automatically sets length=1, sample rate=30, and start=0 by default.

        Args:
            values: List of float values, one for each channel.
        '''
        count = len(values)
        # Command type, number of channels, then one float64 per channel.
        payload = struct.pack('>qq', 1, count) + struct.pack(f'>{count}d', *values)
        self._send_command(payload)

    def send_waveform(self, samples, sample_rate, start_index=0.0):
        '''
        Command Type #2: Upload full waveform.

        Does nothing when samples is empty.

        Args:
            samples: 2D list (or list of lists) of floats.
                Outer length = number of channels,
                each inner list = waveform samples for that channel.
                All channels must have the same length.
            sample_rate: float, sample rate of the data
            start_index: float, start time index

        Raises:
            ValueError: if the channels differ in length.
        '''
        if not samples:
            return
        # Validate consistent lengths
        n_channels = len(samples)
        length = len(samples[0])
        if any(len(chan_data) != length for chan_data in samples):
            raise ValueError('All channels must have the same number of samples.')
        parts = [
            self._pack_int64_be(2),              # Command type
            self._pack_int64_be(length),         # Track length
            self._pack_float64_be(sample_rate),  # Sample rate
            self._pack_float64_be(start_index),  # Start index
            self._pack_int64_be(n_channels),     # Number of channels
        ]
        # Interleave the channel samples: for each sample index, one
        # value per channel in channel order.
        frame_format = f'>{n_channels}d'
        for frame in zip(*samples):
            parts.append(struct.pack(frame_format, *frame))
        self._send_command(b''.join(parts))

    def send_channel_names(self, names):
        '''
        Command Type #3: Set channel names.

        Does nothing when names is empty.

        Args:
            names: list of strings representing channel names.
        '''
        if not names:
            return
        # Command type, number of names, then each name as a chunk
        # count followed by its zero-padded 8-byte chunks.
        parts = [struct.pack('>qq', 3, len(names))]
        for name in names:
            parts.append(self._pack_string(name))
        self._send_command(b''.join(parts))

    def disconnect(self):
        '''
        Command Type #4: Disconnect. Causes the Pipe In CHOP to
        disconnect the network connection. Not typically used for
        FIFOs, but can be sent anyway.
        '''
        self._send_command(self._pack_int64_be(4))

    def delay_refresh(self, seconds):
        '''
        Command Type #5: Delay refresh.

        Args:
            seconds: int, how many seconds to delay the refresh by.
        '''
        self._send_command(struct.pack('>qq', 5, seconds))

    def execute_script(self, script):
        '''
        Command Type #6: Execute script. The script is sent in
        8-byte tokens.

        Args:
            script: string with the script to execute.
        '''
        self._send_command(self._pack_int64_be(6) + self._pack_string(script))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment