Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy.
#!/usr/bin/env python3
# Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy.
# CC-0 / Public Domain
# (0) 2023 Raphael Wimmer
# v0.1.0
# reverse-engineered using a packet capture log - this means that I have no idea what all those magic numbers mean
# and whether there are further features that might be supported by the hardware
# usage: first connect to the 'softish-XXXX' wifi, then run this script. Keyboard shortcuts:
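#   1/2/3/4 = lock rotation to 0/90/180/270 degrees, r = follow the device's rotation sensor again,
#   f = toggle between masked-circle and full-frame view, +/- = LED brightness up/down,
#   w = write the current frame to out.jpg, d = toggle debug output, q or Esc = quit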
import socket
import sys
import cv2
import numpy as np
import time
import collections
from PIL import Image
from io import BytesIO
from urllib.parse import parse_qs
def get_battery_level(query_string):
    """
    Extracts the battery level from a reply like 'type=2001&data=23'.
    Returns the level as a float between 0 and 1 (the device reports percent),
    or None if not found or invalid.
    """
    try:
        params = parse_qs(query_string)
        return int(params["data"][0]) / 100
    except (KeyError, IndexError, ValueError):
        return None
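# Example (based on the reply format above):
#   get_battery_level("type=2001&data=23") -> 0.23
#   get_battery_level("garbage") -> None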
def draw_battery(img, x, y, width, height, level, thickness):
    """
    Draw a battery icon at (x, y) with given width, height and charge level (0 to 1).
    """
    # No reading available (e.g. the battery query failed): skip drawing
    if level is None:
        return
    # Clamp level to [0, 1]
    level = max(0, min(float(level), 1.0))
    # Colors
    border_color = (255, 255, 255)
    fill_color = (0, 255, 0) if level > 0.3 else (0, 0, 255)  # Red if low battery
    # Draw battery outline
    cv2.rectangle(img, (x, y), (x + width, y + height), border_color, thickness)
    # Draw battery tip
    tip_width = int(width * 0.08)
    tip_x = x + width
    tip_y = y + int(height * 0.3)
    tip_height = int(height * 0.4)
    cv2.rectangle(
        img, (tip_x, tip_y), (tip_x + tip_width, tip_y + tip_height), border_color, -1
    )
    # Fill battery level
    fill_width = int((width - 4) * level)
    cv2.rectangle(
        img, (x + 2, y + 2), (x + 2 + fill_width, y + height - 2), fill_color, -1
    )
def absolute_frame_from_raw(raw_frame, latest_abs_frame):
    # Find the multiple of 256 that makes raw_frame closest to latest_abs_frame
    base = (latest_abs_frame // 256) * 256
    candidates = [base - 256 + raw_frame, base + raw_frame, base + 256 + raw_frame]
    # pick the candidate closest to latest_abs_frame
    abs_frame = min(candidates, key=lambda x: abs(x - latest_abs_frame))
    return abs_frame
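# Example: the on-wire frame counter is a single byte and wraps at 256;
# absolute_frame_from_raw(3, 255) resolves to 259 because 259 is the candidate
# closest to the previously seen absolute frame number 255.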
debug = False
buffer_size = 1500
target_ip = "192.168.1.1"
target_port_meta = 61502
source_port_meta = 50262
target_port_vid = 61503
source_port_vid = 51320
sock_meta = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_meta.bind(("0.0.0.0", source_port_meta))
sock_vid = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_vid.bind(("0.0.0.0", source_port_vid))
sock_vid.settimeout(5.0)
brightness = 100
win_name = "Video Stream"
firstframe = True
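# Control protocol as observed in the packet capture (meanings inferred, not official):
# - meta channel (UDP 61502): newline-terminated queries such as "type=1001"
#   (battery, reply "type=2001&data=<percent>"), "type=1002" (system info) and
#   "type=1003&value=<0-100>" (LED brightness)
# - video channel (UDP 61503): the stream is started by sending the magic bytes
#   20 36 00 02 (three times, as in the capture) and stopped with 20 37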
def query_battery():
    # type=1001 asks for the battery level; the reply looks like "type=2001&data=<percent>"
    data = "type=1001\x0a".encode()
    sock_meta.sendto(data, (target_ip, target_port_meta))
    reply, addr = sock_meta.recvfrom(buffer_size)
    received_data = reply.decode()
    return get_battery_level(received_data)
try:
    # get system info
    data = "type=1002\x0a".encode()
    sock_meta.sendto(data, (target_ip, target_port_meta))
    reply, addr = sock_meta.recvfrom(buffer_size)
    received_data = reply.decode()
    print("Received data:", received_data)
    battery_level = query_battery()
    print(f"Battery level: {battery_level}")
    # three times according to captured traffic
    data = "\x20\x36\x00\x02".encode()
    sock_vid.sendto(data, (target_ip, target_port_vid))
    sock_vid.sendto(data, (target_ip, target_port_vid))
    sock_vid.sendto(data, (target_ip, target_port_vid))
    # set led brightness to 100%
    data = "type=1003&value=100\x0a".encode()
    # start with led off
    # data = "type=1003&value=0\x0a".encode()
    sock_meta.sendto(data, (target_ip, target_port_meta))
    reply, addr = sock_meta.recvfrom(buffer_size)
    # handle UnicodeDecodeError: 'utf-8' codec can't decode byte 0xaa in position 21: invalid start byte gracefully
    try:
        received_data = reply.decode()
        print("Received data:", received_data)
    except UnicodeDecodeError:
        print("UnicodeDecodeError, can be ignored")
    # print("Sender address:", addr)
    cv2.namedWindow(win_name, flags=cv2.WINDOW_GUI_NORMAL)
    rotation_lock = False
    fullframe = False
    latest_frame = 0
    raw_frame = 0
    frame = 0
    part = 0
    pic_buf = bytearray()
    JPEG_HEADER = bytes.fromhex("FF D8 FF E0 00 10 4A 46 49 46")
    keep_awake_time = time.time()
    # Store received parts per frame
    frames_dict = {}  # frame_number -> {part_number: pic_data}
    parts_dict = {}  # number of parts required per frame
    while True:
        # read video stream
        reply, addr = sock_vid.recvfrom(buffer_size)
        raw_frame = reply[0]
        frame_end = reply[1]
        part = reply[2]
        part_end = reply[3]
        misc_data = reply[4:8]
        if not rotation_lock:
            rotation = int.from_bytes(reply[4:6], "big")
        pic_data = reply[8:]
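        # inferred packet layout (from the capture): byte 0 = frame counter mod 256,
        # byte 1 = flag set on the last part of a frame, byte 2 = part index,
        # byte 3 = number of parts in the frame, bytes 4-5 = rotation angle (big-endian),
        # bytes 6-7 = unknown, bytes 8+ = fragment of the JPEG image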
        frame = absolute_frame_from_raw(raw_frame, frame)
        # store the part
        if frame not in frames_dict:
            frames_dict[frame] = {}
        frames_dict[frame][part] = pic_data
        if debug:
            print(
                f"raw_frame={raw_frame}, frame={frame}, frame_end={frame_end}, part={part}, part_end={part_end}"
            )
        # find number of parts required
        if frame_end == 1:
            parts_dict[frame] = part_end
        if frame in parts_dict:
            num_parts = parts_dict[frame]
            parts = frames_dict[frame]
            if all(p in parts for p in range(num_parts)):
                pic_buf = b"".join(parts[i] for i in range(num_parts))
                try:
                    image = Image.open(BytesIO(pic_buf))
                    image_np = np.array(image)
                    image_cv = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
                    num_rows, num_cols = image_cv.shape[:2]
                    if not fullframe:
                        # Case 1: Masked circle. The window will be a square of the SHORTER dimension.
                        square_size = min(num_rows, num_cols)
                        # Create a circular mask on the original image dimensions
                        mask = np.zeros((num_rows, num_cols), np.uint8)
                        cv2.circle(
                            mask,
                            (num_cols // 2, num_rows // 2),
                            square_size // 2,
                            255,
                            -1,
                        )
                        image_masked = cv2.bitwise_and(image_cv, image_cv, mask=mask)
                        # Get rotation matrix for the original image
                        rotation_matrix = cv2.getRotationMatrix2D(
                            (num_cols / 2, num_rows / 2), rotation + 90, 1
                        )
                        # Rotate the masked image within its original frame
                        image_rotated = cv2.warpAffine(
                            image_masked, rotation_matrix, (num_cols, num_rows)
                        )
                        # Crop the center square from the rotated image
                        center_x, center_y = num_cols // 2, num_rows // 2
                        half_size = square_size // 2
                        image_to_show = image_rotated[
                            center_y - half_size : center_y + half_size,
                            center_x - half_size : center_x + half_size,
                        ]
                    else:
                        # Case 2: Full frame, ensuring no corners are ever cropped.
                        # The window will be a square with side length equal to the image diagonal.
                        # Calculate the length of the image diagonal
                        diagonal = np.sqrt(num_cols**2 + num_rows**2)
                        # The new square size is the diagonal, rounded up to the nearest integer
                        square_size = int(np.ceil(diagonal))
                        # Get the rotation matrix centered on the original image
                        rotation_matrix = cv2.getRotationMatrix2D(
                            (num_cols / 2, num_rows / 2), rotation + 90, 1
                        )
                        # Adjust the matrix's translation component to center the image on the new, larger canvas
                        tx = (square_size - num_cols) / 2
                        ty = (square_size - num_rows) / 2
                        rotation_matrix[0, 2] += tx
                        rotation_matrix[1, 2] += ty
                        # Warp the original image onto the new square canvas
                        image_to_show = cv2.warpAffine(
                            image_cv, rotation_matrix, (square_size, square_size)
                        )
                    if debug:
                        print(
                            f"image {num_rows}x{num_cols}, using window {square_size}x{square_size}"
                        )
                    draw_battery(
                        image_to_show,
                        x=square_size // 100,
                        y=square_size // 100,
                        width=square_size // 10,
                        height=square_size // 20,
                        level=battery_level,
                        thickness=square_size // 200,
                    )
                    cv2.imshow(win_name, image_to_show)
                    if firstframe:
                        cv2.resizeWindow(win_name, square_size, square_size)
                        firstframe = False
                    # delete earlier frame data
                    frames_dict = {f: frames_dict[f] for f in frames_dict if f >= frame}
                    parts_dict = {f: parts_dict[f] for f in parts_dict if f >= frame}
                    if time.time() > keep_awake_time:
                        keep_awake_time = time.time() + 10
                        battery_level = query_battery()
                        print(f"Battery level: {battery_level}")
                except OSError:
                    print("image corrupted")
        # process UI events (e.g. window closing) and poll for a keypress
        key = cv2.pollKey() & 0xFF
        if key == ord("1"):
            rotation_lock = True
            rotation = 0
        elif key == ord("2"):
            rotation_lock = True
            rotation = 90
        elif key == ord("3"):
            rotation_lock = True
            rotation = 180
        elif key == ord("4"):
            rotation_lock = True
            rotation = 270
        elif key == ord("r"):
            rotation_lock = False
        elif (
            key == ord("q")
            or key == 27
            or cv2.getWindowProperty(win_name, cv2.WND_PROP_AUTOSIZE) == -1
        ):
            print("window closed")
            break
        elif key == ord("w"):
            with open("out.jpg", "wb") as fd:
                ret = fd.write(pic_buf)
            print("Wrote " + str(ret) + " bytes to out.jpg")
        elif key == ord("+"):
            if brightness < 100:
                brightness += 10
            data = ("type=1003&value=" + str(brightness) + "\x0a").encode()
            print("Send data: ", data)
            sock_meta.sendto(data, (target_ip, target_port_meta))
            reply, addr = sock_meta.recvfrom(buffer_size)
            received_data = reply.decode()
            print("Received data:", received_data)
        elif key == ord("-"):
            if brightness > 0:
                brightness -= 10
            data = ("type=1003&value=" + str(brightness) + "\x0a").encode()
            print("Send data: ", data)
            sock_meta.sendto(data, (target_ip, target_port_meta))
            reply, addr = sock_meta.recvfrom(buffer_size)
            received_data = reply.decode()
            print("Received data:", received_data)
        elif key == ord("f"):
            fullframe = not fullframe
        elif key == ord("d"):
            debug = not debug
finally:
    # stop stream
    data = "\x20\x37".encode()
    sock_vid.sendto(data, (target_ip, target_port_vid))
    # Close the socket
    sock_meta.close()
    sock_vid.close()
    cv2.destroyAllWindows()
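The accompanying shell.nix, providing Python 3.13 with GTK-enabled OpenCV, NumPy and Pillow (plus gtk3 and nixfmt-rfc-style):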
let
  pkgs = import <nixpkgs> { };
in
pkgs.mkShell {
  packages = [
    pkgs.nixfmt-rfc-style
    pkgs.gtk3
    (pkgs.python313.withPackages (ps: [
      (ps.opencv4.override { enableGtk3 = true; })
      ps.numpy
      ps.pillow
    ]))
  ];
}
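Usage sketch (file names are placeholders, not part of the gist): with the script saved as endoscope.py next to this file saved as shell.nix, and after connecting to the camera's wifi, running nix-shell --run "python3 endoscope.py" starts the viewer inside that environment.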