Skip to content

Instantly share code, notes, and snippets.

@RaphaelWimmer
Created June 21, 2023 23:46
Show Gist options
  • Save RaphaelWimmer/5bcb286414e6cd38ed38724f9a6a6129 to your computer and use it in GitHub Desktop.
Save RaphaelWimmer/5bcb286414e6cd38ed38724f9a6a6129 to your computer and use it in GitHub Desktop.
Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy.
#!/usr/bin/env python3
# Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy.
# CC-0 / Public Domain
# (0) 2023 Raphael Wimmer
# v0.01
# reverse-engineered using a packet capture log - this means that I have no idea what all those magic numbers mean
# and whether there are further features that might be supported by the hardware
# usage: first connect to the 'softish-XXXX' wifi, then run this script. Check code for keyboard shortcuts.
import socket
import sys
import cv2
import numpy as np
from PIL import Image
from io import BytesIO
# Maximum number of bytes read per UDP datagram.
buffer_size = 1500
target_ip = "192.168.1.1"
# Control channel: text commands of the form "type=NNNN\n".
target_port_meta = 61502
source_port_meta = 50262
# Video channel: binary start/stop commands out, JPEG fragments in.
target_port_vid = 61503
source_port_vid = 51320

sock_meta = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_meta.bind(('0.0.0.0', source_port_meta))
sock_vid = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_vid.bind(('0.0.0.0', source_port_vid))
# recvfrom() on the video socket raises after 5 s of silence, which ends the script.
sock_vid.settimeout(5.0)

try:
    # get system info
    data = "type=1002\x0a".encode()
    sock_meta.sendto(data, (target_ip, target_port_meta))
    reply, addr = sock_meta.recvfrom(buffer_size)
    received_data = reply.decode()
    print("Received data:", received_data)
    #print("Sender address:", addr)
    # type=2002&protocol=2&w=640&h=480&fps=20&ratio=4:3&angle=270&hardware=V1.1&company=vitcoco&id=a07b4c3092607cf29daaab607cf20000&firmware=1820220727&ssid=softish-31986&dn=Y8&bl=30

    # Battery?
    data = "type=1001\x0a".encode()
    sock_meta.sendto(data, (target_ip, target_port_meta))
    reply, addr = sock_meta.recvfrom(buffer_size)
    received_data = reply.decode()
    print("Received data (Battery level?):", received_data)
    #print("Sender address:", addr)

    # start the stream; sent three times according to captured traffic
    data = "\x20\x36\x00\x02".encode()
    for _ in range(3):
        sock_vid.sendto(data, (target_ip, target_port_vid))

    # no idea what this command does
    data = "type=1003&value=100\x0a".encode()
    sock_meta.sendto(data, (target_ip, target_port_meta))
    reply, addr = sock_meta.recvfrom(buffer_size)
    # The reply to this command is not always valid UTF-8, so a failed
    # decode must not abort the script.
    try:
        received_data = reply.decode()
        print("Received data:", received_data)
    except UnicodeDecodeError:
        print("UnicodeDecodeError, can be ignored")
    #print("Sender address:", addr)

    cv2.namedWindow("Video Stream", cv2.WINDOW_NORMAL)
    #cv2.setWindowProperty("Video Stream", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)

    # When True the rotation chosen by key press is kept instead of the
    # value carried in each datagram.
    rotation_lock = False
    frame = 0
    part = 0
    # Accumulates the payloads of the picture currently being received.
    pic_buf = bytearray()
    JPEG_HEADER = bytes.fromhex("FF D8 FF E0 00 10 4A 46 49 46")

    while True:
        # read video stream
        reply, addr = sock_vid.recvfrom(buffer_size)
        # 8-byte header: bytes 0-3 are single-byte fields, bytes 4-5 hold a
        # big-endian rotation value, the JPEG payload starts at byte 8.
        frame = reply[0]
        frame_end = reply[1]
        part = reply[2]
        part_end = reply[3]
        misc_data = reply[4:8]
        if not rotation_lock:
            rotation = int.from_bytes(reply[4:6], "big")
        pic_data = reply[8:]

        # A payload containing the JPEG header starts a new picture, so
        # whatever has been buffered so far is the previous picture.
        if pic_data.find(JPEG_HEADER) > -1:
            if len(pic_buf) > 0:
                try:
                    image = Image.open(BytesIO(pic_buf))
                    image_np = np.array(image)
                    image_cv = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
                    num_rows, num_cols = image_cv.shape[:2]
                    # keep only a centred circle, then rotate around the centre
                    mask = np.zeros((num_rows, num_cols), np.uint8)
                    cv2.circle(mask, (num_cols//2, num_rows//2), num_rows//2, 255, -1)
                    image_masked = cv2.bitwise_and(image_cv, image_cv, mask=mask)
                    rotation_matrix = cv2.getRotationMatrix2D((num_cols/2, num_rows/2), rotation + 90, 1)
                    image_rotated = cv2.warpAffine(image_masked, rotation_matrix, (num_cols, num_rows))
                    cv2.imshow('Video Stream', image_rotated)
                except OSError:
                    print("image corrupted")

            # Keyboard shortcuts: 1-4 lock the rotation, r unlocks it,
            # w saves the current picture, q quits.
            key = cv2.waitKey(1) & 0xFF
            if key == ord('1'):
                rotation_lock = True
                rotation = 0
            elif key == ord('2'):
                rotation_lock = True
                rotation = 90
            elif key == ord('3'):
                rotation_lock = True
                rotation = 180
            elif key == ord('4'):
                rotation_lock = True
                rotation = 270
            elif key == ord('r'):
                rotation_lock = False
            elif key == ord('q'):
                break
            elif key == ord('w'):
                with open("out.jpg", "wb") as fd:
                    ret = fd.write(pic_buf)
                print("Wrote " + str(ret) + " bytes to out.jpg")
            #print("new frame")
            pic_buf = bytearray()

        pic_buf += pic_data
        #print(frame, part, len(pic_buf))
        #print(misc_data[0], misc_data[1], misc_data[2], misc_data[3])
        #print(rotation)
finally:
    try:
        # stop stream
        data = "\x20\x37".encode()
        sock_vid.sendto(data, (target_ip, target_port_vid))
    finally:
        # Close the sockets even if the stop command could not be sent
        sock_meta.close()
        sock_vid.close()
@Aghei2
Copy link

Aghei2 commented Aug 13, 2023

Very nice! Here are some additions, as I got some UnicodeDecodeErrors. I also added LED control and a full-frame toggle :)

#!/usr/bin/env python3
# Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy.
# CC-0 / Public Domain
# (0) 2023 Raphael Wimmer
# v0.1.0
# reverse-engineered using a packet capture log - this means that I have no idea what all those magic numbers mean
# and whether there are further features that might be supported by the hardware
# usage: first connect to the 'softish-XXXX' wifi, then run this script. Check code for keyboard shortcuts.

import socket
import sys
import cv2
import numpy as np
from PIL import Image
from io import BytesIO


# Maximum number of bytes read per UDP datagram.
buffer_size = 1500
target_ip = "192.168.1.1"
# Control channel: text commands of the form "type=NNNN\n".
target_port_meta = 61502
source_port_meta = 50262

# Video channel: binary start/stop commands out, JPEG fragments in.
target_port_vid = 61503
source_port_vid = 51320

sock_meta = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_meta.bind(('0.0.0.0', source_port_meta))

sock_vid = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_vid.bind(('0.0.0.0', source_port_vid))
# recvfrom() on the video socket raises after 5 s of silence, which ends the script.
sock_vid.settimeout(5.0)
brightness = 100

# Keys '1'-'4' lock the displayed rotation to a fixed angle.
ROTATION_KEYS = {ord('1'): 0, ord('2'): 90, ord('3'): 180, ord('4'): 270}


def _meta_request(data, label="Received data:"):
    """Send one command on the control socket, wait for the reply and print it.

    data is the already-encoded command. label is printed in front of the
    decoded reply. Replies are not always valid UTF-8; in that case a notice
    is printed instead of letting UnicodeDecodeError end the script.
    """
    sock_meta.sendto(data, (target_ip, target_port_meta))
    reply, addr = sock_meta.recvfrom(buffer_size)
    try:
        print(label, reply.decode())
    except UnicodeDecodeError:
        print("UnicodeDecodeError, can be ignored")


def _set_brightness(value):
    """Send the LED brightness command (type=1003) with the given value."""
    data = ("type=1003&value=" + str(value) + "\x0a").encode()
    print("Send data: ", data)
    _meta_request(data)


try:
    # get system info
    _meta_request("type=1002\x0a".encode())
    # type=2002&protocol=2&w=640&h=480&fps=20&ratio=4:3&angle=270&hardware=V1.1&company=vitcoco&id=a07b4c3092607cf29daaab607cf20000&firmware=1820220727&ssid=softish-31986&dn=Y8&bl=30

    # Battery?
    _meta_request("type=1001\x0a".encode(), "Received data (Battery level?):")

    # start the stream; sent three times according to captured traffic
    data = "\x20\x36\x00\x02".encode()
    for _ in range(3):
        sock_vid.sendto(data, (target_ip, target_port_vid))

    # set led brightness to 100% (use value=0 here to start with the led off)
    _meta_request("type=1003&value=100\x0a".encode())

    cv2.namedWindow("Video Stream", cv2.WINDOW_NORMAL)
    #cv2.setWindowProperty("Video Stream", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)

    # When True the rotation chosen by key press is kept instead of the
    # value carried in each datagram.
    rotation_lock = False
    rotation = 0
    # When True the picture is shown unmasked and unrotated.
    fullframe = False

    # Accumulates the payloads of the picture currently being received.
    pic_buf = bytearray()
    JPEG_HEADER = bytes.fromhex("FF D8 FF E0 00 10 4A 46 49 46")
    while True:
        # read video stream
        reply, addr = sock_vid.recvfrom(buffer_size)
        # 8-byte header: bytes 0-3 are single-byte fields (frame, frame_end,
        # part, part_end), bytes 4-5 hold a big-endian rotation value, the
        # JPEG payload starts at byte 8. Anything shorter cannot be parsed.
        if len(reply) < 8:
            continue
        if not rotation_lock:
            rotation = int.from_bytes(reply[4:6], "big")
        pic_data = reply[8:]

        # A payload containing the JPEG header starts a new picture, so
        # whatever has been buffered so far is the previous picture.
        if JPEG_HEADER in pic_data:
            if pic_buf:
                try:
                    image = Image.open(BytesIO(pic_buf))
                    image_np = np.array(image)
                    image_cv = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
                    if not fullframe:
                        num_rows, num_cols = image_cv.shape[:2]
                        # keep only a centred circle, then rotate around the centre
                        mask = np.zeros((num_rows, num_cols), np.uint8)
                        cv2.circle(mask, (num_cols//2, num_rows//2), num_rows//2, 255, -1)
                        image_masked = cv2.bitwise_and(image_cv, image_cv, mask=mask)
                        rotation_matrix = cv2.getRotationMatrix2D((num_cols/2, num_rows/2), rotation + 90, 1)
                        image_rotated = cv2.warpAffine(image_masked, rotation_matrix, (num_cols, num_rows))
                        cv2.imshow('Video Stream', image_rotated)
                    else:
                        cv2.imshow('Video Stream', image_cv)
                except OSError:
                    print("image corrupted")

            # Keyboard shortcuts: 1-4 lock the rotation, r unlocks it,
            # w saves the current picture, +/- change the LED brightness,
            # f toggles full-frame display, q quits.
            key = cv2.waitKey(1) & 0xFF
            if key in ROTATION_KEYS:
                rotation_lock = True
                rotation = ROTATION_KEYS[key]
            elif key == ord('r'):
                rotation_lock = False
            elif key == ord('q'):
                break
            elif key == ord('w'):
                with open("out.jpg", "wb") as fd:
                    ret = fd.write(pic_buf)
                print("Wrote " + str(ret) + " bytes to out.jpg")
            elif key == ord('+'):
                if brightness < 100:
                    brightness += 10
                    _set_brightness(brightness)
            elif key == ord('-'):
                if brightness > 0:
                    brightness -= 10
                    _set_brightness(brightness)
            elif key == ord('f'):
                fullframe = not fullframe
            #print("new frame")
            pic_buf = bytearray()
        pic_buf += pic_data


finally:
    try:
        # stop stream
        data = "\x20\x37".encode()
        sock_vid.sendto(data, (target_ip, target_port_vid))
    finally:
        # Close the sockets even if the stop command could not be sent
        sock_meta.close()
        sock_vid.close()

@RaphaelWimmer
Copy link
Author

Very nice, here some additions as I got some UnicodeDecodeErrors. Also LED control added and fullframe toggle :)

Thanks. I added these features into a development version I'm currently refactoring.
However, LED control does not work with my device. Not sure whether this is a problem with the command or whether my device does not allow changing the LED brightness.

@jamaggs
Copy link

jamaggs commented Sep 21, 2025

Hello

Thanks very much for your code which was really helpful for me!

I had a few issues:

  1. The device seemed to turn itself off. This was resolved by implementing checking of the battery level, which seemed to keep it awake.
  2. I kept getting corrupted images, which seemed to get worse after a while. This was resolved by implementing a simple bit of code to keep all of the responses and then only display them when all of a frame had been received.

I've attached my revised code below in case it's of use!

#!/usr/bin/env python3
# Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy.
# CC-0 / Public Domain
# (0) 2023 Raphael Wimmer
# v0.1.0
# reverse-engineered using a packet capture log - this means that I have no idea what all those magic numbers mean
# and whether there are further features that might be supported by the hardware
# usage: first connect to the 'softish-XXXX' wifi, then run this script. Check code for keyboard shortcuts.

import socket
import sys
import cv2
import numpy as np
import time
import collections
from PIL import Image
from io import BytesIO
from urllib.parse import parse_qs

def get_battery_level(query_string):
    """
    Read the battery charge out of a reply such as 'type=2001&data=23'.

    The 'data' field is parsed as an integer and divided by 100, so the
    example above yields 0.23. Returns None when the field is missing or
    is not an integer.
    """
    try:
        raw_value = parse_qs(query_string)["data"][0]
        percent = int(raw_value)
    except (KeyError, IndexError, ValueError):
        return None
    return percent / 100

def draw_battery(img, x, y, width, height, level):
    """
    Draw a battery icon on img with its top-left corner at (x, y).

    img is modified in place through cv2.rectangle. width and height are the
    size of the battery body; a small tip is added to the right of it.
    level is the charge as a fraction and is clamped to [0, 1]. When level
    is None (charge unknown, e.g. the status reply could not be parsed) only
    the outline and tip are drawn instead of raising a TypeError.
    """
    # Validate the level before touching the image.
    fraction = None if level is None else max(0, min(float(level), 1.0))

    # Colors
    border_color = (255, 255, 255)

    # Draw battery outline
    cv2.rectangle(img, (x, y), (x + width, y + height), border_color, 2)

    # Draw battery tip
    tip_width = int(width * 0.08)
    tip_x = x + width
    tip_y = y + int(height * 0.3)
    tip_height = int(height * 0.4)
    cv2.rectangle(img, (tip_x, tip_y), (tip_x + tip_width, tip_y + tip_height), border_color, -1)

    if fraction is None:
        return

    # Fill battery level
    fill_color = (0, 255, 0) if fraction > 0.3 else (0, 0, 255)  # Red if low battery
    fill_width = int((width - 4) * fraction)
    cv2.rectangle(img, (x + 2, y + 2), (x + 2 + fill_width, y + height - 2), fill_color, -1)

def absolute_frame_from_raw(raw_frame, latest_abs_frame):
    """
    Turn a raw frame counter into an absolute frame number.

    raw_frame is placed in the 256-wide window containing latest_abs_frame
    and in the windows directly before and after it; the placement nearest
    to latest_abs_frame is returned. On a tie the lower placement wins.
    """
    window_start = (latest_abs_frame // 256) * 256
    best = None
    for offset in (-256, 0, 256):
        candidate = window_start + offset + raw_frame
        # strict comparison keeps the earlier (lower) candidate on a tie
        if best is None or abs(candidate - latest_abs_frame) < abs(best - latest_abs_frame):
            best = candidate
    return best

# Maximum number of bytes read per UDP datagram.
buffer_size = 1500
target_ip = "192.168.1.1"
# Control channel: text commands of the form "type=NNNN\n".
target_port_meta = 61502
source_port_meta = 50262

# Video channel: binary start/stop commands out, JPEG fragments in.
target_port_vid = 61503
source_port_vid = 51320

sock_meta = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_meta.bind(('0.0.0.0', source_port_meta))

sock_vid = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_vid.bind(('0.0.0.0', source_port_vid))
# recvfrom() on the video socket raises after 5 s of silence, which ends the script.
sock_vid.settimeout(5.0)
brightness = 100

# Keys '1'-'4' lock the displayed rotation to a fixed angle.
ROTATION_KEYS = {ord('1'): 0, ord('2'): 90, ord('3'): 180, ord('4'): 270}


def _meta_request(data, label=None):
    """Send one command on the control socket and return the reply as text.

    data is the already-encoded command. When label is given, it is printed
    together with the decoded reply. Replies are not always valid UTF-8; in
    that case a notice is printed and None is returned instead of letting
    UnicodeDecodeError end the script.
    """
    sock_meta.sendto(data, (target_ip, target_port_meta))
    reply, addr = sock_meta.recvfrom(buffer_size)
    try:
        text = reply.decode()
    except UnicodeDecodeError:
        print("UnicodeDecodeError, can be ignored")
        return None
    if label is not None:
        print(label, text)
    return text


def _set_brightness(value):
    """Send the LED brightness command (type=1003) with the given value."""
    data = ("type=1003&value=" + str(value) + "\x0a").encode()
    print("Send data: ", data)
    _meta_request(data, "Received data:")


def _query_battery(label=None):
    """Request the battery status; return the level as a fraction or None."""
    text = _meta_request("type=1001\x0a".encode(), label)
    if text is None:
        return None
    return get_battery_level(text)


try:
    # get system info
    _meta_request("type=1002\x0a".encode(), "Received data:")

    # Battery?
    battery_level = _query_battery("Received data (Battery level?):")
    if battery_level is None:
        # unknown until a later status reply can be parsed
        battery_level = 0

    # start the stream; sent three times according to captured traffic
    data = "\x20\x36\x00\x02".encode()
    for _ in range(3):
        sock_vid.sendto(data, (target_ip, target_port_vid))

    # set led brightness to 100% (use value=0 here to start with the led off)
    _meta_request("type=1003&value=100\x0a".encode(), "Received data:")

    cv2.namedWindow("Video Stream", cv2.WINDOW_NORMAL)
    cv2.resizeWindow("Video Stream", 1280, 720)

    # When True the rotation chosen by key press is kept instead of the
    # value carried in each datagram.
    rotation_lock = False
    rotation = 0
    # When True the picture is shown unmasked and unrotated.
    fullframe = False

    frame = 0
    # Most recently assembled picture; written to disk by the 'w' key.
    pic_buf = bytearray()
    # Time after which the next battery query is due.
    keep_awake_time = time.time()

    # Store received parts per frame
    frames_dict = {}  # frame_number -> {part_number: pic_data}
    parts_dict = {}  # number of parts required per frame

    while True:
        # read video stream
        reply, addr = sock_vid.recvfrom(buffer_size)
        # 8-byte header: frame counter, frame_end flag, part index, part_end,
        # then a big-endian rotation value in bytes 4-5; the JPEG payload
        # starts at byte 8. Anything shorter cannot be parsed.
        if len(reply) < 8:
            continue
        raw_frame, frame_end, part, part_end = reply[:4]
        if not rotation_lock:
            rotation = int.from_bytes(reply[4:6], "big")
        pic_data = reply[8:]

        # the one-byte counter wraps, so map it onto a growing frame number
        frame = absolute_frame_from_raw(raw_frame, frame)

        # store the part
        frames_dict.setdefault(frame, {})[part] = pic_data

        # the datagram flagged frame_end carries the number of parts required
        if frame_end == 1:
            parts_dict[frame] = part_end

        if frame not in parts_dict:
            continue

        received = frames_dict[frame]
        expected = parts_dict[frame]
        # Only assemble when every part index 0..expected-1 is present; a
        # matching count alone would raise KeyError on a gap in the indices.
        if len(received) == expected and all(i in received for i in range(expected)):
            pic_buf = b''.join(received[i] for i in range(expected))

            # delete earlier frame data, whether or not this picture decodes
            frames_dict = {f: parts for f, parts in frames_dict.items() if f >= frame}
            parts_dict = {f: count for f, count in parts_dict.items() if f >= frame}

            try:
                image = Image.open(BytesIO(pic_buf))
                image_np = np.array(image)
                image_cv = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
                if not fullframe:
                    num_rows, num_cols = image_cv.shape[:2]
                    # keep only a centred circle, then rotate around the centre
                    mask = np.zeros((num_rows, num_cols), np.uint8)
                    cv2.circle(mask, (num_cols//2, num_rows//2), num_rows//2, 255, -1)
                    image_masked = cv2.bitwise_and(image_cv, image_cv, mask=mask)
                    rotation_matrix = cv2.getRotationMatrix2D((num_cols/2, num_rows/2), rotation + 90, 1)
                    image_rotated = cv2.warpAffine(image_masked, rotation_matrix, (num_cols, num_rows))
                    draw_battery(image_rotated, x=5, y=5, width=40, height=20, level=battery_level)
                    cv2.imshow('Video Stream', image_rotated)
                else:
                    draw_battery(image_cv, x=5, y=5, width=15, height=8, level=battery_level)
                    cv2.imshow('Video Stream', image_cv)
            except OSError:
                print("image corrupted")

            # Query the battery every 10 s; this also seemed to keep the
            # device from turning itself off. Kept outside the try above so
            # a socket error is not reported as a corrupted image.
            if time.time() > keep_awake_time:
                keep_awake_time = time.time() + 10
                new_level = _query_battery()
                if new_level is not None:
                    # keep the last known level when the reply is unusable
                    battery_level = new_level

        # Keyboard shortcuts: 1-4 lock the rotation, r unlocks it,
        # w saves the current picture, +/- change the LED brightness,
        # f toggles full-frame display, q quits.
        key = cv2.waitKey(1) & 0xFF
        if key in ROTATION_KEYS:
            rotation_lock = True
            rotation = ROTATION_KEYS[key]
        elif key == ord('r'):
            rotation_lock = False
        elif key == ord('q'):
            break
        elif key == ord('w'):
            with open("out.jpg", "wb") as fd:
                ret = fd.write(pic_buf)
            print("Wrote " + str(ret) + " bytes to out.jpg")
        elif key == ord('+'):
            if brightness < 100:
                brightness += 10
                _set_brightness(brightness)
        elif key == ord('-'):
            if brightness > 0:
                brightness -= 10
                _set_brightness(brightness)
        elif key == ord('f'):
            fullframe = not fullframe


finally:
    try:
        # stop stream
        data = "\x20\x37".encode()
        sock_vid.sendto(data, (target_ip, target_port_vid))
    finally:
        # Close the sockets even if the stop command could not be sent
        sock_meta.close()
        sock_vid.close()

@dwoffinden
Copy link

Thanks @jamaggs ! I've been playing with this as well and have encountered both of those issues, I'll have to try your version 😀

I've been modifying the @Aghei2 version with full-frame rotation: https://gist.github.com/dwoffinden/20be1f532c3d34f6311a2ca5e99cad54

FYI the LED control also doesn't work for my device.

I had been meaning to ask @RaphaelWimmer if the development version was published anywhere?

@dwoffinden
Copy link

Your version worked great for me, thanks again @jamaggs! I've merged it into my gist.

I also created a repo because gists don't display history as well, but the content is the same: https://github.com/dwoffinden/endscopetool

@jamaggs
Copy link

jamaggs commented Sep 23, 2025

Hello @dwoffinden, glad to have helped!

I tried your version and the full frame works really well. However, I can't resize the cv2 window at all any more which is a shame.

On a purely aesthetic note, I also tinkered and found that cv2.namedWindow("Video Stream", flags=cv2.WINDOW_GUI_NORMAL) would get rid of the icons/toolbar at the top of the window.

Many thanks

James

@dwoffinden
Copy link

Thanks @jamaggs ! I use a tiling window manager so hadn't noticed. I tried Gnome and saw similar.

My branch has that flag change, avoids resizing too much (cv2's highgui is a bit picky, it seems), has a crash fix, and should shut down cleanly if you close the window.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment