Last active
April 20, 2022 14:38
-
-
Save fredvol/8981c93d6e35de3a2ca42f454841894d to your computer and use it in GitHub Desktop.
Working code for: https://github.com/LaboratoireMecaniqueLille/crappy/issues/10
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#%%
####################################################################
# Wind tunnel crappy file
#
# This script only reads the data sent via USB/serial by an ESP32
#
#
# 17/03/2022
# https://crappy.readthedocs.io/en/latest/installation.html
#%% import | |
import datetime | |
from multiprocessing.connection import Pipe, Connection | |
import os | |
import tkinter as tk | |
from csv import reader | |
from pathlib import Path | |
from sys import platform | |
from time import time | |
from typing import Callable, Union | |
import crappy | |
import numpy as np | |
# from ..tool import Camera_config | |
from crappy._global import CrappyStop, OptionalModule | |
from crappy.blocks import Block | |
# from .block import Block | |
from crappy.camera import camera_list | |
# from ..camera import camera_list | |
from crappy.tool import Camera_config | |
# from .._global import OptionalModule | |
try: | |
import SimpleITK as Sitk | |
except (ModuleNotFoundError, ImportError): | |
Sitk = None | |
try: | |
import PIL | |
except (ModuleNotFoundError, ImportError): | |
PIL = None | |
try: | |
import cv2 | |
except (ModuleNotFoundError, ImportError): | |
cv2 = OptionalModule("opencv-python") | |
#%% CONFIG | |
# Log | |
log_folder = "logs"
study_name = "test_cam"

# Every output file of a study lives in <script dir>/<log_folder>/<study_name>/.
_study_dir = Path(__file__).parent.joinpath(log_folder, study_name)

path_raw = _study_dir / "data_raw.csv"
path_event = _study_dir / "event.csv"
path = _study_dir / "data.csv"
# %% class definition | |
# GUI | |
class WT_GUI(crappy.blocks.Block):
    """Block to send a signal based on a user input.

    A small Tk window shows the study name, the current step counter, the
    current boolean state and the number of seconds elapsed since the state
    was last toggled. Two buttons let the user increment the step and toggle
    the state; every change is sent downstream as ``[time, step, state]``.
    When an ``ext_link`` is given, each state toggle is also forwarded on it
    as ``{"state": ..., "step": ...}``.
    """

    def __init__(
        self,
        label: Union[str, list, None] = None,
        spam: bool = False,
        font_label=("Arial", 15),
        ext_link=None,  # external link to change the image save folder
    ) -> None:
        """Stores the settings and initializes the parent class.

        Args:
          label: Either a list of labels used as-is, or a single label name
            that is paired with ``"t0"``. When omitted, defaults to
            ``["t0", "step", "state", "cmd"]``.
          spam: If :obj:`True`, the current values are sent at every loop
            instead of only when a button is pressed.
          font_label: Font passed to every :class:`tkinter.Label` of the
            window.
          ext_link: Optional object exposing ``send()`` (e.g. one end of a
            :func:`multiprocessing.Pipe`) that is notified of state toggles.
            If :obj:`None`, nothing is forwarded.
        """
        super().__init__()
        self.spam = spam  # Send the values only once or at each loop ?
        self.i = 0  # Step counter, sent as the second value
        self.state = False  # Toggled state, sent as the third value
        # Wall-clock timestamp of the last state toggle (drives the run timer)
        self.t_last_change = datetime.datetime.now().timestamp()
        self.label_font = font_label
        self.abort = False  # Set by end() when the window is closed
        self.ext_link = ext_link

        # A fresh list is built for each instance so that instances never
        # share (and accidentally mutate) the same default object.
        if label is None:
            self.labels = ["t0", "step", "state", "cmd"]
        elif isinstance(label, list):
            self.labels = label
        else:
            self.labels = ["t0", label]

    def _run_time_text(self) -> str:
        """Returns the text of the run timer, in whole seconds."""
        elapsed = datetime.datetime.now().timestamp() - self.t_last_change
        return f"run: {round(elapsed)} s"

    def prepare(self) -> None:
        """Builds the Tk window and sends the initial values."""
        self.root = tk.Tk()
        self.root.title("GUI block")
        # Closing the window only raises the abort flag; the window itself is
        # destroyed later, in finish().
        self.root.protocol("WM_DELETE_WINDOW", self.end)

        self.label_title = tk.Label(
            self.root,
            text=f"Study name : {study_name} \n ---- \n",
            font=self.label_font,
        )
        self.label_title.pack()

        self.label = tk.Label(self.root, text="step: 0", font=self.label_font)
        self.label.pack()

        self.label_state = tk.Label(
            self.root, text=f"State: {self.state}", font=self.label_font
        )
        self.label_state.pack()

        self.label_run_time = tk.Label(
            self.root, text=self._run_time_text(), font=self.label_font
        )
        self.label_run_time.pack()

        self.button = tk.Button(
            self.root, text="Next step WT", command=self.callback, bg="#5CA098"
        )
        self.button.pack()

        self.button_state = tk.Button(
            self.root,
            text="toogle state",
            command=self.callback_state,
            bg="#5CA098",
        )
        self.button_state.pack()

        self.send([0, self.i, self.state])

    def loop(self) -> None:
        """Refreshes the run timer and the window, and stops on abort.

        Raises:
          CrappyStop: Once the user has closed the window.
        """
        self.label_run_time.configure(text=self._run_time_text())
        # NOTE(review): self.t0 is not set in this class; it is presumably
        # provided by the crappy Block machinery at start - confirm.
        if self.spam:
            self.send([time() - self.t0, self.i, self.state])
        if self.abort:
            raise CrappyStop
        self.root.update()

    def end(self) -> None:
        """Requests the end of the test (bound to the window close button)."""
        self.abort = True

    def callback(self) -> None:
        """Increments the step counter, sends it and updates its label."""
        self.i += 1
        self.send([time() - self.t0, self.i, self.state])
        self.label.configure(text=f"step: {self.i:d}")

    def callback_state(self) -> None:
        """Toggles the state, restarts the run timer and notifies listeners."""
        self.state = not self.state
        self.t_last_change = datetime.datetime.now().timestamp()
        self.send([time() - self.t0, self.i, self.state])
        # ext_link is optional (defaults to None): only forward when present.
        if self.ext_link is not None:
            self.ext_link.send({"state": self.state, "step": self.i})
        color_state = "#CDFF7F" if self.state else "red"
        self.label_state.configure(text=f"State: {self.state}", bg=color_state)

    def finish(self) -> None:
        """Destroys the Tk window."""
        self.root.destroy()
# save frame | |
class customCamera(Block):
    """Reads images from a camera object, saves and/or sends them to another
    block.

    It can be triggered by an other block, internally, or try to run at a given
    framerate.

    In addition, an optional ``ext_link`` connection is polled at every loop:
    a received ``{"state": ..., "step": ...}`` message switches image saving
    on (into ``<study_name>/<step>/``) or off.
    """

    def __init__(
        self,
        camera: str,
        save_folder: str = None,
        verbose: bool = False,
        labels: list = None,
        fps_label: str = False,
        img_name: str = "{self.loops:06d}_{t-self.t0:.6f}",
        ext: str = "tiff",
        save_period: int = 1,
        save_backend: str = None,
        transform: Callable = None,
        input_label: str = None,
        config: bool = True,
        no_loop: bool = False,
        ext_link: Connection = None,
        **kwargs,
    ) -> None:
        """Sets the args and initializes parent class.

        Args:
          camera (:obj:`str`): The name of the camera to control. See
            :ref:`Cameras` for an exhaustive list of available ones.
          save_folder (:obj:`str`, optional): The directory to save images to.
            If it doesn't exist it will be created. If :obj:`None` the images
            won't be saved.
          verbose (:obj:`bool`, optional): If :obj:`True`, the block will print
            the number of `loops/s`.
          labels (:obj:`list`, optional): Names of the labels for respectively
            time and the frame.
          fps_label (:obj:`str`, optional): If set, ``self.max_fps`` will be
            set to the value received by the block with this label.
          img_name (:obj:`str`, optional): Template for the name of the image
            to save. It is evaluated as an `f-string`, with ``self`` and ``t``
            (the frame timestamp) in scope.
          ext (:obj:`str`, optional): Extension of the image. Make sure it is
            supported by the saving backend.
          save_period (:obj:`int`, optional): Will save only one in `x` images.
          save_backend (:obj:`str`, optional): Module to use to save the
            images. The supported backends are: :mod:`sitk` (SimpleITK),
            :mod:`cv2` (OpenCV) and :mod:`pil` (Pillow). If :obj:`None`, will
            try :mod:`sitk` and then :mod:`cv2` if not successful.
          transform (:obj:`function`, optional): Function to be applied on the
            image before sending. It will not be applied on the saved images.
          input_label (:obj:`str`, optional): If specified, the image will not
            be read from a camera object but from this label.
          config (:obj:`bool`, optional): Show the popup for config ?
          no_loop (:obj:`bool`, optional): If :obj:`True`, :meth:`prepare`
            replaces ``self.loop`` by ``self.stop``.
          ext_link (optional): Connection polled in :meth:`loop` for
            ``{"state": ..., "step": ...}`` messages. If :obj:`None`, the
            save folder is never changed at runtime.
          **kwargs: Any additional specific argument to pass to the camera.
        """
        Block.__init__(self)
        self.niceness = -10
        self.save_folder = save_folder
        self.verbose = verbose
        self.labels = ["t(s)", "frame"] if labels is None else labels
        self.fps_label = fps_label
        self.img_name = img_name
        self.ext = ext
        self.save_period = save_period
        self.save_backend = save_backend
        self.transform = transform
        self.input_label = input_label
        self.config = config
        self.no_loop = no_loop
        self.ext_link = ext_link
        self.camera_name = camera.capitalize()
        self.cam_kw = kwargs

        assert (
            self.camera_name in camera_list or self.input_label
        ), "{} camera does not exist!".format(self.camera_name)

        # Default backend: SimpleITK when it could be imported, else OpenCV.
        if self.save_backend is None:
            self.save_backend = "cv2" if Sitk is None else "sitk"
        assert self.save_backend in ["cv2", "sitk", "pil"], (
            "Unknown saving backend: " + self.save_backend
        )
        # Bind the saving function once (save_cv2 / save_sitk / save_pil).
        self.save = getattr(self, "save_" + self.save_backend)

        self.loops = 0
        self.t0 = 0

    def create_folder(self):
        """Makes sure the image folder exists, creating it if needed.

        ``self.save_folder`` is normalized so that it ends with a path
        separator, because file names are later appended to it by plain string
        concatenation. Nothing happens when no save folder is set.

        Raises:
          OSError: If the folder cannot be created.
        """
        if not self.save_folder:
            return
        # Joining with "" appends the separator only when it is missing, using
        # the running OS' own rules. (Testing `"win" in sys.platform` would
        # wrongly match "darwin".)
        self.save_folder = os.path.join(self.save_folder, "")
        # exist_ok avoids a failure if the folder appears between check and
        # creation, or already exists.
        os.makedirs(self.save_folder, exist_ok=True)

    def prepare(self, send_img: bool = True) -> None:
        """Creates the save folder, opens the camera and sends a first image.

        Args:
          send_img: If :obj:`True`, one image is grabbed and sent with a
            timestamp of 0 before the actual start.
        """
        self.create_folder()
        self.ext_trigger = bool(
            self.inputs and not (self.fps_label or self.input_label)
        )
        if self.input_label is not None:
            # Exception to the usual inner working of Crappy:
            # We receive data from the link BEFORE the program is started
            self.ref_img = self.inputs[0].recv()[self.input_label]
            self.camera = camera_list["Camera"]()
            self.camera.max_fps = 30
            self.camera.get_image = lambda: (0, self.ref_img)
            return

        self.camera = camera_list[self.camera_name]()
        self.camera.open(**self.cam_kw)
        if self.config:
            conf = Camera_config(self.camera)
            conf.main()

        # Sending the first image before the actual start
        if send_img:
            frame = self.get_img()
            # get_img() returns None when the trigger link delivered None.
            if frame is not None:
                self.send([0, frame[1]])
        if self.no_loop:
            self.loop = self.stop

    @staticmethod
    def save_sitk(img: np.ndarray, fname: str) -> None:
        """Saves `img` to `fname` using SimpleITK."""
        image = Sitk.GetImageFromArray(img)
        Sitk.WriteImage(image, fname)

    @staticmethod
    def save_cv2(img: np.ndarray, fname: str) -> None:
        """Saves `img` to `fname` using OpenCV."""
        cv2.imwrite(fname, img)

    @staticmethod
    def save_pil(img: np.ndarray, fname: str) -> None:
        """Saves `img` to `fname` using Pillow."""
        # A bare `import PIL` does not load the Image submodule, so it is
        # imported explicitly here, only when this backend is really used.
        from PIL import Image

        Image.fromarray(img).save(fname)

    def get_img(self) -> Union[tuple, None]:
        """Waits the appropriate time/event to read an image, reads it, saves
        it if asked to, applies the transformation and increases counter.

        Returns:
          A ``(t, img)`` tuple, or :obj:`None` if the trigger link delivered
          :obj:`None` instead of a signal.
        """
        if self.input_label:
            data = self.inputs[0].recv()
            return data["t(s)"] + self.t0, data[self.input_label]

        if not self.ext_trigger:
            if self.fps_label:
                # Drain the link so that only the latest fps value is kept.
                while self.inputs[0].poll():
                    self.camera.max_fps = self.inputs[0].recv()[self.fps_label]
            t, img = self.camera.read_image()  # NOT constrained to max_fps
        else:
            data = self.inputs[0].recv()  # wait for a signal
            if data is None:
                return
            t, img = self.camera.get_image()  # self limiting to max_fps

        self.loops += 1
        if self.save_folder and self.loops % self.save_period == 0:
            # NOTE(review): img_name is evaluated as an f-string with eval();
            # it must only ever come from trusted code. The template can refer
            # to the local names `self` and `t`, which therefore must keep
            # these exact names.
            self.save(
                img,
                self.save_folder
                + eval('f"{}"'.format(self.img_name))
                + f".{self.ext}",
            )
        if self.transform:
            img = self.transform(img)
        return t, img

    def loop(self) -> None:
        """Applies a pending save-folder command, then grabs and sends a
        frame."""
        # ext_link is optional (defaults to None): only poll when present.
        if self.ext_link is not None and self.ext_link.poll():
            print("something in poll")
            received_data = self.ext_link.recv()
            if received_data["state"] == 1:
                step = received_data["step"]
                # Path creation to store the images
                self.save_folder = f"{study_name}/{step}/"
                self.create_folder()
            else:
                # State is off: stop saving images.
                self.save_folder = None

        frame = self.get_img()
        if frame is None:  # no frame was acquired during this loop
            return
        t, img = frame
        self.send([t - self.t0, img])

    def finish(self) -> None:
        """Closes the camera, if one was opened."""
        if self.input_label is None:
            self.camera.close()
#%% MAIN | |
if __name__ == "__main__":
    # Duplex pipe: the GUI sends state/step updates on one end and the camera
    # block polls the other end.
    gui_end, camera_end = Pipe()

    # BLOCKS
    event_recorder = crappy.blocks.Recorder(filename=path_event)
    espion = crappy.blocks.Reader()  # instantiated but not linked to anything
    gui_block = WT_GUI(ext_link=gui_end, spam=False)

    # camera
    camera_block = customCamera(
        camera="Webcam",
        verbose=True,
        max_fps=25,
        ext="png",
        ext_link=camera_end,
    )
    displayer = crappy.blocks.Displayer(framerate=27)

    # LINKS
    crappy.link(gui_block, event_recorder)  # GUI events -> event.csv
    # crappy.link(mgui, save_cam_blck)
    # crappy.link(camera, save_cam_blck)
    crappy.link(camera_block, displayer)  # camera frames -> live display

    crappy.start()
# Versions: | |
# python: sys.version_info(major=3, minor=8, micro=10, releaselevel='final', serial=0) | |
# crappy: 1.5.7 | |
# numpy: 1.22.0 | |
# pandas: 1.4.0rc0 | |
# serial: 3.5 | |
#################################################################"" | |
# %% | |
# https://askubuntu.com/questions/348838/how-to-check-available-webcams-from-the-command-line | |
## Kill the process that keeps the camera locked after a crappy crash:
# fuser /dev/video0
# kill -9 22160   (replace 22160 with the PID reported by fuser)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment