Created
May 13, 2019 13:11
-
-
Save davidhoness/b16a666403c46bfbe159fa2ec907df1d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division | |
from pysstv.sstv import SSTV | |
from pysstv.color import PD120 | |
from itertools import islice | |
from PIL import Image | |
from shapely.geometry import Polygon, Point | |
import struct | |
import serial | |
import pyaudio | |
import glob | |
import time | |
import qrcode | |
import uuid | |
import datetime | |
import ephem | |
import math | |
import os | |
import sys | |
# Indices into a PIL-style ``size`` tuple, which is (width, height).
W = 0
H = 1
class py_audio_sstv(object):
    """Play the audio samples of an SSTV encoder through PyAudio.

    Samples are pulled lazily from ``sstv.gen_samples()`` and handed to a
    callback-driven, single-channel output stream.
    """

    def __init__(self, sstv):
        """Open a PyAudio session and work out the per-sample pack format.

        :param sstv: encoder object; this class reads its ``bits`` and
            ``samples_per_sec`` attributes and calls ``gen_samples()``.
        """
        self.pa = pyaudio.PyAudio()
        self.sstv = sstv
        # '<' selects little-endian; the struct code for one sample is
        # looked up from the sample width in bits.
        self.fmt = '<' + SSTV.BITS_TO_STRUCT[sstv.bits]

    def __del__(self):
        """Terminate the PyAudio session, if one was ever created."""
        # __del__ also runs when __init__ raised before self.pa was set,
        # so the attribute may legitimately be missing.
        pa = getattr(self, 'pa', None)
        if pa is not None:
            pa.terminate()

    def execute(self, keep_playing=None):
        """Play all samples, blocking until done or told to stop.

        :param keep_playing: optional zero-argument callable polled about
            every 0.5 seconds; playback is cut short as soon as it returns
            a false value.
        """
        self.sampler = self.sstv.gen_samples()
        stream = self.pa.open(
            format=self.pa.get_format_from_width(self.sstv.bits // 8),
            channels=1,
            rate=self.sstv.samples_per_sec,
            output=True,
            stream_callback=self.callback)
        try:
            stream.start_stream()
            while stream.is_active():
                if keep_playing is not None and not keep_playing():
                    break
                time.sleep(0.5)
        finally:
            # Always release the audio device, even if keep_playing() or
            # the stream itself raised.
            try:
                stream.stop_stream()
            finally:
                stream.close()

    def callback(self, in_data, frame_count, time_info, status):
        """PyAudio stream callback: return the next ``frame_count`` samples.

        Once the sampler is exhausted the returned buffer is shorter than
        requested (eventually empty).
        NOTE(review): PyAudio is documented to treat a short buffer as
        ``paComplete``, which is what ends the stream - confirm against
        the installed version.
        """
        chunk = islice(self.sampler, frame_count)
        frames = b"".join(struct.pack(self.fmt, sample) for sample in chunk)
        return frames, pyaudio.paContinue
class sstv_event(object):
    """Cycle through a set of images and transmit each one as SSTV audio.

    Before each transmission the image is resized to the SSTV mode's
    dimensions, stamped with a column of QR codes carrying a short unique
    ID, and saved to ``out_image_path``. Transmission is held off while
    the off-time since the last transmission has not elapsed, while the
    satellite's sub-point lies inside a keep-out zone, or once
    ``shutdown_time`` has passed. One CSV line is logged per transmission.
    """

    def __init__(self, sstv_mode, off_time_seconds, in_image_path_glob,
                 out_image_path, tle_file, log_file, keep_out_zones=None,
                 shutdown_time=None, begin_transmission_cb=None,
                 end_transmission_cb=None):
        """Store the configuration and validate it.

        :param sstv_mode: SSTV mode class; must expose ``WIDTH`` and
            ``HEIGHT`` and be callable as ``sstv_mode(image, rate, bits)``.
        :param off_time_seconds: minimum pause between transmissions.
        :param in_image_path_glob: glob pattern selecting the input images.
        :param out_image_path: directory for the stamped reference images;
            created if missing.
        :param tle_file: text file holding a name line and two element lines.
        :param log_file: path of the CSV log (truncated on ``execute``).
        :param keep_out_zones: optional list of shapely ``Polygon`` objects
            in which transmission is not allowed.
        :param shutdown_time: optional naive UTC ``datetime`` after which
            the event stops running.
        :param begin_transmission_cb: optional callable run before playback.
        :param end_transmission_cb: optional callable run after playback.

        Exits the process (status 1) if no input image can be opened or a
        keep-out zone is not a valid polygon.
        """
        self.sstv_mode = sstv_mode
        self.off_time_seconds = off_time_seconds
        self.in_image_path_glob = in_image_path_glob
        self.out_image_path = out_image_path
        self.tle_file = tle_file
        self.log_file = log_file
        # None instead of a shared mutable [] default.
        self.keep_out_zones = [] if keep_out_zones is None else keep_out_zones
        self.shutdown_time = shutdown_time
        self.begin_transmission_cb = begin_transmission_cb
        self.end_transmission_cb = end_transmission_cb
        # Back-date the last transmission so the first one is not delayed
        # by the off-time check.
        self.last_txing_time_utc = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.off_time_seconds)
        self.validate_images()
        self.validate_keep_out_zones()
        self.validate_path(self.out_image_path)
        self.load_tle()

    def validate_images(self):
        """Collect the input files that PIL can open; exit if there are none."""
        self.images = []
        for image_file in glob.glob(self.in_image_path_glob):
            try:
                # Only checks that the file opens; the context manager
                # closes the handle again straight away.
                with Image.open(image_file):
                    pass
            except Exception as e:
                # Best effort: report the unreadable file and carry on.
                print(e)
            else:
                self.images.append(image_file)
        if not self.images:
            print("no images found")
            sys.exit(1)

    def validate_keep_out_zones(self):
        """Exit if any keep-out zone is not a valid shapely Polygon."""
        for zone in self.keep_out_zones:
            if not isinstance(zone, Polygon) or not zone.is_valid:
                print("invalid polygon in keep out zones")
                print(zone)
                sys.exit(1)

    def validate_path(self, p):
        """Make sure directory ``p`` exists, creating parents as needed."""
        try:
            os.makedirs(p)
        except OSError:
            # Already existing is fine; anything else (permissions, a
            # plain file in the way) is a real error.
            if not os.path.isdir(p):
                raise

    def load_tle(self):
        """Read the first three non-blank lines of the TLE file into ephem.

        :raises ValueError: if the file has fewer than three such lines.
        """
        with open(self.tle_file) as f:
            lines = [line for line in f if line.strip()]
        if len(lines) < 3:
            raise ValueError("TLE file %r needs a name line and two element lines" % (self.tle_file,))
        self.tle = ephem.readtle(lines[0], lines[1], lines[2])
        self.tle.compute()

    @property
    def running(self):
        """True until ``shutdown_time`` (naive UTC) has passed; always True if unset."""
        if self.shutdown_time is None:
            return True
        return datetime.datetime.utcnow() < self.shutdown_time

    def load_size_image(self, image_file):
        """Open ``image_file`` and resize it to the SSTV mode's dimensions.

        :returns: a PIL image of size ``(WIDTH, HEIGHT)``.
        """
        img = Image.open(image_file)
        target = (self.sstv_mode.WIDTH, self.sstv_mode.HEIGHT)
        # Compare both dimensions: equal pixel count alone does not mean
        # the image already has the right shape.
        if tuple(img.size) != target:
            # ANTIALIAS is the legacy name of LANCZOS and is gone from
            # newer Pillow releases; prefer LANCZOS when it exists.
            resample = getattr(Image, "LANCZOS", None)
            if resample is None:
                resample = Image.ANTIALIAS
            img = img.resize(target, resample)
        return img

    def create_qr_code(self, data):
        """Return a QR code image encoding ``data``."""
        qr = qrcode.QRCode(
            version=2,
            error_correction=qrcode.constants.ERROR_CORRECT_H,  # max redundancy
            box_size=4,  # min for clear contrast in decoded pictures
            border=2)
        qr.add_data(data)
        qr.make(fit=True)
        return qr.make_image()

    def overlay_qr_codes(self, main_img, uid):
        """Paste a white strip with repeated QR codes down the right edge.

        ``main_img`` is modified in place. As many copies of the QR code
        as fit vertically are drawn, evenly spaced, each rotated a further
        90 degrees relative to the previous one.

        :raises ValueError: if the QR code is taller than the image.
        """
        qr_img = self.create_qr_code(uid)
        # QR codes are square, so rotating them below never changes these.
        qr_w, qr_h = qr_img.size[W], qr_img.size[H]
        main_w, main_h = main_img.size[W], main_img.size[H]
        qr_count = main_h // qr_h
        if qr_count == 0:
            raise ValueError("QR code (%d px) is taller than the image (%d px)" % (qr_h, main_h))
        spacer = (main_h - (qr_h * qr_count)) // qr_count
        left = main_w - qr_w
        qr_box = Image.new("RGB", (qr_w, main_h), (255, 255, 255))
        main_img.paste(qr_box, (left, 0))
        for i in range(qr_count):
            offset = i * (qr_h + spacer) + spacer // 2
            main_img.paste(qr_img, (left, offset))
            qr_img = qr_img.transpose(Image.ROTATE_90)  # rotate to mitigate interference

    def clear_to_transmit(self):
        """Return True when transmitting is currently allowed.

        False if the event is no longer running, if the off time since
        the last transmission has not elapsed, or if the satellite's
        current sub-point lies inside any keep-out zone.
        """
        if not self.running:
            return False
        # off time check
        off_time = datetime.timedelta(seconds=self.off_time_seconds)
        if datetime.datetime.utcnow() < self.last_txing_time_utc + off_time:
            return False
        # keep out zone check
        if len(self.keep_out_zones) > 0:
            self.tle.compute()
            p = Point(math.degrees(self.tle.sublat), math.degrees(self.tle.sublong))
            if any(zone.contains(p) for zone in self.keep_out_zones):
                return False
        return True

    def get_timestamp_location(self):
        """Return ``(utc_now, latitude, longitude)`` of the satellite sub-point.

        Latitude and longitude are in degrees, rounded to 4 decimals.
        """
        t = datetime.datetime.utcnow()
        self.tle.compute(t)
        lat = round(math.degrees(self.tle.sublat), 4)
        lon = round(math.degrees(self.tle.sublong), 4)
        return t, lat, lon

    def run_call_back(self, cb):
        """Call ``cb`` with no arguments unless it is None."""
        if cb is not None:
            cb()

    def _transmit_image(self, image_file):
        """Stamp, save, transmit and log a single image."""
        # create unique ID for the image
        uid = str(uuid.uuid4())[:8]
        # load and resize the image if needed
        main_img = self.load_size_image(image_file)
        # repeat QR code down right side of image
        self.overlay_qr_codes(main_img, uid)
        # save reference image to disk
        tx_file = os.path.join(self.out_image_path, "%s.jpg" % uid)
        main_img.save(tx_file, "JPEG")
        # convert image to sstv data (48000 samples per second, 16 bits)
        sstv = self.sstv_mode(main_img, 48000, 16)
        sstv.vox_enabled = True
        # capture start time and location
        start_time, start_lat, start_long = self.get_timestamp_location()
        try:
            # turn on radio
            self.run_call_back(self.begin_transmission_cb)
            # play sstv audio and stop if clear to transmit becomes False
            py_audio_sstv(sstv).execute(self.clear_to_transmit)
        finally:
            # turn off radio - must happen even if keying or playback
            # failed, otherwise the transmitter could stay keyed
            self.run_call_back(self.end_transmission_cb)
        # capture stop time and location
        stop_time, stop_lat, stop_long = self.get_timestamp_location()
        # write to log file; the last column is evaluated BEFORE the off
        # time is reset, so it shows whether transmitting was still
        # permitted when playback ended
        self.log.write(",".join((uid, str(start_time), str(start_lat), str(start_long), str(stop_time), str(stop_lat), str(stop_long), tx_file, str(self.clear_to_transmit()))))
        self.log.write("\n")
        self.log.flush()
        # enforce off time by updating last_txing_time_utc
        self.last_txing_time_utc = stop_time

    def execute(self):
        """Transmit the images in a loop until the event stops running.

        Opens (and truncates) the log file, then keeps cycling through
        the validated images, waiting before each one until
        ``clear_to_transmit`` allows it. The log file is closed on exit,
        including on error.
        """
        self.log = open(self.log_file, "w")
        try:
            while self.running:
                for image_file in self.images:
                    while not self.clear_to_transmit() and self.running:
                        print("waiting for clear to transmit")
                        time.sleep(1)
                    if not self.running:
                        break
                    self._transmit_image(image_file)
        finally:
            self.log.close()
# Very rough keep-out polygon.
# NOTE(review): vertices appear to be (latitude, longitude) in degrees, the
# same order as the Point that sstv_event.clear_to_transmit builds from the
# satellite's sublat/sublong - confirm before adding further zones.
china = Polygon([
    (48.74, 87.17),
    (39.09, 74.05),
    (33.25, 79.13),
    (28.24, 86.44),
    (29.58, 95.74),
    (26.33, 98.82),
    (24.29, 97.89),
    (21.85, 100.82),
    (23.57, 105.32),
    (21.65, 108.14),
    (23.04, 116.23),
    (27.10, 120.31),
    (30.63, 122.08),
    (39.81, 124.09),
    (46.87, 133.90),
    (53.37, 121.73),
    (46.57, 119.57),
    (41.64, 105.26),
    (42.75, 96.28),
    (45.30, 90.76)])
def start_radio():
    """Key the radio over the serial port before a transmission.

    Sends, in order, the commands selecting FM mode, setting the
    frequency value 145800000 and switching PTT to TX, then flushes the
    port. The port is closed again when the function returns.
    """
    print("start radio")
    # Placeholder: a shell command could be run here instead of, or as
    # well as, the serial commands below.
    with serial.Serial(port="/dev/ttyAMA0", baudrate=115200) as ser:
        for command in (b"set_mode FM\r\n",
                        b"set_freq 145800000\r\n",
                        b"set_ptt TX\r\n"):
            ser.write(command)
        ser.flush()
def stop_radio():
    """Unkey the radio over the serial port after a transmission.

    Sends the command switching PTT back to RX and flushes the port. The
    port is closed again when the function returns.
    """
    print("stop radio")
    # Placeholder: a shell command could be run here instead of, or as
    # well as, the serial command below.
    with serial.Serial(port="/dev/ttyAMA0", baudrate=115200) as ser:
        ser.write(b"set_ptt RX\r\n")
        ser.flush()
# Script entry: configure the event and run it until shutdown_time.
# PD120 images are sent with at least 120 seconds between transmissions,
# never while over the `china` keep-out zone, and the radio is keyed and
# unkeyed through start_radio / stop_radio.
ev = sstv_event(
    sstv_mode=PD120,
    off_time_seconds=120,
    in_image_path_glob="/usr/share/rpd-wallpaper/*.jpg",
    out_image_path="/home/pi/qsstv/tx_sstv",
    tle_file="iss.tle",
    log_file="log.csv",
    keep_out_zones=[china],
    shutdown_time=datetime.datetime(2099, 12, 31, 18, 0, 0),
    begin_transmission_cb=start_radio,
    end_transmission_cb=stop_radio)
ev.execute()
Author
davidhoness
commented
May 13, 2019
•
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment