Created
March 20, 2023 09:50
-
-
Save Allexik/b64f0eb3112c75fc86295e6b6c88068e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Copyright (C) 2018-2022 Vasily Evseenko <[email protected]> | |
# | |
# This program is free software; you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation; version 3. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License along | |
# with this program; if not, write to the Free Software Foundation, Inc., | |
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
# | |
import time | |
import json | |
import os | |
import sys | |
import re | |
import hashlib | |
import json | |
from wfb_ng.antenna.antenna_protocols import AntennaFactory, AntennaProtocol | |
from wfb_ng import log | |
from twisted.internet import reactor, defer, main as ti_main | |
from twisted.internet.protocol import ProcessProtocol, DatagramProtocol, Protocol, Factory | |
from twisted.protocols.basic import LineReceiver | |
from twisted.internet.serialport import SerialPort | |
from twisted.python import failure | |
from wfb_ng.common import abort_on_crash, exit_status, df_sleep | |
from wfb_ng.lora.lora_state_management import LoraStatusOnGSManager | |
from wfb_ng.lora.telemetry_filter import MavlinkFilter | |
from wfb_ng.proxy import UDPProxyProtocol, SerialProxyProtocol, ARMProtocol, call_and_check_rc, ExecError | |
from wfb_ng.tuntap import TUNTAPProtocol, TUNTAPTransport | |
from wfb_ng.conf import settings, cfg_files | |
from wfb_ng.lora.lora_initializer import init_lora_tx, init_lora_rx | |
from wfb_ng.sdr.sdr_protocol import SDRManager | |
from wfb_ng.www import server as ui_server | |
# Peer address syntaxes accepted in the configuration; all three are matched
# case-insensitively and expose their parts as named groups.

# connect://A.B.C.D:PORT  -> groups 'addr', 'port'
connect_re = re.compile(
    r'^connect://(?P<addr>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):(?P<port>[0-9]+)$',
    flags=re.IGNORECASE,
)

# listen://A.B.C.D:PORT  -> groups 'addr', 'port'
listen_re = re.compile(
    r'^listen://(?P<addr>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):(?P<port>[0-9]+)$',
    flags=re.IGNORECASE,
)

# serial:DEVICE:BAUD  -> groups 'dev', 'baud'
serial_re = re.compile(
    r'^serial:(?P<dev>[a-z0-9\-\_/]+):(?P<baud>[0-9]+)$',
    flags=re.IGNORECASE,
)
class DbgProtocol(LineReceiver):
    """Line-oriented sink that logs every received line prefixed with an id.

    Raw bytes are pushed in through ``dataReceived`` (inherited from
    ``LineReceiver``); each complete ``\\n``-terminated line is written to
    the log as ``"<rx_id>: <line>"``.
    """

    delimiter = b'\n'

    def __init__(self, rx_id):
        # rx_id: label placed in front of every logged line.
        self.rx_id = rx_id

    def lineReceived(self, line):
        # The bytes come from an external process and are not guaranteed to be
        # valid UTF-8. Replace undecodable bytes rather than letting a
        # UnicodeDecodeError escape from the data-received path.
        text = line.decode('utf-8', errors='replace')
        log.msg('%s: %s' % (self.rx_id, text))
class RXProtocol(ProcessProtocol):
    """Process protocol wrapping one receiver child process.

    The child's stdout is forwarded to an ``AntennaProtocol`` (only when
    ``antenna_stat`` is given), its stderr is logged line by line, and the
    deferred returned by ``start()`` fires once the process has ended.
    """

    def __init__(self, antenna_stat, cmd, rx_id):
        self.cmd = cmd
        self.rx_id = rx_id
        if antenna_stat is None:
            self.ant = None
        else:
            self.ant = AntennaProtocol(antenna_stat, rx_id)
        self.dbg = DbgProtocol(rx_id)
        # Fired from processEnded().
        self.df = defer.Deferred()

    def connectionMade(self):
        log.msg('Started %s' % (self.rx_id,))

    def outReceived(self, data):
        # stdout is dropped when no antenna statistics consumer was configured.
        if self.ant is None:
            return
        self.ant.dataReceived(data)

    def errReceived(self, data):
        self.dbg.dataReceived(data)

    def processEnded(self, status):
        exit_code = status.value.exitCode
        log.msg('Stopped RX %s with code %s' % (self.rx_id, exit_code))

        # Anything other than a zero exit code is reported as a failure.
        if exit_code != 0:
            self.df.errback(status)
            return
        self.df.callback(str(status.value))

    def start(self):
        """Spawn ``self.cmd`` and return a deferred that fires when it exits."""
        spawned = defer.maybeDeferred(
            reactor.spawnProcess, self, self.cmd[0], self.cmd,
            env=os.environ,
            childFDs={0: "w", 1: "r", 2: "r"})
        # Chain onto the exit deferred so the caller waits for process end.
        spawned.addCallback(lambda _: self.df)
        return spawned
class TXProtocol(ProcessProtocol):
    """Process protocol wrapping one transmitter child process.

    Both stdout and stderr of the child are logged line by line; the deferred
    returned by ``start()`` fires once the process has ended.
    """

    def __init__(self, cmd, tx_id):
        self.cmd = cmd
        self.tx_id = tx_id
        self.dbg = DbgProtocol(tx_id)
        # Fired from processEnded().
        self.df = defer.Deferred()

    def connectionMade(self):
        log.msg('Started %s' % (self.tx_id,))

    def outReceived(self, data):
        self.dbg.dataReceived(data)

    def errReceived(self, data):
        self.dbg.dataReceived(data)

    def processEnded(self, status):
        exit_code = status.value.exitCode
        log.msg('Stopped TX %s with code %s' % (self.tx_id, exit_code))

        # Anything other than a zero exit code is reported as a failure.
        if exit_code != 0:
            self.df.errback(status)
            return
        self.df.callback(str(status.value))

    def start(self):
        """Spawn ``self.cmd`` and return a deferred that fires when it exits."""
        spawned = defer.maybeDeferred(
            reactor.spawnProcess, self, self.cmd[0], self.cmd,
            env=os.environ,
            childFDs={0: "w", 1: "r", 2: "r"})
        # Chain onto the exit deferred so the caller waits for process end.
        spawned.addCallback(lambda _: self.df)
        return spawned
@defer.inlineCallbacks
def init_wlans(profile, wlans):
    """Configure every interface in ``wlans`` for monitor-mode operation.

    The HT mode is derived from the larger of the profile's mavlink and video
    ``bandwidth`` settings (20 -> ``HT20``, 40 -> ``HT40+``).  The function
    then sets the regulatory domain and, per interface: optionally switches
    it to NetworkManager "unmanaged" state, brings it down, sets monitor
    mode, brings it up, tunes it to ``settings.common.wifi_channel`` and,
    when ``settings.common.wifi_txpower`` is set, fixes the TX power.

    Raises:
        Exception: if the bandwidth is neither 20 nor 40.
        ExecError: re-raised from ``call_and_check_rc`` after its captured
            stdout/stderr have been logged.
    """
    max_bw = max(getattr(settings, '%s_mavlink' % profile).bandwidth,
                 getattr(settings, '%s_video' % profile).bandwidth)

    if max_bw == 20:
        ht_mode = 'HT20'
    elif max_bw == 40:
        ht_mode = 'HT40+'
    else:
        raise Exception('Unsupported bandwidth %d MHz' % (max_bw,))

    try:
        yield call_and_check_rc('iw', 'reg', 'set', settings.common.wifi_region)

        for wlan in wlans:
            if settings.common.set_nm_unmanaged and os.path.exists('/usr/bin/nmcli'):
                device_status = yield call_and_check_rc('nmcli', 'device', 'show', wlan, log_stdout=False)
                if b'(unmanaged)' not in device_status:
                    log.msg('Switch %s to unmanaged state' % (wlan,))
                    yield call_and_check_rc('nmcli', 'device', 'set', wlan, 'managed', 'no')
                    # Pause for one second after changing the managed state.
                    yield df_sleep(1)

            yield call_and_check_rc('ip', 'link', 'set', wlan, 'down')
            yield call_and_check_rc('iw', 'dev', wlan, 'set', 'monitor', 'otherbss')
            yield call_and_check_rc('ip', 'link', 'set', wlan, 'up')
            yield call_and_check_rc('iw', 'dev', wlan, 'set', 'freq', str(settings.common.wifi_channel), ht_mode)

            if settings.common.wifi_txpower:
                yield call_and_check_rc('iw', 'dev', wlan, 'set', 'txpower', 'fixed', str(settings.common.wifi_txpower))

    except ExecError as v:
        # Surface whatever the failed command printed before propagating.
        if v.stdout:
            log.msg(v.stdout, isError=1)
        if v.stderr:
            log.msg(v.stderr, isError=1)
        raise
def run_ui():
    """Schedule ``ui_server.start`` on a reactor thread.

    Returns whatever ``reactor.callInThread`` returns.
    """
    ui_entry_point = ui_server.start
    return reactor.callInThread(ui_entry_point)
def init(profile, wlans):
    """Initialise the wlans, then start the mavlink, video, tunnel and UI services.

    Returns a deferred that is chained from ``init_wlans`` into the gathered
    result of all service starters.
    """

    def _unwrap_first_error(f):
        # gatherResults reports the first failure wrapped in FirstError.
        # trap() re-raises anything else; otherwise hand back the wrapped
        # failure itself.
        f.trap(defer.FirstError)
        return f.value.subFailure

    def _init_services(_):
        # The numeric link id is the first three bytes (big-endian) of the
        # SHA-1 digest of the configured link_id string.
        digest = hashlib.sha1(settings.common.link_id.encode('utf-8')).digest()
        link_id = int.from_bytes(digest[:3], 'big')

        pending = [defer.maybeDeferred(starter, profile, wlans, link_id)
                   for starter in (init_mavlink, init_video, init_tunnel)]
        pending.append(defer.maybeDeferred(run_ui))

        return defer.gatherResults(pending).addErrback(_unwrap_first_error)

    wlans_ready = defer.maybeDeferred(init_wlans, profile, wlans)
    return wlans_ready.addCallback(_init_services)
def init_mavlink(profile, wlans, link_id):
    """Start the telemetry (mavlink) service for ``profile``.

    Builds the ``wfb_rx`` / ``wfb_tx`` command lines from the
    ``<profile>_mavlink`` settings section, creates the inbound endpoint
    from ``cfg.peer`` (``connect://``, ``listen://`` or ``serial:``), wires up
    the optional LoRa TX or LoRa RX endpoints, opens the UDP sockets and
    spawns the child processes.

    Args:
        profile: settings profile name; ``<profile>_mavlink`` is read.
        wlans: list of interface names appended to both command lines.
        link_id: integer passed to the binaries via ``-i``.

    Returns:
        A deferred gathering the spawned processes.  Serial ports and UDP
        sockets opened here are closed when it fires (success or failure);
        a ``FirstError`` is unwrapped to the underlying failure.

    Raises:
        Exception: if ``cfg.peer`` matches none of the supported syntaxes.
    """
    cfg = getattr(settings, '%s_mavlink' % (profile,))

    cmd_rx = ('%s -p %d -u %d -K %s -k %d -n %d -i %d' %
              (os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx,
               cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.fec_k, cfg.fec_n,
               link_id)).split() + wlans

    cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d' %
              (os.path.join(settings.path.bin_dir, 'wfb_tx'),
               cfg.stream_tx, cfg.port_tx, os.path.join(settings.path.conf_dir, cfg.keypair),
               cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index,
               cfg.fec_k, cfg.fec_n, cfg.fec_timeout, link_id)).split() + wlans

    listen = None
    connect = None
    serial = None

    # Match each peer syntax once and branch on the results.
    connect_m = connect_re.match(cfg.peer)
    listen_m = listen_re.match(cfg.peer)
    serial_m = serial_re.match(cfg.peer)

    if connect_m:
        connect = connect_m.group('addr'), int(connect_m.group('port'))
        log.msg('Connect telem stream %d(RX), %d(TX) to %s:%d' % (cfg.stream_rx, cfg.stream_tx, connect[0], connect[1]))
    elif listen_m:
        listen = listen_m.group('addr'), int(listen_m.group('port'))
        log.msg(
            'Listen for telem stream %d(RX), %d(TX) on %s:%d' % (cfg.stream_rx, cfg.stream_tx, listen[0], listen[1]))
    elif serial_m:
        serial = serial_m.group('dev'), int(serial_m.group('baud'))
        log.msg('Open serial port %s on speed %d' % (serial[0], serial[1]))
    else:
        raise Exception('Unsupported peer address: %s' % (cfg.peer,))

    if cfg.mirror is not None:
        mirror_m = connect_re.match(cfg.mirror)
        if mirror_m:
            # NOTE(review): the endpoint parsed from cfg.mirror is only logged;
            # no protocol created in this function receives it (the UDP p_in
            # below mirrors to the internal LoRa TX port instead). Confirm
            # that this is intended.
            log.msg('Mirror telem stream to %s:%d' % (mirror_m.group('addr'), int(mirror_m.group('port'))))

    if cfg.call_on_arm or cfg.call_on_disarm:
        arm_proto = ARMProtocol(cfg.call_on_arm, cfg.call_on_disarm)
    else:
        arm_proto = None

    lora_status_manager = None

    if serial:
        # Drone <--> Flight Controller
        p_in = SerialProxyProtocol(agg_max_size=settings.common.radio_mtu,
                                   agg_timeout=settings.common.mavlink_agg_timeout,
                                   inject_rssi=True,
                                   arm_proto=arm_proto,
                                   mavlink_sys_id=cfg.mavlink_sys_id,
                                   mavlink_comp_id=cfg.mavlink_comp_id)
    else:
        # The first argument is not None only if we initiate mavlink connection
        # GS <--> Mission Planner
        lora_status_manager = LoraStatusOnGSManager()
        p_in = UDPProxyProtocol(connect, agg_max_size=settings.common.radio_mtu,
                                agg_timeout=settings.common.mavlink_agg_timeout,
                                inject_rssi=cfg.inject_rssi,
                                # only controlling msgs should be mirrored
                                mirror=('127.0.0.1', settings.lora_tx.lora_tx_internal_port),
                                mirror_all=False,  # only controlling msgs (lora) will be mirrored
                                arm_proto=arm_proto,
                                mavlink_sys_id=cfg.mavlink_sys_id,
                                mavlink_comp_id=cfg.mavlink_comp_id,
                                split_before_write=True,
                                lora_status_manager=lora_status_manager)

    sockets = []
    lora_serial_tx_port_1 = lora_serial_tx_port_2 = lora_serial_rx_port_1 = lora_serial_rx_port_2 = None

    # An unset (None) serial setting is treated like an empty string, which
    # never matches serial_re.
    lora_tx_1_m = serial_re.match(settings.lora_tx_1.lora_tx_serial or '')
    lora_tx_2_m = serial_re.match(settings.lora_tx_2.lora_tx_serial or '')

    if lora_tx_1_m or lora_tx_2_m:
        # [GS] with Lora
        lora_tx_listener, lora_serial_tx_port_1, lora_serial_tx_port_2 = \
            init_lora_tx(arm_proto, lora_tx_1_m, lora_tx_2_m)

        sockets += [reactor.listenUDP(settings.lora_tx.lora_tx_internal_port, lora_tx_listener)]

        p_tx_l = [UDPProxyProtocol(
            addr=('127.0.0.1', cfg.port_tx + i),
            split_before_write=True,
            arm_proto=arm_proto,
            # everything should be mirrored
            mirror=('127.0.0.1', settings.lora_tx.lora_tx_internal_port),
            mirror_all=True,
            noisy=False
        ) for i, _ in enumerate(wlans)]
    else:
        # The LoRa RX settings are consulted only when no LoRa TX endpoint is
        # configured.
        lora_rx_1_m = serial_re.match(settings.lora_rx_1.lora_rx_serial or '')
        lora_rx_2_m = serial_re.match(settings.lora_rx_2.lora_rx_serial or '')

        if lora_rx_1_m or lora_rx_2_m:
            # [Drone] with Lora
            mavlink_filter = MavlinkFilter()
            p_tx_l = [UDPProxyProtocol(('127.0.0.1', cfg.port_tx + i),
                                       mavlink_sys_id=cfg.mavlink_sys_id,
                                       mavlink_comp_id=cfg.mavlink_comp_id,
                                       inject_rssi=True) for i, _ in enumerate(wlans)]

            def _start_with_lora(lora_number, lora_serial_m):
                # Bring up one LoRa RX endpoint; returns (proxy, serial port).
                lora_settings = getattr(settings, f'lora_rx_{lora_number}')

                if settings.sdr.sdr_enabled:
                    log.msg('[SDR] testing START <<<<')
                    sdr_manager = SDRManager(sdr_path=settings.sdr.sdr_path,
                                             channels=json.loads(lora_settings.channels),
                                             bandwidth=settings.sdr.bandwidth,
                                             sample_rate=settings.sdr.sample_rate)
                    sdr_manager.get_new_channel(json.loads(lora_settings.channels)[0])
                    log.msg('[SDR] testing END >>>>')

                # NOTE(review): the first argument is the literal 1 for both
                # endpoints; lora_number only selects the settings section.
                # Confirm init_lora_rx does not expect the endpoint number.
                lora_serial_rx_proxy, lora_serial_rx_port = \
                    init_lora_rx(1,
                                 p_tx_l[0],
                                 arm_proto,
                                 lora_serial_m,
                                 mavlink_filter=mavlink_filter)
                lora_serial_rx_proxy.peer = p_in
                return lora_serial_rx_proxy, lora_serial_rx_port

            # Only the serial ports are kept (for cleanup); the proxies are
            # already wired to p_in inside _start_with_lora.
            if lora_rx_1_m:
                _, lora_serial_rx_port_1 = _start_with_lora(1, lora_rx_1_m)
            if lora_rx_2_m:
                _, lora_serial_rx_port_2 = _start_with_lora(2, lora_rx_2_m)
        else:
            # without Lora
            log.msg('No Lora endpoints defined, "only wfb" mode ON')
            p_tx_l = [UDPProxyProtocol(('127.0.0.1', cfg.port_tx + i)) for i, _ in enumerate(wlans)]

    p_rx = UDPProxyProtocol(arm_proto=arm_proto, noisy=cfg.noisy_rx)
    p_rx.peer = p_in

    if serial:
        serial_port = SerialPort(p_in, os.path.join('/dev', serial[0]), reactor, baudrate=serial[1])
        serial_port._serial.exclusive = True
    else:
        serial_port = None
        # Port 0 lets the OS pick a port when we are the connecting side.
        sockets += [reactor.listenUDP(listen[1] if listen else 0, p_in)]

    sockets += [reactor.listenUDP(cfg.port_rx, p_rx)]
    sockets += [reactor.listenUDP(0, p_tx) for p_tx in p_tx_l]

    log.msg('Telem RX: %s' % (' '.join(cmd_rx),))
    log.msg('Telem TX: %s' % (' '.join(cmd_tx),))

    ant_f = AntennaFactory(p_in, p_tx_l, lora_status=lora_status_manager)
    if cfg.stats_port:
        reactor.listenTCP(cfg.stats_port, ant_f)

    # [Drone] disable normal wfb protocol to test only Lora msgs
    if settings.lora_rx.disable_wifi_rx:
        dl = [TXProtocol(cmd_tx, 'telem tx').start()]
        log.msg("!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        log.msg("Normal wifi communication from GS is disabled, only Lora packets will be handled.")
    else:
        dl = [RXProtocol(ant_f, cmd_rx, 'telem rx').start(), TXProtocol(cmd_tx, 'telem tx').start()]

    def _cleanup(x):
        # Close every serial port that was opened, in a fixed order, then the
        # UDP sockets.  The incoming result/failure is passed through.
        opened_ports = (serial_port,
                        lora_serial_tx_port_1, lora_serial_tx_port_2,
                        lora_serial_rx_port_1, lora_serial_rx_port_2)
        for port in opened_ports:
            if port is not None:
                port.loseConnection()
                port.connectionLost(failure.Failure(ti_main.CONNECTION_DONE))

        for s in sockets:
            s.stopListening()

        return x

    return defer.gatherResults(dl, consumeErrors=True).addBoth(_cleanup) \
        .addErrback(lambda f: f.trap(defer.FirstError) and f.value.subFailure)
def init_video(profile, wlans, link_id):
    """Start the video service for ``profile``.

    With a ``listen://`` peer a ``wfb_tx`` process is started on the first
    interface only; with a ``connect://`` peer a ``wfb_rx`` process is started
    on all interfaces and, when ``cfg.stats_port`` is set, an antenna
    statistics factory is served on that TCP port.

    Args:
        profile: settings profile name; ``<profile>_video`` is read.
        wlans: list of interface names.
        link_id: integer passed to the binary via ``-i``.

    Returns:
        The deferred returned by the started protocol's ``start()``.

    Raises:
        Exception: if ``cfg.peer`` is neither ``listen://`` nor ``connect://``.
    """
    cfg = getattr(settings, '%s_video' % (profile,))

    # Match each peer syntax once and branch on the results.
    listen_m = listen_re.match(cfg.peer)
    connect_m = connect_re.match(cfg.peer)

    if listen_m:
        listen = listen_m.group('addr'), int(listen_m.group('port'))
        log.msg('Listen for video stream %d on %s:%d' % (cfg.stream, listen[0], listen[1]))

        # We don't use TX diversity for video streaming due to only one transmitter on the vehicle
        cmd = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d %s' %
               (os.path.join(settings.path.bin_dir, 'wfb_tx'), cfg.stream,
                listen[1], os.path.join(settings.path.conf_dir, cfg.keypair),
                cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index,
                cfg.fec_k, cfg.fec_n, cfg.fec_timeout, link_id, wlans[0])).split()

        df = TXProtocol(cmd, 'video tx').start()
    elif connect_m:
        connect = connect_m.group('addr'), int(connect_m.group('port'))
        log.msg('Send video stream %d to %s:%d' % (cfg.stream, connect[0], connect[1]))

        ant_f = AntennaFactory(None, None)
        if cfg.stats_port:
            reactor.listenTCP(cfg.stats_port, ant_f)

        cmd = ('%s -p %d -c %s -u %d -K %s -k %d -n %d -i %d' %
               (os.path.join(settings.path.bin_dir, 'wfb_rx'),
                cfg.stream, connect[0], connect[1],
                os.path.join(settings.path.conf_dir, cfg.keypair),
                cfg.fec_k, cfg.fec_n, link_id)).split() + wlans

        df = RXProtocol(ant_f, cmd, 'video rx').start()
    else:
        raise Exception('Unsupported peer address: %s' % (cfg.peer,))

    log.msg('Video: %s' % (' '.join(cmd),))
    return df
def init_tunnel(profile, wlans, link_id):
    """Start the tunnel service for ``profile``.

    Creates a TUN/TAP endpoint from the ``<profile>_tunnel`` settings, opens
    the UDP sockets feeding it and spawns the ``wfb_rx`` / ``wfb_tx``
    processes.  Returns a deferred gathering both processes; the TUN/TAP
    endpoint and the sockets are closed when it fires, and a ``FirstError``
    is unwrapped to the underlying failure.
    """
    cfg = getattr(settings, '%s_tunnel' % (profile,))

    rx_args = (os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx,
               cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair),
               cfg.fec_k, cfg.fec_n, link_id)
    cmd_rx = ('%s -p %d -u %d -K %s -k %d -n %d -i %d' % rx_args).split() + wlans

    tx_args = (os.path.join(settings.path.bin_dir, 'wfb_tx'),
               cfg.stream_tx, cfg.port_tx, os.path.join(settings.path.conf_dir, cfg.keypair),
               cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index,
               cfg.fec_k, cfg.fec_n, cfg.fec_timeout, link_id)
    cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d'
              % tx_args).split() + wlans

    p_in = TUNTAPProtocol()
    # One TX proxy per interface, on consecutive ports starting at cfg.port_tx.
    p_tx_l = [UDPProxyProtocol(('127.0.0.1', cfg.port_tx + offset))
              for offset, _wlan in enumerate(wlans)]
    p_rx = UDPProxyProtocol()
    p_rx.peer = p_in

    tun_ep = TUNTAPTransport(reactor, p_in, cfg.ifname, cfg.ifaddr, mtu=settings.common.radio_mtu)

    sockets = [reactor.listenUDP(cfg.port_rx, p_rx)]
    for p_tx in p_tx_l:
        sockets.append(reactor.listenUDP(0, p_tx))

    log.msg('Tunnel RX: %s' % (' '.join(cmd_rx),))
    log.msg('Tunnel TX: %s' % (' '.join(cmd_tx),))

    ant_f = AntennaFactory(p_in, p_tx_l)
    if cfg.stats_port:
        reactor.listenTCP(cfg.stats_port, ant_f)

    rx_done = RXProtocol(ant_f, cmd_rx, 'tunnel rx').start()
    tx_done = TXProtocol(cmd_tx, 'tunnel tx').start()

    def _cleanup(x):
        # Runs on success and failure alike; passes the result through.
        tun_ep.loseConnection()
        for s in sockets:
            s.stopListening()
        return x

    def _unwrap_first_error(f):
        # trap() re-raises anything that is not a FirstError.
        f.trap(defer.FirstError)
        return f.value.subFailure

    gathered = defer.gatherResults([rx_done, tx_done], consumeErrors=True)
    gathered.addBoth(_cleanup)
    gathered.addErrback(_unwrap_first_error)
    return gathered
def main():
    """Command-line entry point.

    ``sys.argv[1]`` is the profile name; every following argument is split on
    whitespace and contributes interface names.  Runs the reactor until it
    stops, then exits with the status reported by ``exit_status()``.
    """
    log.msg('WFB version %s-%s' % (settings.common.version, settings.common.commit[:8]))

    profile = sys.argv[1]
    wlans = [wlan for arg in sys.argv[2:] for wlan in arg.split()]

    uname = os.uname()
    machine, release, node = uname[4], uname[2], uname[1]
    log.msg('Run on %s/%s @%s, profile %s using %s' % (machine, release, node, profile, ', '.join(wlans)))
    log.msg('Using cfg files:\n%s' % ('\n'.join(cfg_files),))

    def _start():
        # Any failure during initialisation is handed to abort_on_crash.
        started = defer.maybeDeferred(init, profile, wlans)
        return started.addErrback(abort_on_crash)

    reactor.callWhenRunning(_start)
    reactor.run()

    rc = exit_status()
    log.msg('Exiting with code %d' % rc)
    sys.exit(rc)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment