Skip to content

Instantly share code, notes, and snippets.

@josephbima
Created April 9, 2021 15:32
Show Gist options
  • Select an option

  • Save josephbima/1455022a3d9e1d4e460b6fd438d96b95 to your computer and use it in GitHub Desktop.

Select an option

Save josephbima/1455022a3d9e1d4e460b6fd438d96b95 to your computer and use it in GitHub Desktop.
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor
import PySpin
BUFFER_SIZE = 1500
NUM_IMAGES = 4000 # The number of images to capture
NUM_SAVERS = 100 # The number of saver coroutines
# The directories to save to camera i will save images to SAVE_DIRS[i]
SAVE_DIRS = ['K:/Google Drive/Katz-Lab_Otter-Data/Projects/3D-Behavior/code/Test1','K:/Google Drive/Katz-Lab_Otter-Data/Projects/3D-Behavior/code/Test2']
async def acquire_images(queue: asyncio.Queue, cam: PySpin.Camera):
"""
A coroutine that captures `NUM_IMAGES` images from `cam` and puts them along
with the camera serial number as a tuple into the `queue`.
"""
# Set up camera
cam_id = cam.GetUniqueID()
# cam.Init()
cam.BeginAcquisition()
prev_frame_ID = 0
# Acquisition loop
for i in range(NUM_IMAGES):
img = cam.GetNextImage(3000)
frame_ID = img.GetFrameID()
if img.IsIncomplete():
print('WARNING: img incomplete', frame_ID,
'with status',
PySpin.Image_GetImageStatusDescription(img.GetImageStatus()))
prev_frame_ID = frame_ID
continue
if frame_ID != prev_frame_ID + 1:
print('WARNING: skipped frames', frame_ID)
prev_frame_ID = frame_ID
queue.put_nowait((img, cam_id))
print('Queue size:', queue.qsize())
print('[{}] Acquired image {}'.format(cam_id, frame_ID))
await asyncio.sleep(0) # This is necessary for context switches
print(f'current NUM_IMAGES iteration: {i}')
# Clean up
await queue.join() # Wait for all images to be saved before EndAcquisition
cam.EndAcquisition()
cam.DeInit()
del cam
async def main():
# Set up cam_list and queue
system = PySpin.System.GetInstance()
cam_list = system.GetCameras()
queue = asyncio.Queue()
# print(len(cam_list))
# Match serial numbers to save locations
assert len(cam_list) <= len(SAVE_DIRS), 'More cameras than save directories'
camera_sns = [cam.GetUniqueID() for cam in cam_list]
print(camera_sns)
save_dir_per_cam = dict(zip(camera_sns, SAVE_DIRS))
# Init cam and changed buffer size here
for cam in cam_list:
cam.Init()
s_node_map = cam.GetTLStreamNodeMap()
# Retrieve Buffer Handling Mode Information
handling_mode = PySpin.CEnumerationPtr(s_node_map.GetNode('StreamBufferHandlingMode'))
if not PySpin.IsAvailable(handling_mode) or not PySpin.IsWritable(handling_mode):
print('Unable to set Buffer Handling mode (node retrieval). Aborting...\n')
return False
handling_mode_entry = PySpin.CEnumEntryPtr(handling_mode.GetCurrentEntry())
if not PySpin.IsAvailable(handling_mode_entry) or not PySpin.IsReadable(handling_mode_entry):
print('Unable to set Buffer Handling mode (Entry retrieval). Aborting...\n')
return False
# Set stream buffer Count Mode to manual
stream_buffer_count_mode = PySpin.CEnumerationPtr(s_node_map.GetNode('StreamBufferCountMode'))
if not PySpin.IsAvailable(stream_buffer_count_mode) or not PySpin.IsWritable(stream_buffer_count_mode):
print('Unable to set Buffer Count Mode (node retrieval). Aborting...\n')
return False
stream_buffer_count_mode_manual = PySpin.CEnumEntryPtr(stream_buffer_count_mode.GetEntryByName('Manual'))
if not PySpin.IsAvailable(stream_buffer_count_mode_manual) or not PySpin.IsReadable(stream_buffer_count_mode_manual):
print('Unable to set Buffer Count Mode entry (Entry retrieval). Aborting...\n')
return False
stream_buffer_count_mode.SetIntValue(stream_buffer_count_mode_manual.GetValue())
print('Stream Buffer Count Mode set to manual...')
# Retrieve and modify Stream Buffer Count
buffer_count = PySpin.CIntegerPtr(s_node_map.GetNode('StreamBufferCountManual'))
if not PySpin.IsAvailable(buffer_count) or not PySpin.IsWritable(buffer_count):
print('Unable to set Buffer Count (Integer node retrieval). Aborting...\n')
# return False
# Display Buffer Info
print('\nDefault Buffer Handling Mode: %s' % handling_mode_entry.GetDisplayName())
print('Default Buffer Count: %d' % buffer_count.GetValue())
print('Maximum Buffer Count: %d' % buffer_count.GetMax())
handling_mode_entry = handling_mode.GetEntryByName('OldestFirstOverwrite')
handling_mode.SetIntValue(handling_mode_entry.GetValue())
buffer_count.SetValue(BUFFER_SIZE)
print('Buffer count now set to: %d' % buffer_count.GetValue())
print('Now Buffer Handling Mode: %s' % handling_mode_entry.GetDisplayName())
# Start the acquisition and save coroutines
print('Starting acquisition')
acquisition = [asyncio.gather(acquire_images(queue, cam)) for cam in cam_list]
savers = [asyncio.gather(save_images(queue, save_dir_per_cam)) for _ in range(NUM_SAVERS)]
# Wait for all images to be captured and saved
await asyncio.gather(*acquisition)
print('Acquisition complete.')
# Cancel the now idle savers
for c in savers:
c.cancel()
# Clean up
cam_list.Clear()
system.ReleaseInstance()
# The event loop and Thread Pool Executor are global for convenience.
loop = asyncio.get_event_loop()
tpe = ThreadPoolExecutor(None)
loop.run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment