Skip to content

Instantly share code, notes, and snippets.

@Andrew-William-Smith
Last active January 15, 2019 19:37
Show Gist options
  • Save Andrew-William-Smith/3424574c4e2efd65c0652bdf9291e5b3 to your computer and use it in GitHub Desktop.
Save Andrew-William-Smith/3424574c4e2efd65c0652bdf9291e5b3 to your computer and use it in GitHub Desktop.
A simple library to control Crazyflie 2.0 quadcopters asynchronously
# -*- coding: utf-8 -*-
#
# CRAZYFLIE COMMANDER LIBRARY
# Copyright (C) 2019 Andrew William Lawrence Smith
# Written for St. Mark's School of Texas --- Courage and Honor
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import queue
import threading
import time
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Callable, Optional
import cflib
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.crazyflie.syncLogger import SyncLogger
# Set up logging
logging.basicConfig(level=logging.ERROR)
class FunctionCommandMode(IntEnum):
"""
Specifies the mode in which to run a functional command
"""
SYNCHRONOUS = 1
HOVER = 2
BACKGROUND = 3
@dataclass(order=True)
class CrazyflieCommand:
"""
Data class for commands.
"""
priority: int = 0
duration: float = field(default=0.0, compare=False)
# Move command fields
velocity_x: float = field(default=0.0, compare=False)
velocity_y: float = field(default=0.0, compare=False)
yaw_rate: float = field(default=0.0, compare=False)
target_altitude: float = field(default=0.0, compare=False)
# Firmware parameter set command fields
param: str = field(default="", compare=False)
value: str = field(default="", compare=False)
# Function command fields
function: Callable[[], Optional[Any]] = field(default=None, compare=False)
run_mode: int = field(default=FunctionCommandMode.BACKGROUND, compare=False)
class CommandThread(threading.Thread):
"""
A threaded command queue to allow a program to execute a pre-loaded
set of movements.
"""
# Number of altitude readings to be used in the average calculation
ALTITUDE_READINGS: int=3
# Computed altitude
altitude: float=0.0
hardware_altitude: float=0.0
_hold_altitude: float=0.0
def __init__(self, address: int=0xE7E7E7E7E7, flow_deck_altitude: bool=True,
drop_threshold: float=0.05, interpolate_interval: float=0.05,
args=(), kwargs=None):
"""
Initialise a command thread for the specified Crazyflie.
:param address: The address of the Crazyflie to which to be
controlled by this commander.
:param flow_deck_altitude: Whether to reconcile the internal
altitude with the output of the laser
rangefinder on the flow deck.
:param drop_threshold: The distance in metres past which to halt
the Crazyflie's rotors.
:param interpolate_interval: The interval in seconds at which
motion commands should be
interpolated.
:raises IndexError: Raises if no device could be found on the
specified address.
"""
threading.Thread.__init__(self, args=(), kwargs=None)
# Threshold past which to halt motors
self.drop_threshold = drop_threshold
self.interpolate_interval = interpolate_interval
# Event to stop the execution of the current command
self.stop_event = threading.Event()
# Event to halt the Crazyflie and terminate the program
self.halt_event = threading.Event()
# Command queue, current and preemptive commands
self.command_queue = queue.Queue()
self.command_lock = threading.Lock()
self.current_command: CrazyflieCommand
self.preempt_command = None
# Register events for the Crazyflie controlled by this thread
self._cf = self._init_crazyflie(address)
self._cf.disconnected.add_callback(self._disconnected)
self._cf.connection_lost.add_callback(self._connection_lost)
# Initialise logger
self.logger = LogConfig(name='CrazyflieCommander Reporter',
period_in_ms=self.interpolate_interval * 1000)
# Set up flow deck altitude tracking if desired
self.flow_deck_altitude = flow_deck_altitude
self._altitude_queue = []
if flow_deck_altitude:
self.logger.add_variable('stateEstimate.z', 'float')
self.logger.data_received_cb.add_callback(self._handle_hardware_z)
# Calibrate flow deck
self.calibrate_flow_deck()
def _init_crazyflie(self, address: int) -> Crazyflie:
# Initialise drivers
cflib.crtp.init_drivers(enable_debug_driver=False)
# Scan for quadcopters on specified address
print(f'Scanning for devices on address 0x{address:x}...')
devices = cflib.crtp.scan_interfaces(address)
if len(devices) == 0:
raise IndexError(f'No devices found on address 0x{address:x}')
else:
# Get URI of highest-throughput channel
self.device_uri = devices[-1][0]
print(f'Using device with URI {self.device_uri}')
return Crazyflie(rw_cache='./cfcache')
def _handle_hardware_z(self, timestamp, data, logconf):
"""
Process an altitude reading from the flow deck.
"""
self._altitude_queue.append(data['stateEstimate.z'])
if len(self._altitude_queue) >= self.ALTITUDE_READINGS:
# Reconcile the tracked altitude with the hardware altitude
# if the Crazyflie has not landed
if self.altitude > 0:
self.hardware_altitude = sum(self._altitude_queue) / len(self._altitude_queue)
self._altitude_queue = []
def _disconnected(self, link_uri):
"""
Event to halt the command thread if the Crazyflie is
disconnected.
"""
self.halt_event.set()
print(f'Disconnected from Crazyflie on {link_uri}. Halting command thread.')
def _connection_lost(self, link_uri, msg):
"""
Event to halt the command thread if the connection to the
Crazyflie is lost.
"""
self.halt_event.set()
print(f'Connection to Crazyflie on {link_uri} lost. Halting command thread.')
def enqueue_rate_command(self, duration: float, vx: float=0.0, vy: float=0.0,
yaw_rate: float=0.0, z: float=-1,
priority: int=0):
"""
Add a manoeuvre defined in terms of rates to the command queue.
:param duration: The length of time over which to execute the
command in seconds.
:param vx: The velocity in the x (forward-backward) direction to
be maintained for this command in metres per second.
:param vy: The velocity in the y (side-to-side) direction to be
maintained for this command in metres per second.
:param yaw_rate: The angular velocity at which the quadcopter
should rotate in degrees per second.
:param z: The altitude to be reached by the end of this command
in metres. For safety, setting a rate of altitude
increase is not supported. The altitude must remain
below or at 1.5 metres, the maximum supported by the
laser rangefinder.
:param priority: The priority at which this command should be
inserted into the command queue. Commands with
lower priorities will be executed first.
"""
# Handle altitude hold
if z == -1:
z = self._hold_altitude
elif z < 0 or z > 1.5:
print('Invalid altitude: may not exceed 1.5 metres.')
return
self._hold_altitude = z
command = CrazyflieCommand(priority, duration, vx, vy, yaw_rate, z)
self.command_queue.put(command)
def enqueue_position_command(self, duration: float, x: float=0.0, y: float=0.0,
yaw: float=0.0, z: float=-1,
priority: int=0):
"""
Add a manoeuvre defined in terms of fixed distances to the
command queue.
:param duration: The length of time over which to execute the
command in seconds.
:param x: The distance in the x (forward-backward) direction to
move in metres.
:param y: The distance in the y (side-to-side) direction to move
in metres.
:param yaw: The degrees through which the quadcopter should
rotate as a result of this command.
:param z: The altitude to be reached by the end of this command
in metres.
:param priority: The priority at which this command should be
inserted into the command queue. Commands with
lower priorities will be executed first.
"""
vx = x / duration
vy = y / duration
yaw_rate = yaw / duration
self.enqueue_rate_command(duration, vx, vy, yaw_rate, z, priority)
def enqueue_hover(self, duration: float=0.0, priority: int=0):
"""Enqueue a command to hover for the specified duration.
:param duration: The length of time for which to hover.
"""
self.enqueue_rate_command(duration, 0, 0, 0, self._hold_altitude, priority)
def enqueue_param_command(self, param: str, value: str, wait: float=0.0,
priority: int=0):
"""
Add a firmware parameter set operation to the command queue.
:param param: The fully-qualified name of the parameter to set.
:param value: The value to which to set the parameter.
:param wait: The fractional number of seconds to wait after the
parameter is set.
:param priority: The priority at which this command should be
inserted into the command queue. Commands with
lower priorities will be executed first.
"""
command = CrazyflieCommand(priority, wait, param=param, value=value)
self.command_queue.put(command)
def enqueue_function_command(self, function: Callable[[], Optional[Any]],
mode: int=FunctionCommandMode.BACKGROUND,
priority: int=0):
"""
Add a function command to the command queue.
:param function: The function to be executed.
:param mode: The mode with which to execute the function (must be
one of FunctionCommandMode.
:param priority: The priority at which this command should be
inserted into the command queue. Commands with
lower priorities will be executed first.
"""
command = CrazyflieCommand(priority, function=function, run_mode=mode,
target_altitude=self._hold_altitude)
self.command_queue.put(command)
def enqueue_end(self):
"""
Enqueue an event for the Crazyflie to land at the highest priority.
"""
self.enqueue_rate_command(1.0, z=0.0)
self.enqueue_function_command(function=(lambda: self.halt_event.set()),
mode=FunctionCommandMode.SYNCHRONOUS)
def calibrate_flow_deck(self):
"""
Recalibrate the Crazyflie's flow deck. The quad must be on the
ground when this method is invoked.
:raises ValueError: If the Crazyflie is not landed.
"""
if self.altitude > 0:
raise ValueError('Crazyflie must be landed to calibrate flow deck.')
self.enqueue_param_command('kalman.resetEstimation', '1', 0.1)
self.enqueue_param_command('kalman.resetEstimation', '0', 1.0)
def _run_move_command(self, cmd: CrazyflieCommand):
"""
Execute the specified movement command.
:param cmd: The command to execute.
"""
# Determine the number of iterations into which to divide this command
iterations = cmd.duration / self.interpolate_interval
initial_z = self.altitude
for i in range(int(iterations), 0, -1):
# If stop event is set, clear it and end this command
if self.stop_event.is_set():
self.stop_event.clear()
return
# Update altitude
self.altitude += (cmd.target_altitude - self.altitude) / i
# Halt rotors on descent
if self.altitude < self.drop_threshold and self.altitude < initial_z:
self._cf.commander.send_stop_setpoint()
self.altitude = 0
return
# Send command
self._cf.commander.send_hover_setpoint(cmd.velocity_x, cmd.velocity_y,
cmd.yaw_rate, self.altitude)
time.sleep(self.interpolate_interval)
def _run_param_command(self, cmd: CrazyflieCommand):
"""
Execute the specified firmware parameter set command.
:param cmd: The command to execute.
"""
self._cf.param.set_value(cmd.param, cmd.value)
iterations = cmd.duration / self.interpolate_interval
for i in range(int(iterations)):
if self.altitude > 0:
self._cf.commander.send_hover_setpoint(0, 0, 0, self.altitude)
time.sleep(self.interpolate_interval)
def _run_function_command(self, cmd: CrazyflieCommand):
"""
Execute the specified functional command in the specified mode.
:param cmd: The command to execute.
"""
if cmd.run_mode == FunctionCommandMode.SYNCHRONOUS:
cmd.function()
else:
cmd_thread = threading.Thread(target=cmd.function)
cmd_thread.start()
if cmd.run_mode == FunctionCommandMode.HOVER:
while cmd_thread.isAlive():
self._cf.commander.send_hover_setpoint(0, 0, 0, self.altitude)
time.sleep(self.interpolate_interval)
def run(self):
"""
Main event loop for the CommandThread. Handles all interaction
with the Crazyflie.
"""
# Sync Crazyflie
with SyncCrazyflie(self.device_uri, cf=self._cf) as scf:
with SyncLogger(scf, self.logger) as log:
while not self.halt_event.is_set():
# Execute the preempted command immediately
if self.preempt_command:
self.preempt_command()
# Otherwise, pull the next command from the queue if one exists
elif self.command_queue.qsize() > 0:
cmd = self.command_queue.get_nowait()
self.current_command = cmd
# Determine type of command
if cmd.param != '':
self._run_param_command(cmd)
elif cmd.function:
self._run_function_command(cmd)
else:
self._run_move_command(cmd)
# Mark command as finished
self.command_queue.task_done()
# Otherwise, hover if off the ground
else:
if self.altitude > 0:
self._cf.commander.send_hover_setpoint(0, 0, 0,
self.altitude)
time.sleep(self.interpolate_interval)
return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment