Skip to content

Instantly share code, notes, and snippets.

@jmpinit
Created June 7, 2025 21:07
Show Gist options
  • Save jmpinit/fe7188b9e99d928449ae82c50ac1ad24 to your computer and use it in GitHub Desktop.
Save jmpinit/fe7188b9e99d928449ae82c50ac1ad24 to your computer and use it in GitHub Desktop.
A Python 3 library to send data to the Houdini Pipe In CHOP.
import os
import socket
import struct
ESCAPE_CHARACTER = 170 # Reset/escape character from docs
class PipeInCHOP:
'''
Class to handle communication with the Houdini Pipe In CHOP. It
can communicate either via a named pipe (FIFO) or by setting up
a TCP server and waiting for an incoming connection from the
Pipe In CHOP.
'''
def __init__(self, path_or_host, port=None, fifo_permissions=0o666):
'''
If port is None, path_or_host is treated as a FIFO path.
Otherwise, path_or_host is treated as the host/IP address
to bind to, and port is a TCP port for listening.
'''
self.path_or_host = path_or_host
self.port = port
self.fifo_permissions = fifo_permissions
# For FIFO mode
self._file_obj = None
# For TCP server mode
self._server_socket = None
self._conn_socket = None # the connected client socket
def open_connection(self):
'''
Opens the FIFO or starts the TCP server and accepts one
incoming connection from the Pipe In CHOP.
'''
if self.port is None:
# Named pipe (FIFO) mode
if not os.path.exists(self.path_or_host):
# Create the FIFO if it doesn't exist
try:
old_mask = os.umask(0)
os.mkfifo(self.path_or_host, self.fifo_permissions)
finally:
os.umask(old_mask)
# Open the FIFO for binary writing
# Blocks until there is a reader (the Pipe In CHOP).
self._file_obj = open(self.path_or_host, 'wb')
else:
# Network (server) mode
self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# So we can re-bind quickly if we stop/start
self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._server_socket.bind((self.path_or_host, self.port))
self._server_socket.listen(1)
print(f'Listening on {self.path_or_host}:{self.port} ...')
self._conn_socket, addr = self._server_socket.accept()
print(f'Connection accepted from {addr}')
def close_connection(self):
'''
Closes the FIFO or the server socket and/or active client connection.
'''
# FIFO
if self._file_obj is not None:
self._file_obj.close()
self._file_obj = None
# TCP
if self._conn_socket is not None:
self._conn_socket.close()
self._conn_socket = None
if self._server_socket is not None:
self._server_socket.close()
self._server_socket = None
def _write_unescaped_byte(self, b):
'''
Writes a single raw byte b (0 <= b <= 255).
'''
if self._file_obj is not None:
self._file_obj.write(bytes([b]))
elif self._conn_socket is not None:
self._conn_socket.sendall(bytes([b]))
else:
raise RuntimeError('No open connection to write bytes.')
def _write_escaped_byte(self, b):
'''
Writes a single byte, escaping if it matches the special
ESCAPE_CHARACTER. The Pipe In CHOP uses an escape scheme
where if the byte == 170, it must be duplicated.
'''
if b == ESCAPE_CHARACTER:
# Write ESC + ESC
self._write_unescaped_byte(ESCAPE_CHARACTER)
self._write_unescaped_byte(ESCAPE_CHARACTER)
else:
self._write_unescaped_byte(b)
def _write_escaped_8bytes(self, raw_bytes):
'''
Given 8 raw bytes, writes them with the escape scheme.
(Pipe In CHOP expects 8-byte big-endian tokens, each escaped
if necessary.)
'''
for b in raw_bytes:
self._write_escaped_byte(b)
def _flush(self):
'''
Flush the file if possible; for sockets there's typically no
manual flush, but we keep this for consistency.
'''
if self._file_obj is not None:
self._file_obj.flush()
# For sockets, flush isn't strictly necessary.
def send_reset(self):
'''
Sends a reset sequence. According to the docs, "When this byte
is received, followed by a byte with a value of zero, the current
parsing is reset." The recommended reset is four times in order
to fill an 8-byte token boundary.
'''
# Each reset token is (ESC, 0). The doc example sends four times,
# presumably to pad out 8 bytes, so let's replicate exactly:
for _ in range(4):
self._write_unescaped_byte(ESCAPE_CHARACTER)
self._write_unescaped_byte(0)
self._flush()
def _pack_int64_be(self, val):
'''
Packs a Python integer into 8-byte big-endian format.
'''
return struct.pack('>q', val)
def _pack_float64_be(self, val):
'''
Packs a Python float into 8-byte big-endian format.
'''
return struct.pack('>d', val)
def _write_token_int64(self, val):
'''
Writes an int64 token (8-byte big-endian) with escaping.
'''
raw = self._pack_int64_be(val)
self._write_escaped_8bytes(raw)
def _write_token_float64(self, val):
'''
Writes a float64 token (8-byte big-endian) with escaping.
'''
raw = self._pack_float64_be(val)
self._write_escaped_8bytes(raw)
# -------------------------------------------------------------------------
# COMMANDS
def send_current_values(self, values):
'''
Command Type #1: Current Values
Sends the latest sample data for each channel. Pipe In CHOP
automatically sets length=1, sample rate=30, and start=0 by default.
Args:
values: List of float values, one for each channel.
'''
self.send_reset() # recommended
# Command type
self._write_token_int64(1)
# Number of channels
self._write_token_int64(len(values))
# Sample data
for val in values:
self._write_token_float64(val)
self._flush()
def send_waveform(self, samples, sample_rate, start_index=0.0):
'''
Command Type #2: Upload full waveform.
Args:
samples: 2D list (or list of lists) of floats.
Outer length = number of channels,
each inner list = waveform samples for that channel.
All channels must have the same length.
sample_rate: float, sample rate of the data
start_index: float, start time index
'''
if not samples:
return
# Validate consistent lengths
n_channels = len(samples)
length = len(samples[0])
for chan_data in samples:
if len(chan_data) != length:
raise ValueError('All channels must have the same number of samples.')
self.send_reset()
# Command type
self._write_token_int64(2)
# Track Length
self._write_token_int64(length)
# Sample Rate
self._write_token_float64(sample_rate)
# Start Index
self._write_token_float64(start_index)
# Number of Channels
self._write_token_int64(n_channels)
# Interleave the channel samples:
# For index i in 0..length-1, write samples[ch][i]
# for ch in 0..n_channels-1
for i in range(length):
for ch in range(n_channels):
self._write_token_float64(samples[ch][i])
self._flush()
def send_channel_names(self, names):
'''
Command Type #3: Set channel names.
Args:
names: list of strings representing channel names.
'''
if not names:
return
self.send_reset()
# Command type
self._write_token_int64(3)
# Number of names
self._write_token_int64(len(names))
# Each name must be written in 8-byte tokens.
for name in names:
# Convert name to bytes
b_name = name.encode('utf-8')
# Determine how many 8-byte chunks are needed
chunk_count = (len(b_name) + 7) // 8
self._write_token_int64(chunk_count)
# Write the actual data, padded with zeros if needed
total_bytes = chunk_count * 8
b_name_padded = b_name + b'\x00' * (total_bytes - len(b_name))
# Write each chunk in an 8-byte token
offset = 0
for _ in range(chunk_count):
chunk = b_name_padded[offset:offset+8]
self._write_escaped_8bytes(chunk)
offset += 8
self._flush()
def disconnect(self):
'''
Command Type #4: Disconnect. Causes the Pipe In CHOP to
disconnect the network connection. Not typically used for
FIFOs, but can be sent anyway.
'''
self.send_reset()
# Command type
self._write_token_int64(4)
self._flush()
def delay_refresh(self, seconds):
'''
Command Type #5: Delay refresh.
Args:
seconds: int, how many seconds to delay the refresh by.
'''
self.send_reset()
self._write_token_int64(5)
self._write_token_int64(seconds)
self._flush()
def execute_script(self, script):
'''
Command Type #6: Execute script. The script is sent in
8-byte tokens.
Args:
script: string with the script to execute.
'''
self.send_reset()
self._write_token_int64(6)
b_script = script.encode('utf-8')
chunk_count = (len(b_script) + 7) // 8
self._write_token_int64(chunk_count)
total_bytes = chunk_count * 8
b_script_padded = b_script + b'\x00' * (total_bytes - len(b_script))
offset = 0
for _ in range(chunk_count):
chunk = b_script_padded[offset:offset+8]
self._write_escaped_8bytes(chunk)
offset += 8
self._flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment