Skip to content

Instantly share code, notes, and snippets.

@cboulay
Last active July 21, 2019 08:04
Show Gist options
  • Save cboulay/7b13d074730d454c9a3e29e91c0a8956 to your computer and use it in GitHub Desktop.
Save cboulay/7b13d074730d454c9a3e29e91c0a8956 to your computer and use it in GitHub Desktop.
A script to replay pupil-labs/hmd-eyes data to trigger calibration and gaze processing
import os
import zmq
from time import sleep, time
import numpy as np
import sys
# Make the local Pupil checkout importable for the pupil_src imports below.
# Raw string: the backslashes in this Windows path are literal characters,
# not escape sequences.
sys.path.append(r'E:\SachsLab\Tools\VR\Pupil\pupil')
import pupil_src.shared_modules.file_methods as fm
from pupil_src.shared_modules import zmq_tools
# Pupil Capture must already be running, with its eye sources turned off
# (presumably so the replayed data is not mixed with live data -- confirm).
# The "Pupil Remote" section of the Pupil Capture GUI shows the port to use.
port = 50020  # Pupil Remote default; can be modified in the GUI.
ip = 'localhost'  # Use the other machine's IP if Pupil Capture runs elsewhere.
# Alternative recording:
# datapath = os.path.abspath(r'D:\The Ottawa Hospital\Sachs Lab - Documents\RawData\Human\Study-Michael\EEG_NoGo\sub-03\ses-beta03look\eyetrack')
datapath = os.path.abspath(r'E:\Chad\The Ottawa Hospital\Sachs Lab - Documents\RawData\Human\Study-Michael\EEG_NoGo\sub-03\ses-nocap\eyetrack')

# Set up the connections to Pupil Capture's IPC backbone.
zmq_ctx = zmq.Context()
# The requester asks Pupil Remote which port publishers have to connect to.
requester = zmq_ctx.socket(zmq.REQ)
try:
    requester.connect('tcp://%s:%s' % (ip, port))
    requester.send_string('PUB_PORT')
    pub_port = requester.recv_string()
finally:
    # Release the socket even when the request above fails.
    requester.close()

# Publisher used for every message sent by the rest of this script.
ipc_pub = zmq_tools.Msg_Streamer(zmq_ctx, 'tcp://%s:%s' % (ip, pub_port))
# Give the connection time to come up before publishing. ZeroMQ connects
# asynchronously; assuming Msg_Streamer wraps a PUB socket, messages sent
# before the connection is established would be dropped.
sleep(1)
# Emulate Unity running the HMD calibration.
# https://github.com/pupil-labs/pupil/blob/master/pupil_src/shared_modules/calibration_routines/hmd_calibration.py


def _send_notification(subject, **fields):
    """Publish a notification for *subject* carrying the extra *fields*.

    The message holds "subject" first, then *fields* in the order given,
    and finally "topic", which is "notify." followed by the subject.
    """
    message = {"subject": subject}
    message.update(fields)
    message["topic"] = "notify." + subject
    ipc_pub.send(message)


# Start the calibration plugin.
_send_notification("start_plugin", name="HMD_Calibration_3D")

# Tell the plugin to start calibrating.
# The eye translations assume an IPD of 66.7 (half of it per eye).
# https://github.com/pupil-labs/hmd-eyes/blob/develop/plugin/Scripts/Calibration.cs#L46-L67
_send_notification(
    'calibration.should_start',
    hmd_video_frame_size=[1000, 1000],
    outlier_threshold=35,
    translation_eye0=[33.35, 0, 0],  # right
    translation_eye1=[-33.35, 0, 0],  # left
)

# Send the recorded reference data with "calibration.add_ref_data".
# https://github.com/pupil-labs/hmd-eyes/blob/develop/plugin/Scripts/Calibration.cs#L92-L98
notif_dat = fm.load_pldata_file(datapath, "notify")
ts = notif_dat.timestamps
# Keep only the last recorded notification; its 'pupil_list' is read later on.
notif_dat = notif_dat.data[-1]
ref_list = notif_dat['ref_list']
# Each reference entry is copied into a plain dict before being published.
_send_notification(
    'calibration.add_ref_data',
    ref_data=tuple(dict(ref) for ref in ref_list),
)
# Send pupil data through normal events
def dictify(_dat):
    """Return *_dat* with every read-only mapping replaced by a plain dict.

    ``types.MappingProxyType`` and ``fm.Serialized_Dict`` instances are
    copied into ordinary dicts. The conversion is applied recursively to the
    values of dicts and to the items of lists and tuples, so mappings nested
    inside sequences are converted as well. Any other value is returned
    unchanged.

    The argument itself is never modified: dicts, lists and tuples are
    rebuilt instead of being edited in place.
    """
    import types

    if isinstance(_dat, (types.MappingProxyType, fm.Serialized_Dict)):
        _dat = dict(_dat)
    if isinstance(_dat, dict):
        return {key: dictify(value) for key, value in _dat.items()}
    # Exact type checks: sequence subclasses (e.g. namedtuples) are left
    # untouched rather than being flattened into plain lists/tuples.
    if type(_dat) is list:
        return [dictify(item) for item in _dat]
    if type(_dat) is tuple:
        return tuple(dictify(item) for item in _dat)
    return _dat
# Publish every pupil datum stored with the calibration notification,
# converted to plain dicts first.
pupil_list = notif_dat['pupil_list']
for calibration_datum in pupil_list:
    ipc_pub.send(dictify(calibration_datum))

# All calibration data has been sent: tell the plugin to stop calibrating.
stop_message = {'subject': 'calibration.should_stop'}
stop_message["topic"] = "notify." + stop_message['subject']
ipc_pub.send(stop_message)
# Replay the recorded pupil data through normal events, keeping the
# recording's original spacing between samples.


def _wait_until(deadline):
    """Poll about once per millisecond until ``time()`` reaches *deadline*."""
    while time() < deadline:
        sleep(0.001)


pupil_dat = fm.load_pldata_file(datapath, "pupil")
gaze_dat = fm.load_pldata_file(datapath, "gaze")  # NOTE(review): loaded but not used below
data = np.asarray(pupil_dat.data)
ts = np.asarray(pupil_dat.timestamps)
# Shift the recorded timestamps so that the first sample is due right now.
now_timestamps = ts - ts[0] + time()
for sample_index, recorded_datum in enumerate(data):
    _wait_until(now_timestamps[sample_index])
    ipc_pub.send(dictify(recorded_datum))
@cboulay
Copy link
Author

cboulay commented Apr 29, 2019

I used this script to test the changes described in this issue: pupil-labs/pupil#1493

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment