Skip to content

Instantly share code, notes, and snippets.

@fredvol
Last active April 20, 2022 14:38
Show Gist options
  • Save fredvol/8981c93d6e35de3a2ca42f454841894d to your computer and use it in GitHub Desktop.
Save fredvol/8981c93d6e35de3a2ca42f454841894d to your computer and use it in GitHub Desktop.
#%%
####################################################################
# Wind tunnel crappy file
#
# This script only reads the data sent via USB/serial by an ESP32
#
#
# 17/03/2022
# https://crappy.readthedocs.io/en/latest/installation.html
#%% import
import datetime
from multiprocessing.connection import Pipe, Connection
import os
import tkinter as tk
from csv import reader
from pathlib import Path
from sys import platform
from time import time
from typing import Callable, Union
import crappy
import numpy as np
# from ..tool import Camera_config
from crappy._global import CrappyStop, OptionalModule
from crappy.blocks import Block
# from .block import Block
from crappy.camera import camera_list
# from ..camera import camera_list
from crappy.tool import Camera_config
# from .._global import OptionalModule
try:
import SimpleITK as Sitk
except (ModuleNotFoundError, ImportError):
Sitk = None
try:
import PIL
except (ModuleNotFoundError, ImportError):
PIL = None
try:
import cv2
except (ModuleNotFoundError, ImportError):
cv2 = OptionalModule("opencv-python")
#%% CONFIG
# Log
# Names of the log directory and of the current study; every output file of
# this study lives in <script directory>/<log_folder>/<study_name>/.
log_folder = "logs"
study_name = "test_cam"

# Raw data log.
path_raw = Path(__file__).parent.joinpath(log_folder, study_name, "data_raw.csv")
# Event log (given to the Recorder block in the main section).
path_event = Path(__file__).parent.joinpath(log_folder, study_name, "event.csv")
# Data log.
path = Path(__file__).parent.joinpath(log_folder, study_name, "data.csv")
# %% class definition
# GUI
class WT_GUI(crappy.blocks.Block):
    """Block to send a signal based on a user input.

    A small Tk window shows the study name, a step counter, a boolean state
    and the number of seconds elapsed since the state last changed. Two
    buttons let the user increment the step and toggle the state. Every
    change is sent as ``[elapsed time, step, state]`` with ``self.send``, and
    state changes are additionally pushed as a dict through ``ext_link``
    when one was given.

    Args:
        label: Either a list of labels used as-is, or a single label name, in
            which case the labels become ``["t0", label]``. When omitted, the
            labels are ``["t0", "step", "state", "cmd"]``.
        spam: If True, the current values are sent at each loop instead of
            only when a button is pressed.
        font_label: Font given to every Tk label of the window.
        ext_link: Optional object with a ``send`` method (the main script
            passes one end of a ``multiprocessing`` pipe). It receives
            ``{"state": ..., "step": ...}`` each time the state is toggled.
            If None, nothing is sent there.
    """

    def __init__(
        self,
        label: Union[str, list, None] = None,
        spam: bool = False,
        font_label=("Arial", 15),
        ext_link=None,  # external link used to change the image save folder
    ) -> None:
        super().__init__()
        self.spam = spam  # Send the values only once or at each loop ?
        self.i = 0  # Current step number
        self.state = False  # Current state, toggled by the second button
        # Timestamp of the last state change (initialised at construction)
        self.t_last_change = datetime.datetime.now().timestamp()
        self.label_font = font_label
        self.abort = False  # Set when the window is closed
        self.ext_link = ext_link
        if label is None:
            # Fresh list per instance instead of a shared mutable default.
            # NOTE(review): four labels here but only three values are sent
            # ("cmd" never gets a value) - confirm this is intended.
            self.labels = ["t0", "step", "state", "cmd"]
        elif isinstance(label, list):
            self.labels = label
        else:
            self.labels = ["t0", label]

    def _run_time_text(self) -> str:
        """Return the text of the run-time label, in whole seconds."""
        elapsed = datetime.datetime.now().timestamp() - self.t_last_change
        return f"run: {round(elapsed)} s"

    def _send_status(self) -> None:
        """Send the elapsed time, the step and the state to the outputs."""
        # self.t0 is not assigned in this class; presumably set by crappy's
        # Block machinery when the test starts - confirm.
        self.send([time() - self.t0, self.i, self.state])

    def prepare(self) -> None:
        """Build the Tk window and send the initial values."""
        self.root = tk.Tk()
        self.root.title("GUI block")
        # Closing the window only raises a flag; loop() then stops the test
        self.root.protocol("WM_DELETE_WINDOW", self.end)

        self.label_title = tk.Label(
            self.root,
            text=f"Study name : {study_name} \n ---- \n",
            font=self.label_font,
        )
        self.label_title.pack()

        self.label = tk.Label(self.root, text="step: 0", font=self.label_font)
        self.label.pack()

        self.label_state = tk.Label(
            self.root, text=f"State: {self.state}", font=self.label_font
        )
        self.label_state.pack()

        # Rounded from the start, like the refresh done in loop()
        self.label_run_time = tk.Label(
            self.root, text=self._run_time_text(), font=self.label_font
        )
        self.label_run_time.pack()

        self.button = tk.Button(
            self.root, text="Next step WT", command=self.callback, bg="#5CA098"
        )
        self.button.pack()

        self.button_state = tk.Button(
            self.root,
            text="toogle state",
            command=self.callback_state,
            bg="#5CA098",
        )
        self.button_state.pack()

        self.send([0, self.i, self.state])

    def loop(self) -> None:
        """Refresh the run-time label, optionally resend, process Tk events.

        Raises:
            CrappyStop: If the window was closed by the user.
        """
        self.label_run_time.configure(text=self._run_time_text())
        if self.spam:
            self._send_status()
        if self.abort:
            raise CrappyStop
        self.root.update()

    def end(self) -> None:
        """Window-close handler: ask loop() to stop at its next iteration."""
        self.abort = True

    def callback(self) -> None:
        """Button handler: move to the next step and send the new values."""
        self.i += 1
        self._send_status()
        self.label.configure(text="step: %d" % self.i)

    def callback_state(self) -> None:
        """Button handler: toggle the state and notify every listener."""
        self.state = not self.state
        self.t_last_change = datetime.datetime.now().timestamp()
        self._send_status()
        # ext_link is optional (default None): only notify when it exists
        if self.ext_link is not None:
            self.ext_link.send({"state": self.state, "step": self.i})
        color_state = "#CDFF7F" if self.state else "red"
        self.label_state.configure(text="State: %s" % self.state, bg=color_state)

    def finish(self) -> None:
        """Destroy the Tk window."""
        self.root.destroy()
# save frame
class customCamera(Block):
    """Reads images from a camera object, saves and/or sends them to another
    block.

    It can be triggered by an other block, internally, or try to run at a given
    framerate. In addition, the folder the images are saved to can be changed
    at runtime through ``ext_link`` (see :meth:`loop`).
    """

    def __init__(
        self,
        camera: str,
        save_folder: str = None,
        verbose: bool = False,
        labels: list = None,
        fps_label: Union[str, bool] = False,
        img_name: str = "{self.loops:06d}_{t-self.t0:.6f}",
        ext: str = "tiff",
        save_period: int = 1,
        save_backend: str = None,
        transform: Callable = None,
        input_label: str = None,
        config: bool = True,
        no_loop: bool = False,
        ext_link: Connection = None,
        **kwargs,
    ) -> None:
        """Sets the args and initializes parent class.

        Args:
          camera (:obj:`str`): The name of the camera to control. See
            :ref:`Cameras` for an exhaustive list of available ones.
          save_folder (:obj:`str`, optional): The directory to save images to.
            If it doesn't exist it will be created. If :obj:`None` the images
            won't be saved.
          verbose (:obj:`bool`, optional): If :obj:`True`, the block will print
            the number of `loops/s`.
          labels (:obj:`list`, optional): Names of the labels for respectively
            time and the frame.
          fps_label (:obj:`str`, optional): If set, ``self.max_fps`` will be
            set to the value received by the block with this label.
          img_name (:obj:`str`, optional): Template for the name of the image
            to save. It is evaluated as an `f-string` with ``eval``, so it must
            only ever come from trusted code.
          ext (:obj:`str`, optional): Extension of the image. Make sure it is
            supported by the saving backend.
          save_period (:obj:`int`, optional): Will save only one in `x` images.
          save_backend (:obj:`str`, optional): Module to use to save the
            images. The supported backends are: :mod:`sitk` (SimpleITK),
            :mod:`cv2` (OpenCV) and :mod:`pil` (Pillow). If :obj:`None`, will
            use :mod:`sitk` if it could be imported and :mod:`cv2` otherwise.
          transform (:obj:`function`, optional): Function to be applied on the
            image before sending. It will not be applied on the saved images.
          input_label (:obj:`str`, optional): If specified, the image will not
            be read from a camera object but from this label.
          config (:obj:`bool`, optional): Show the popup for config ?
          no_loop (:obj:`bool`, optional): If :obj:`True`, ``self.loop`` is
            replaced by ``self.stop`` at the end of :meth:`prepare`.
          ext_link (optional): Connection polled at each loop for dicts with
            the keys ``"state"`` and ``"step"``. If :obj:`None`, the save
            folder is never changed at runtime.
          **kwargs: Any additional specific argument to pass to the camera.
        """
        Block.__init__(self)
        self.niceness = -10
        self.save_folder = save_folder
        self.verbose = verbose
        self.labels = ["t(s)", "frame"] if labels is None else labels
        self.fps_label = fps_label
        self.img_name = img_name
        self.ext = ext
        self.save_period = save_period
        self.save_backend = save_backend
        self.transform = transform
        self.input_label = input_label
        self.config = config
        self.no_loop = no_loop
        self.ext_link = ext_link
        self.camera_name = camera.capitalize()
        self.cam_kw = kwargs

        # The camera name is only allowed to be unknown when the frames come
        # from an input label instead of a camera object
        assert (
            self.camera_name in camera_list or self.input_label
        ), "{} camera does not exist!".format(self.camera_name)

        if self.save_backend is None:
            self.save_backend = "cv2" if Sitk is None else "sitk"
        assert self.save_backend in ["cv2", "sitk", "pil"], (
            "Unknown saving backend: " + self.save_backend
        )
        # Resolves to one of save_cv2 / save_sitk / save_pil
        self.save = getattr(self, "save_" + self.save_backend)

        self.loops = 0
        self.t0 = 0

    def create_folder(self):
        """Check if the folder to store the image exist otherwise created it.

        Also makes sure ``self.save_folder`` ends with a path separator, as
        the file name is later appended to it by plain concatenation. Does
        nothing when no save folder is set.

        Raises:
          OSError: If the folder cannot be created.
        """
        if not self.save_folder:
            return
        # Joining with "" appends the platform separator only when it is
        # missing. The previous `"win" in platform` test also matched
        # "darwin" and therefore used a backslash on macOS.
        self.save_folder = os.path.join(self.save_folder, "")
        # exist_ok avoids the check-then-create race of the previous version
        os.makedirs(self.save_folder, exist_ok=True)

    def prepare(self, send_img: bool = True) -> None:
        """Create the save folder, open the camera and send a first image.

        Args:
          send_img: If True, one image is acquired and sent with a timestamp
            of 0 before the actual start.
        """
        self.create_folder()
        # Externally triggered only if there is an input that is used neither
        # for the fps nor for the images themselves
        self.ext_trigger = bool(
            self.inputs and not (self.fps_label or self.input_label)
        )
        if self.input_label is not None:
            # Exception to the usual inner working of Crappy:
            # We receive data from the link BEFORE the program is started
            self.ref_img = self.inputs[0].recv()[self.input_label]
            self.camera = camera_list["Camera"]()
            self.camera.max_fps = 30
            self.camera.get_image = lambda: (0, self.ref_img)
            return

        self.camera = camera_list[self.camera_name]()
        self.camera.open(**self.cam_kw)
        if self.config:
            conf = Camera_config(self.camera)
            conf.main()
        # Sending the first image before the actual start
        if send_img:
            first = self.get_img()
            # get_img() returns None when the trigger input delivered None
            if first is not None:
                self.send([0, first[1]])
        if self.no_loop:
            self.loop = self.stop

    @staticmethod
    def save_sitk(img: np.ndarray, fname: str) -> None:
        """Write `img` to `fname` using SimpleITK."""
        image = Sitk.GetImageFromArray(img)
        Sitk.WriteImage(image, fname)

    @staticmethod
    def save_cv2(img: np.ndarray, fname: str) -> None:
        """Write `img` to `fname` using OpenCV."""
        cv2.imwrite(fname, img)

    @staticmethod
    def save_pil(img: np.ndarray, fname: str) -> None:
        """Write `img` to `fname` using Pillow."""
        # A bare `import PIL` does not load the Image submodule, so it is
        # imported explicitly here (only when this backend is really used)
        from PIL import Image

        Image.fromarray(img).save(fname)

    def get_img(self) -> Union[tuple, None]:
        """Waits the appropriate time/event to read an image, reads it, saves it
        if asked to, applies the transformation and increases counter.

        Returns:
          A ``(t, img)`` tuple, or None if the block is externally triggered
          and the trigger input delivered None.
        """
        if self.input_label:
            data = self.inputs[0].recv()
            return data["t(s)"] + self.t0, data[self.input_label]

        # NOTE(review): the two trailing comments about max_fps below are kept
        # from the original code; check them against crappy's Camera API.
        if not self.ext_trigger:
            if self.fps_label:
                # Drain the input so that only the latest fps value is kept
                while self.inputs[0].poll():
                    self.camera.max_fps = self.inputs[0].recv()[self.fps_label]
            t, img = self.camera.read_image()  # NOT constrained to max_fps
        else:
            data = self.inputs[0].recv()  # wait for a signal
            if data is None:
                return None
            t, img = self.camera.get_image()  # self limiting to max_fps

        self.loops += 1
        if self.save_folder and self.loops % self.save_period == 0:
            # SECURITY: img_name is evaluated as an f-string with eval(); it
            # can reference the locals `self` and `t` (do not rename them) and
            # must never be built from untrusted input.
            name = eval('f"{}"'.format(self.img_name))
            self.save(img, self.save_folder + name + f".{self.ext}")
        if self.transform:
            img = self.transform(img)
        return t, img

    def loop(self) -> None:
        """Update the save folder from `ext_link`, then grab and send a frame.

        When a message is waiting on ``ext_link``: if its ``"state"`` equals 1
        the images are saved under ``<study_name>/<step>/`` (created if
        needed), otherwise saving is disabled.
        """
        # ext_link is optional (default None): only poll it when it exists
        if self.ext_link is not None and self.ext_link.poll():
            print("something in poll")
            received_data = self.ext_link.recv()
            if received_data["state"] == 1:
                step = received_data["step"]
                # Path creation to store the images
                self.save_folder = f"{study_name}/{step}/"
                self.create_folder()
            else:
                self.save_folder = None

        frame = self.get_img()
        if frame is None:
            # No image was acquired, nothing to send
            return
        t, img = frame
        self.send([t - self.t0, img])

    def finish(self) -> None:
        """Close the camera, unless the frames came from an input label."""
        if self.input_label is None:
            self.camera.close()
#%% MAIN
if __name__ == "__main__":
    # Two-ended pipe: the GUI writes state changes on one end, the camera
    # block polls the other end.
    gui_conn, camera_conn = Pipe()

    # BLOCKS (created in the same order as before)
    record_event = crappy.blocks.Recorder(filename=path_event)
    espion = crappy.blocks.Reader()
    mgui = WT_GUI(ext_link=gui_conn, spam=False)

    # camera
    camera_options = dict(
        camera="Webcam",
        verbose=True,
        max_fps=25,
        ext="png",
        ext_link=camera_conn,
    )
    camera = customCamera(**camera_options)

    disp = crappy.blocks.Displayer(framerate=27)

    # LINKS: GUI events go to the recorder, camera frames to the displayer
    crappy.link(mgui, record_event)
    crappy.link(camera, disp)

    crappy.start()
# Versions:
# python: sys.version_info(major=3, minor=8, micro=10, releaselevel='final', serial=0)
# crappy: 1.5.7
# numpy: 1.22.0
# pandas: 1.4.0rc0
# serial: 3.5
#################################################################""
# %%
# https://askubuntu.com/questions/348838/how-to-check-available-webcams-from-the-command-line
## Kill the process holding the camera when it stays locked after a crappy crash
# fuser /dev/video0
# kill -9 22160
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment