Last active
January 15, 2019 19:37
-
-
Save Andrew-William-Smith/3424574c4e2efd65c0652bdf9291e5b3 to your computer and use it in GitHub Desktop.
A simple library to control Crazyflie 2.0 quadcopters asynchronously
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# | |
# CRAZYFLIE COMMANDER LIBRARY | |
# Copyright (C) 2019 Andrew William Lawrence Smith | |
# Written for St. Mark's School of Texas --- Courage and Honor | |
# | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <https://www.gnu.org/licenses/>. | |
import logging | |
import queue | |
import threading | |
import time | |
from dataclasses import dataclass, field | |
from enum import IntEnum | |
from typing import Any, Callable, Optional | |
import cflib | |
from cflib.crazyflie import Crazyflie | |
from cflib.crazyflie.log import LogConfig | |
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie | |
from cflib.crazyflie.syncLogger import SyncLogger | |
# Set up logging | |
logging.basicConfig(level=logging.ERROR) | |
class FunctionCommandMode(IntEnum): | |
""" | |
Specifies the mode in which to run a functional command | |
""" | |
SYNCHRONOUS = 1 | |
HOVER = 2 | |
BACKGROUND = 3 | |
@dataclass(order=True) | |
class CrazyflieCommand: | |
""" | |
Data class for commands. | |
""" | |
priority: int = 0 | |
duration: float = field(default=0.0, compare=False) | |
# Move command fields | |
velocity_x: float = field(default=0.0, compare=False) | |
velocity_y: float = field(default=0.0, compare=False) | |
yaw_rate: float = field(default=0.0, compare=False) | |
target_altitude: float = field(default=0.0, compare=False) | |
# Firmware parameter set command fields | |
param: str = field(default="", compare=False) | |
value: str = field(default="", compare=False) | |
# Function command fields | |
function: Callable[[], Optional[Any]] = field(default=None, compare=False) | |
run_mode: int = field(default=FunctionCommandMode.BACKGROUND, compare=False) | |
class CommandThread(threading.Thread): | |
""" | |
A threaded command queue to allow a program to execute a pre-loaded | |
set of movements. | |
""" | |
# Number of altitude readings to be used in the average calculation | |
ALTITUDE_READINGS: int=3 | |
# Computed altitude | |
altitude: float=0.0 | |
hardware_altitude: float=0.0 | |
_hold_altitude: float=0.0 | |
def __init__(self, address: int=0xE7E7E7E7E7, flow_deck_altitude: bool=True, | |
drop_threshold: float=0.05, interpolate_interval: float=0.05, | |
args=(), kwargs=None): | |
""" | |
Initialise a command thread for the specified Crazyflie. | |
:param address: The address of the Crazyflie to which to be | |
controlled by this commander. | |
:param flow_deck_altitude: Whether to reconcile the internal | |
altitude with the output of the laser | |
rangefinder on the flow deck. | |
:param drop_threshold: The distance in metres past which to halt | |
the Crazyflie's rotors. | |
:param interpolate_interval: The interval in seconds at which | |
motion commands should be | |
interpolated. | |
:raises IndexError: Raises if no device could be found on the | |
specified address. | |
""" | |
threading.Thread.__init__(self, args=(), kwargs=None) | |
# Threshold past which to halt motors | |
self.drop_threshold = drop_threshold | |
self.interpolate_interval = interpolate_interval | |
# Event to stop the execution of the current command | |
self.stop_event = threading.Event() | |
# Event to halt the Crazyflie and terminate the program | |
self.halt_event = threading.Event() | |
# Command queue, current and preemptive commands | |
self.command_queue = queue.Queue() | |
self.command_lock = threading.Lock() | |
self.current_command: CrazyflieCommand | |
self.preempt_command = None | |
# Register events for the Crazyflie controlled by this thread | |
self._cf = self._init_crazyflie(address) | |
self._cf.disconnected.add_callback(self._disconnected) | |
self._cf.connection_lost.add_callback(self._connection_lost) | |
# Initialise logger | |
self.logger = LogConfig(name='CrazyflieCommander Reporter', | |
period_in_ms=self.interpolate_interval * 1000) | |
# Set up flow deck altitude tracking if desired | |
self.flow_deck_altitude = flow_deck_altitude | |
self._altitude_queue = [] | |
if flow_deck_altitude: | |
self.logger.add_variable('stateEstimate.z', 'float') | |
self.logger.data_received_cb.add_callback(self._handle_hardware_z) | |
# Calibrate flow deck | |
self.calibrate_flow_deck() | |
def _init_crazyflie(self, address: int) -> Crazyflie: | |
# Initialise drivers | |
cflib.crtp.init_drivers(enable_debug_driver=False) | |
# Scan for quadcopters on specified address | |
print(f'Scanning for devices on address 0x{address:x}...') | |
devices = cflib.crtp.scan_interfaces(address) | |
if len(devices) == 0: | |
raise IndexError(f'No devices found on address 0x{address:x}') | |
else: | |
# Get URI of highest-throughput channel | |
self.device_uri = devices[-1][0] | |
print(f'Using device with URI {self.device_uri}') | |
return Crazyflie(rw_cache='./cfcache') | |
def _handle_hardware_z(self, timestamp, data, logconf): | |
""" | |
Process an altitude reading from the flow deck. | |
""" | |
self._altitude_queue.append(data['stateEstimate.z']) | |
if len(self._altitude_queue) >= self.ALTITUDE_READINGS: | |
# Reconcile the tracked altitude with the hardware altitude | |
# if the Crazyflie has not landed | |
if self.altitude > 0: | |
self.hardware_altitude = sum(self._altitude_queue) / len(self._altitude_queue) | |
self._altitude_queue = [] | |
def _disconnected(self, link_uri): | |
""" | |
Event to halt the command thread if the Crazyflie is | |
disconnected. | |
""" | |
self.halt_event.set() | |
print(f'Disconnected from Crazyflie on {link_uri}. Halting command thread.') | |
def _connection_lost(self, link_uri, msg): | |
""" | |
Event to halt the command thread if the connection to the | |
Crazyflie is lost. | |
""" | |
self.halt_event.set() | |
print(f'Connection to Crazyflie on {link_uri} lost. Halting command thread.') | |
def enqueue_rate_command(self, duration: float, vx: float=0.0, vy: float=0.0, | |
yaw_rate: float=0.0, z: float=-1, | |
priority: int=0): | |
""" | |
Add a manoeuvre defined in terms of rates to the command queue. | |
:param duration: The length of time over which to execute the | |
command in seconds. | |
:param vx: The velocity in the x (forward-backward) direction to | |
be maintained for this command in metres per second. | |
:param vy: The velocity in the y (side-to-side) direction to be | |
maintained for this command in metres per second. | |
:param yaw_rate: The angular velocity at which the quadcopter | |
should rotate in degrees per second. | |
:param z: The altitude to be reached by the end of this command | |
in metres. For safety, setting a rate of altitude | |
increase is not supported. The altitude must remain | |
below or at 1.5 metres, the maximum supported by the | |
laser rangefinder. | |
:param priority: The priority at which this command should be | |
inserted into the command queue. Commands with | |
lower priorities will be executed first. | |
""" | |
# Handle altitude hold | |
if z == -1: | |
z = self._hold_altitude | |
elif z < 0 or z > 1.5: | |
print('Invalid altitude: may not exceed 1.5 metres.') | |
return | |
self._hold_altitude = z | |
command = CrazyflieCommand(priority, duration, vx, vy, yaw_rate, z) | |
self.command_queue.put(command) | |
def enqueue_position_command(self, duration: float, x: float=0.0, y: float=0.0, | |
yaw: float=0.0, z: float=-1, | |
priority: int=0): | |
""" | |
Add a manoeuvre defined in terms of fixed distances to the | |
command queue. | |
:param duration: The length of time over which to execute the | |
command in seconds. | |
:param x: The distance in the x (forward-backward) direction to | |
move in metres. | |
:param y: The distance in the y (side-to-side) direction to move | |
in metres. | |
:param yaw: The degrees through which the quadcopter should | |
rotate as a result of this command. | |
:param z: The altitude to be reached by the end of this command | |
in metres. | |
:param priority: The priority at which this command should be | |
inserted into the command queue. Commands with | |
lower priorities will be executed first. | |
""" | |
vx = x / duration | |
vy = y / duration | |
yaw_rate = yaw / duration | |
self.enqueue_rate_command(duration, vx, vy, yaw_rate, z, priority) | |
def enqueue_hover(self, duration: float=0.0, priority: int=0): | |
"""Enqueue a command to hover for the specified duration. | |
:param duration: The length of time for which to hover. | |
""" | |
self.enqueue_rate_command(duration, 0, 0, 0, self._hold_altitude, priority) | |
def enqueue_param_command(self, param: str, value: str, wait: float=0.0, | |
priority: int=0): | |
""" | |
Add a firmware parameter set operation to the command queue. | |
:param param: The fully-qualified name of the parameter to set. | |
:param value: The value to which to set the parameter. | |
:param wait: The fractional number of seconds to wait after the | |
parameter is set. | |
:param priority: The priority at which this command should be | |
inserted into the command queue. Commands with | |
lower priorities will be executed first. | |
""" | |
command = CrazyflieCommand(priority, wait, param=param, value=value) | |
self.command_queue.put(command) | |
def enqueue_function_command(self, function: Callable[[], Optional[Any]], | |
mode: int=FunctionCommandMode.BACKGROUND, | |
priority: int=0): | |
""" | |
Add a function command to the command queue. | |
:param function: The function to be executed. | |
:param mode: The mode with which to execute the function (must be | |
one of FunctionCommandMode. | |
:param priority: The priority at which this command should be | |
inserted into the command queue. Commands with | |
lower priorities will be executed first. | |
""" | |
command = CrazyflieCommand(priority, function=function, run_mode=mode, | |
target_altitude=self._hold_altitude) | |
self.command_queue.put(command) | |
def enqueue_end(self): | |
""" | |
Enqueue an event for the Crazyflie to land at the highest priority. | |
""" | |
self.enqueue_rate_command(1.0, z=0.0) | |
self.enqueue_function_command(function=(lambda: self.halt_event.set()), | |
mode=FunctionCommandMode.SYNCHRONOUS) | |
def calibrate_flow_deck(self): | |
""" | |
Recalibrate the Crazyflie's flow deck. The quad must be on the | |
ground when this method is invoked. | |
:raises ValueError: If the Crazyflie is not landed. | |
""" | |
if self.altitude > 0: | |
raise ValueError('Crazyflie must be landed to calibrate flow deck.') | |
self.enqueue_param_command('kalman.resetEstimation', '1', 0.1) | |
self.enqueue_param_command('kalman.resetEstimation', '0', 1.0) | |
def _run_move_command(self, cmd: CrazyflieCommand): | |
""" | |
Execute the specified movement command. | |
:param cmd: The command to execute. | |
""" | |
# Determine the number of iterations into which to divide this command | |
iterations = cmd.duration / self.interpolate_interval | |
initial_z = self.altitude | |
for i in range(int(iterations), 0, -1): | |
# If stop event is set, clear it and end this command | |
if self.stop_event.is_set(): | |
self.stop_event.clear() | |
return | |
# Update altitude | |
self.altitude += (cmd.target_altitude - self.altitude) / i | |
# Halt rotors on descent | |
if self.altitude < self.drop_threshold and self.altitude < initial_z: | |
self._cf.commander.send_stop_setpoint() | |
self.altitude = 0 | |
return | |
# Send command | |
self._cf.commander.send_hover_setpoint(cmd.velocity_x, cmd.velocity_y, | |
cmd.yaw_rate, self.altitude) | |
time.sleep(self.interpolate_interval) | |
def _run_param_command(self, cmd: CrazyflieCommand): | |
""" | |
Execute the specified firmware parameter set command. | |
:param cmd: The command to execute. | |
""" | |
self._cf.param.set_value(cmd.param, cmd.value) | |
iterations = cmd.duration / self.interpolate_interval | |
for i in range(int(iterations)): | |
if self.altitude > 0: | |
self._cf.commander.send_hover_setpoint(0, 0, 0, self.altitude) | |
time.sleep(self.interpolate_interval) | |
def _run_function_command(self, cmd: CrazyflieCommand): | |
""" | |
Execute the specified functional command in the specified mode. | |
:param cmd: The command to execute. | |
""" | |
if cmd.run_mode == FunctionCommandMode.SYNCHRONOUS: | |
cmd.function() | |
else: | |
cmd_thread = threading.Thread(target=cmd.function) | |
cmd_thread.start() | |
if cmd.run_mode == FunctionCommandMode.HOVER: | |
while cmd_thread.isAlive(): | |
self._cf.commander.send_hover_setpoint(0, 0, 0, self.altitude) | |
time.sleep(self.interpolate_interval) | |
def run(self): | |
""" | |
Main event loop for the CommandThread. Handles all interaction | |
with the Crazyflie. | |
""" | |
# Sync Crazyflie | |
with SyncCrazyflie(self.device_uri, cf=self._cf) as scf: | |
with SyncLogger(scf, self.logger) as log: | |
while not self.halt_event.is_set(): | |
# Execute the preempted command immediately | |
if self.preempt_command: | |
self.preempt_command() | |
# Otherwise, pull the next command from the queue if one exists | |
elif self.command_queue.qsize() > 0: | |
cmd = self.command_queue.get_nowait() | |
self.current_command = cmd | |
# Determine type of command | |
if cmd.param != '': | |
self._run_param_command(cmd) | |
elif cmd.function: | |
self._run_function_command(cmd) | |
else: | |
self._run_move_command(cmd) | |
# Mark command as finished | |
self.command_queue.task_done() | |
# Otherwise, hover if off the ground | |
else: | |
if self.altitude > 0: | |
self._cf.commander.send_hover_setpoint(0, 0, 0, | |
self.altitude) | |
time.sleep(self.interpolate_interval) | |
return |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment