Skip to content

Instantly share code, notes, and snippets.

Created June 21, 2023 23:46
Show Gist options
  • Save RaphaelWimmer/5bcb286414e6cd38ed38724f9a6a6129 to your computer and use it in GitHub Desktop.
Save RaphaelWimmer/5bcb286414e6cd38ed38724f9a6a6129 to your computer and use it in GitHub Desktop.
Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy.
#!/usr/bin/env python3
# Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy.
# CC-0 / Public Domain
# (0) 2023 Raphael Wimmer
# v0.01
# reverse-engineered using a packet capture log - this means that I have no idea what all those magic numbers mean
# and whether there are further features that might be supported by the hardware
# usage: first connect to the 'softish-XXXX' wifi, then run this script. Check code for keyboard shortcuts.
import socket
import sys
import cv2
import numpy as np
from PIL import Image
from io import BytesIO
buffer_size = 1500
target_ip = ""
target_port_meta = 61502
source_port_meta = 50262
target_port_vid = 61503
source_port_vid = 51320
sock_meta = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_meta.bind(('', source_port_meta))
sock_vid = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_vid.bind(('', source_port_vid))
# get system info
data = "type=1002\x0a".encode()
sock_meta.sendto(data, (target_ip, target_port_meta))
reply, addr = sock_meta.recvfrom(buffer_size)
received_data = reply.decode()
print("Received data:", received_data)
#print("Sender address:", addr)
# type=2002&protocol=2&w=640&h=480&fps=20&ratio=4:3&angle=270&hardware=V1.1&company=vitcoco&id=a07b4c3092607cf29daaab607cf20000&firmware=1820220727&ssid=softish-31986&dn=Y8&bl=30
# Battery?
data = "type=1001\x0a".encode()
sock_meta.sendto(data, (target_ip, target_port_meta))
reply, addr = sock_meta.recvfrom(buffer_size)
received_data = reply.decode()
print("Received data (Battery level?):", received_data)
#print("Sender address:", addr)
# three times according to captured traffic
data = "\x20\x36\x00\x02".encode()
sock_vid.sendto(data, (target_ip, target_port_vid))
sock_vid.sendto(data, (target_ip, target_port_vid))
sock_vid.sendto(data, (target_ip, target_port_vid))
# no idea what this command does
data = "type=1003&value=100\x0a".encode()
sock_meta.sendto(data, (target_ip, target_port_meta))
reply, addr = sock_meta.recvfrom(buffer_size)
received_data = reply.decode()
print("Received data:", received_data)
#print("Sender address:", addr)
cv2.namedWindow("Video Stream", cv2.WINDOW_NORMAL)
#cv2.setWindowProperty("Video Stream", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
rotation_lock = False
frame = 0
part = 0
pic_buf = bytearray()
JPEG_HEADER = bytes.fromhex("FF D8 FF E0 00 10 4A 46 49 46")
while True:
# read video stream
reply, addr = sock_vid.recvfrom(buffer_size)
frame = reply[0]
frame_end = reply[1]
part = reply[2]
part_end = reply[3]
misc_data = reply[4:8]
if not rotation_lock:
rotation = int.from_bytes(reply[4:6], "big")
pic_data = reply[8:]
if pic_data.find(JPEG_HEADER) > -1:
if len(pic_buf) > 0:
image =
image_np = np.array(image)
image_cv = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
num_rows, num_cols = image_cv.shape[:2]
mask = np.zeros((num_rows, num_cols), np.uint8), (num_cols//2,num_rows//2), num_rows//2, 255, -1)
image_masked = cv2.bitwise_and(image_cv, image_cv, mask = mask)
rotation_matrix = cv2.getRotationMatrix2D((num_cols/2, num_rows/2), rotation + 90, 1)
image_rotated = cv2.warpAffine(image_masked, rotation_matrix, (num_cols, num_rows))
cv2.imshow('Video Stream', image_rotated)
except OSError:
print("image corrupted")
key = cv2.waitKey(1) & 0xFF
if key == ord('1'):
rotation_lock = True
rotation = 0
elif key == ord('2'):
rotation_lock = True
rotation = 90
elif key == ord('3'):
rotation_lock = True
rotation = 180
elif key == ord('4'):
rotation_lock = True
rotation = 270
elif key == ord('r'):
rotation_lock = False
elif key == ord('q'):
elif key == ord('w'):
fd = open("out.jpg", "wb")
ret = fd.write(pic_buf)
print("Wrote " + str(ret) + " bytes to out.jpg")
#print("new frame")
pic_buf = bytearray()
pic_buf += pic_data
#print(frame, part, len(pic_buf))
#print(misc_data[0], misc_data[1], misc_data[2], misc_data[3])
# stop stream
data = "\x20\x37".encode()
sock_vid.sendto(data, (target_ip, target_port_vid))
# Close the socket
Copy link

Aghei2 commented Aug 13, 2023

Very nice, here some additions as I got some UnicodeDecodeErrors. Also LED control added and fullframe toggle :)

#!/usr/bin/env python3
# Python implementation of the endscopetool (sic!) Android application used for the Vitcoco ear wax remover camera thingy.
# CC-0 / Public Domain
# (0) 2023 Raphael Wimmer
# v0.1.0
# reverse-engineered using a packet capture log - this means that I have no idea what all those magic numbers mean
# and whether there are further features that might be supported by the hardware
# usage: first connect to the 'softish-XXXX' wifi, then run this script. Check code for keyboard shortcuts.

import socket
import sys
import cv2
import numpy as np
from PIL import Image
from io import BytesIO

buffer_size = 1500
target_ip = ""
target_port_meta = 61502
source_port_meta = 50262

target_port_vid = 61503
source_port_vid = 51320

sock_meta = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_meta.bind(('', source_port_meta))

sock_vid = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_vid.bind(('', source_port_vid))
brightness = 100

    # get system info
    data = "type=1002\x0a".encode()
    sock_meta.sendto(data, (target_ip, target_port_meta))
    reply, addr = sock_meta.recvfrom(buffer_size)
    received_data = reply.decode()

    print("Received data:", received_data)
    #print("Sender address:", addr)
    # type=2002&protocol=2&w=640&h=480&fps=20&ratio=4:3&angle=270&hardware=V1.1&company=vitcoco&id=a07b4c3092607cf29daaab607cf20000&firmware=1820220727&ssid=softish-31986&dn=Y8&bl=30

    # Battery?
    data = "type=1001\x0a".encode()
    sock_meta.sendto(data, (target_ip, target_port_meta))
    reply, addr = sock_meta.recvfrom(buffer_size)
    received_data = reply.decode()

    print("Received data (Battery level?):", received_data)
    #print("Sender address:", addr)

    # three times according to captured traffic
    data = "\x20\x36\x00\x02".encode()
    sock_vid.sendto(data, (target_ip, target_port_vid))
    sock_vid.sendto(data, (target_ip, target_port_vid))
    sock_vid.sendto(data, (target_ip, target_port_vid))

    # set led brightness to 100%
    data = "type=1003&value=100\x0a".encode()
    # start with led off
    #data = "type=1003&value=0\x0a".encode()
    sock_meta.sendto(data, (target_ip, target_port_meta))
    reply, addr = sock_meta.recvfrom(buffer_size)
    # handle UnicodeDecodeError: 'utf-8' codec can't decode byte 0xaa in position 21: invalid start byte gracefully
        received_data = reply.decode()
        print("Received data:", received_data)
    except UnicodeDecodeError:
        print ("UnicodeDecodeError, can be ignored")
    #print("Sender address:", addr)

    cv2.namedWindow("Video Stream", cv2.WINDOW_NORMAL)
    #cv2.setWindowProperty("Video Stream", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
    rotation_lock = False
    fullframe = False

    frame = 0
    part = 0
    pic_buf = bytearray()
    JPEG_HEADER = bytes.fromhex("FF D8 FF E0 00 10 4A 46 49 46")
    while True:
        # read video stream
        reply, addr = sock_vid.recvfrom(buffer_size)
        frame = reply[0]
        frame_end = reply[1]
        part = reply[2]
        part_end = reply[3]
        misc_data =  reply[4:8]
        if not rotation_lock:
            rotation = int.from_bytes(reply[4:6], "big")
        pic_data = reply[8:]
        if pic_data.find(JPEG_HEADER) > -1:
            if len(pic_buf) > 0:
                    image =
                    image_np = np.array(image)
                    image_cv = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
                    if not fullframe:
                        num_rows, num_cols = image_cv.shape[:2]
                        mask = np.zeros((num_rows, num_cols), np.uint8)
              , (num_cols//2,num_rows//2), num_rows//2, 255, -1)
                        image_masked = cv2.bitwise_and(image_cv, image_cv, mask = mask)
                        rotation_matrix = cv2.getRotationMatrix2D((num_cols/2, num_rows/2), rotation + 90, 1)
                        image_rotated = cv2.warpAffine(image_masked, rotation_matrix, (num_cols, num_rows))
                        cv2.imshow('Video Stream', image_rotated)
                        cv2.imshow('Video Stream', image_cv)
                except OSError:
                    print("image corrupted")
            key = cv2.waitKey(1) & 0xFF
            if key == ord('1'):
                rotation_lock = True
                rotation = 0
            elif key == ord('2'):
                rotation_lock = True
                rotation = 90
            elif key == ord('3'):
                rotation_lock = True
                rotation = 180
            elif key == ord('4'):
                rotation_lock = True
                rotation = 270
            elif key == ord('r'):
                rotation_lock = False
            elif key == ord('q'):
            elif key == ord('w'):
                fd = open("out.jpg", "wb")
                ret = fd.write(pic_buf)
                print("Wrote " + str(ret) + " bytes to out.jpg")
            elif key == ord('+'):
                if brightness < 100:
                    brightness += 10
                    data = ("type=1003&value=" + str(brightness) + "\x0a").encode()
                    print("Send data: ", data)
                    sock_meta.sendto(data, (target_ip, target_port_meta))
                    reply, addr = sock_meta.recvfrom(buffer_size)
                    received_data = reply.decode()
                    print("Received data:", received_data)
            elif key == ord('-'):
                if brightness > 0:
                    brightness -= 10
                    data = ("type=1003&value=" + str(brightness) + "\x0a").encode()
                    print("Send data: ", data)
                    sock_meta.sendto(data, (target_ip, target_port_meta))
                    reply, addr = sock_meta.recvfrom(buffer_size)
                    received_data = reply.decode()
                    print("Received data:", received_data)
            elif key == ord('f'):
                fullframe = not fullframe
            #print("new frame")
            pic_buf = bytearray()
        pic_buf += pic_data
        #print(frame, part, len(pic_buf))
        #print(misc_data[0], misc_data[1], misc_data[2], misc_data[3])

    # stop stream
    data = "\x20\x37".encode()
    sock_vid.sendto(data, (target_ip, target_port_vid))
    # Close the socket

Copy link

Very nice, here some additions as I got some UnicodeDecodeErrors. Also LED control added and fullframe toggle :)

Thanks. I added these features into a development version I'm currently refactoring.
However, LED control does not work with my device. Not sure whether this is a problem with the command or whether my device does not allow changing the LED brightness.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment