-
-
Save RaphaelWimmer/5bcb286414e6cd38ed38724f9a6a6129 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3 | |
# Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy. | |
# CC-0 / Public Domain | |
# (0) 2023 Raphael Wimmer | |
# v0.01 | |
# reverse-engineered using a packet capture log - this means that I have no idea what all those magic numbers mean | |
# and whether there are further features that might be supported by the hardware | |
# usage: first connect to the 'softish-XXXX' wifi, then run this script. Check code for keyboard shortcuts. | |
import socket | |
import sys | |
import cv2 | |
import numpy as np | |
from PIL import Image | |
from io import BytesIO | |
buffer_size = 1500 | |
target_ip = "192.168.1.1" | |
target_port_meta = 61502 | |
source_port_meta = 50262 | |
target_port_vid = 61503 | |
source_port_vid = 51320 | |
sock_meta = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
sock_meta.bind(('0.0.0.0', source_port_meta)) | |
sock_vid = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
sock_vid.bind(('0.0.0.0', source_port_vid)) | |
sock_vid.settimeout(5.0) | |
try: | |
# get system info | |
data = "type=1002\x0a".encode() | |
sock_meta.sendto(data, (target_ip, target_port_meta)) | |
reply, addr = sock_meta.recvfrom(buffer_size) | |
received_data = reply.decode() | |
print("Received data:", received_data) | |
#print("Sender address:", addr) | |
# type=2002&protocol=2&w=640&h=480&fps=20&ratio=4:3&angle=270&hardware=V1.1&company=vitcoco&id=a07b4c3092607cf29daaab607cf20000&firmware=1820220727&ssid=softish-31986&dn=Y8&bl=30 | |
# Battery? | |
data = "type=1001\x0a".encode() | |
sock_meta.sendto(data, (target_ip, target_port_meta)) | |
reply, addr = sock_meta.recvfrom(buffer_size) | |
received_data = reply.decode() | |
print("Received data (Battery level?):", received_data) | |
#print("Sender address:", addr) | |
# three times according to captured traffic | |
data = "\x20\x36\x00\x02".encode() | |
sock_vid.sendto(data, (target_ip, target_port_vid)) | |
sock_vid.sendto(data, (target_ip, target_port_vid)) | |
sock_vid.sendto(data, (target_ip, target_port_vid)) | |
# no idea what this command does | |
data = "type=1003&value=100\x0a".encode() | |
sock_meta.sendto(data, (target_ip, target_port_meta)) | |
reply, addr = sock_meta.recvfrom(buffer_size) | |
received_data = reply.decode() | |
print("Received data:", received_data) | |
#print("Sender address:", addr) | |
cv2.namedWindow("Video Stream", cv2.WINDOW_NORMAL) | |
#cv2.setWindowProperty("Video Stream", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) | |
rotation_lock = False | |
frame = 0 | |
part = 0 | |
pic_buf = bytearray() | |
JPEG_HEADER = bytes.fromhex("FF D8 FF E0 00 10 4A 46 49 46") | |
while True: | |
# read video stream | |
reply, addr = sock_vid.recvfrom(buffer_size) | |
frame = reply[0] | |
frame_end = reply[1] | |
part = reply[2] | |
part_end = reply[3] | |
misc_data = reply[4:8] | |
if not rotation_lock: | |
rotation = int.from_bytes(reply[4:6], "big") | |
pic_data = reply[8:] | |
if pic_data.find(JPEG_HEADER) > -1: | |
if len(pic_buf) > 0: | |
try: | |
image = Image.open(BytesIO(pic_buf)) | |
image_np = np.array(image) | |
image_cv = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR) | |
num_rows, num_cols = image_cv.shape[:2] | |
mask = np.zeros((num_rows, num_cols), np.uint8) | |
cv2.circle(mask, (num_cols//2,num_rows//2), num_rows//2, 255, -1) | |
image_masked = cv2.bitwise_and(image_cv, image_cv, mask = mask) | |
rotation_matrix = cv2.getRotationMatrix2D((num_cols/2, num_rows/2), rotation + 90, 1) | |
image_rotated = cv2.warpAffine(image_masked, rotation_matrix, (num_cols, num_rows)) | |
cv2.imshow('Video Stream', image_rotated) | |
except OSError: | |
print("image corrupted") | |
key = cv2.waitKey(1) & 0xFF | |
if key == ord('1'): | |
rotation_lock = True | |
rotation = 0 | |
elif key == ord('2'): | |
rotation_lock = True | |
rotation = 90 | |
elif key == ord('3'): | |
rotation_lock = True | |
rotation = 180 | |
elif key == ord('4'): | |
rotation_lock = True | |
rotation = 270 | |
elif key == ord('r'): | |
rotation_lock = False | |
elif key == ord('q'): | |
break | |
elif key == ord('w'): | |
fd = open("out.jpg", "wb") | |
ret = fd.write(pic_buf) | |
fd.close() | |
print("Wrote " + str(ret) + " bytes to out.jpg") | |
#print("new frame") | |
pic_buf = bytearray() | |
pic_buf += pic_data | |
#print(frame, part, len(pic_buf)) | |
#print(misc_data[0], misc_data[1], misc_data[2], misc_data[3]) | |
#print(rotation) | |
finally: | |
# stop stream | |
data = "\x20\x37".encode() | |
sock_vid.sendto(data, (target_ip, target_port_vid)) | |
# Close the socket | |
sock_meta.close() | |
sock_vid.close() |
Very nice, here some additions as I got some UnicodeDecodeErrors. Also LED control added and fullframe toggle :)
Thanks. I added these features into a development version I'm currently refactoring.
However, LED control does not work with my device. Not sure whether this is a problem with the command or whether my device does not allow changing the LED brightness.
Hello
Thanks very much for your code which was really helpful for me!
I had a few issues:
- The device seemed to turn itself off. This was resolved by implementing checking of the battery level, which seemed to keep it awake.
- I kept getting corrupted images, which seemed to get worse after a while. This was resolved by implementing a simple bit of code to keep all of the responses and then only display them when all of a frame had been received.
I've attached my revised code below in case it's of use!
#!/usr/bin/env python3
# Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy.
# CC-0 / Public Domain
# (0) 2023 Raphael Wimmer
# v0.1.0
# reverse-engineered using a packet capture log - this means that I have no idea what all those magic numbers mean
# and whether there are further features that might be supported by the hardware
# usage: first connect to the 'softish-XXXX' wifi, then run this script. Check code for keyboard shortcuts.
import socket
import sys
import cv2
import numpy as np
import time
import collections
from PIL import Image
from io import BytesIO
from urllib.parse import parse_qs
def get_battery_level(query_string):
"""
Extracts the battery level from a string like 'type=2001&data=23'.
Returns an integer or None if not found or invalid.
"""
try:
params = parse_qs(query_string)
return int(params["data"][0]) / 100
except (KeyError, IndexError, ValueError):
return None
def draw_battery(img, x, y, width, height, level):
"""
Draw a battery icon at (x, y) with given width, height and charge level (0 to 1).
"""
# Clamp level to [0, 1]
level = max(0, min(float(level), 1.0))
# Colors
border_color = (255, 255, 255)
fill_color = (0, 255, 0) if level > 0.3 else (0, 0, 255) # Red if low battery
# Draw battery outline
cv2.rectangle(img, (x, y), (x + width, y + height), border_color, 2)
# Draw battery tip
tip_width = int(width * 0.08)
tip_x = x + width
tip_y = y + int(height * 0.3)
tip_height = int(height * 0.4)
cv2.rectangle(img, (tip_x, tip_y), (tip_x + tip_width, tip_y + tip_height), border_color, -1)
# Fill battery level
fill_width = int((width - 4) * level)
cv2.rectangle(img, (x + 2, y + 2), (x + 2 + fill_width, y + height - 2), fill_color, -1)
def absolute_frame_from_raw(raw_frame, latest_abs_frame):
# Find the multiple of 256 that makes raw_frame closest to latest_abs_frame
base = (latest_abs_frame // 256) * 256
candidates = [base - 256 + raw_frame, base + raw_frame, base + 256 + raw_frame]
# pick the candidate closest to latest_abs_frame
abs_frame = min(candidates, key=lambda x: abs(x - latest_abs_frame))
return abs_frame
buffer_size = 1500
target_ip = "192.168.1.1"
target_port_meta = 61502
source_port_meta = 50262
target_port_vid = 61503
source_port_vid = 51320
sock_meta = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_meta.bind(('0.0.0.0', source_port_meta))
sock_vid = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_vid.bind(('0.0.0.0', source_port_vid))
sock_vid.settimeout(5.0)
brightness = 100
try:
# get system info
data = "type=1002\x0a".encode()
sock_meta.sendto(data, (target_ip, target_port_meta))
reply, addr = sock_meta.recvfrom(buffer_size)
received_data = reply.decode()
print("Received data:", received_data)
# Battery?
data = "type=1001\x0a".encode()
sock_meta.sendto(data, (target_ip, target_port_meta))
reply, addr = sock_meta.recvfrom(buffer_size)
received_data = reply.decode()
print("Received data (Battery level?):", received_data)
#print("Sender address:", addr)
# three times according to captured traffic
data = "\x20\x36\x00\x02".encode()
sock_vid.sendto(data, (target_ip, target_port_vid))
sock_vid.sendto(data, (target_ip, target_port_vid))
sock_vid.sendto(data, (target_ip, target_port_vid))
# set led brightness to 100%
data = "type=1003&value=100\x0a".encode()
# start with led off
#data = "type=1003&value=0\x0a".encode()
sock_meta.sendto(data, (target_ip, target_port_meta))
reply, addr = sock_meta.recvfrom(buffer_size)
# handle UnicodeDecodeError: 'utf-8' codec can't decode byte 0xaa in position 21: invalid start byte gracefully
try:
received_data = reply.decode()
print("Received data:", received_data)
except UnicodeDecodeError:
print ("UnicodeDecodeError, can be ignored")
#print("Sender address:", addr)
cv2.namedWindow("Video Stream", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Video Stream", 1280, 720)
rotation_lock = False
fullframe = False
battery_level = 0
latest_frame = 0
raw_frame = 0
frame = 0
part = 0
pic_buf = bytearray()
JPEG_HEADER = bytes.fromhex("FF D8 FF E0 00 10 4A 46 49 46")
keep_awake_time = time.time()
# Store received parts per frame
frames_dict = {} # frame_number -> {part_number: pic_data}
parts_dict = {} # number of parts required per frame
while True:
# read video stream
reply, addr = sock_vid.recvfrom(buffer_size)
raw_frame = reply[0]
frame_end = reply[1]
part = reply[2]
part_end = reply[3]
misc_data = reply[4:8]
if not rotation_lock:
rotation = int.from_bytes(reply[4:6], "big")
pic_data = reply[8:]
frame = absolute_frame_from_raw(raw_frame, frame)
# store the part
if frame not in frames_dict:
frames_dict[frame] = {}
frames_dict[frame][part] = pic_data
# find number of frames required
if frame_end == 1:
parts_dict[frame] = part_end
if frame in parts_dict:
if parts_dict[frame] == len(frames_dict[frame]):
pic_buf = b''.join(frames_dict[frame][i] for i in range(parts_dict[frame]))
try:
image = Image.open(BytesIO(pic_buf))
image_np = np.array(image)
image_cv = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
if not fullframe:
num_rows, num_cols = image_cv.shape[:2]
mask = np.zeros((num_rows, num_cols), np.uint8)
cv2.circle(mask, (num_cols//2,num_rows//2), num_rows//2, 255, -1)
image_masked = cv2.bitwise_and(image_cv, image_cv, mask = mask)
rotation_matrix = cv2.getRotationMatrix2D((num_cols/2, num_rows/2), rotation + 90, 1)
image_rotated = cv2.warpAffine(image_masked, rotation_matrix, (num_cols, num_rows))
draw_battery(image_rotated, x=5, y=5, width=40, height=20, level=battery_level)
cv2.imshow('Video Stream', image_rotated)
else:
draw_battery(image_cv, x=5, y=5, width=15, height=8, level=battery_level)
cv2.imshow('Video Stream', image_cv)
#delete earlier frame data
frames_dict = {f: frames_dict[f] for f in frames_dict if f >= frame}
parts_dict = {f: parts_dict[f] for f in parts_dict if f >= frame}
if time.time() > keep_awake_time:
keep_awake_time = time.time() + 10
# Battery?
data = "type=1001\x0a".encode()
sock_meta.sendto(data, (target_ip, target_port_meta))
reply, addr = sock_meta.recvfrom(buffer_size)
battery_level = get_battery_level(reply.decode())
except OSError:
print("image corrupted")
key = cv2.waitKey(1) & 0xFF
if key == ord('1'):
rotation_lock = True
rotation = 0
elif key == ord('2'):
rotation_lock = True
rotation = 90
elif key == ord('3'):
rotation_lock = True
rotation = 180
elif key == ord('4'):
rotation_lock = True
rotation = 270
elif key == ord('r'):
rotation_lock = False
elif key == ord('q'):
break
elif key == ord('w'):
fd = open("out.jpg", "wb")
ret = fd.write(pic_buf)
fd.close()
print("Wrote " + str(ret) + " bytes to out.jpg")
elif key == ord('+'):
if brightness < 100:
brightness += 10
data = ("type=1003&value=" + str(brightness) + "\x0a").encode()
print("Send data: ", data)
sock_meta.sendto(data, (target_ip, target_port_meta))
reply, addr = sock_meta.recvfrom(buffer_size)
received_data = reply.decode()
print("Received data:", received_data)
elif key == ord('-'):
if brightness > 0:
brightness -= 10
data = ("type=1003&value=" + str(brightness) + "\x0a").encode()
print("Send data: ", data)
sock_meta.sendto(data, (target_ip, target_port_meta))
reply, addr = sock_meta.recvfrom(buffer_size)
received_data = reply.decode()
print("Received data:", received_data)
elif key == ord('f'):
fullframe = not fullframe
finally:
# stop stream
data = "\x20\x37".encode()
sock_vid.sendto(data, (target_ip, target_port_vid))
# Close the socket
sock_meta.close()
sock_vid.close()
Thanks @jamaggs ! I've been playing with this as well and have encountered both of those issues, I'll have to try your version 😀
I've been modifying the @Aghei2 version with full-frame rotation: https://gist.github.com/dwoffinden/20be1f532c3d34f6311a2ca5e99cad54
FYI the LED control also doesn't work for my device.
I had been meaning to ask @RaphaelWimmer if the development version was published anywhere?
Your version worked great for me, thanks again @jamaggs! I've merged it into my gist.
I also created a repo because gists don't display history as well, but the content is the same: https://github.com/dwoffinden/endscopetool
Hello @dwoffinden, glad to have helped!
I tried your version and the full frame works really well. However, I can't resize the cv2 window at all any more which is a shame.
On a purely aesthetic note, I also tinkered and found that cv2.namedWindow("Video Stream", flags=cv2.WINDOW_GUI_NORMAL) would get rid of the icons/toolbar at the top of the window.
Many thanks
James
Thanks @jamaggs ! I use a tiling window manager so hadn't noticed. I tried Gnome and saw similar.
My branch has that flag change, avoids resizing too much (cv2's highgui is a bit picky it seems), has a crash fix and should shut down cleanly if you close the window
Very nice, here some additions as I got some UnicodeDecodeErrors. Also LED control added and fullframe toggle :)