Skip to content

Instantly share code, notes, and snippets.

@companje
Created October 28, 2025 16:48
Show Gist options
  • Save companje/583b48cd8c564ab10e7985d6af9e5ae5 to your computer and use it in GitHub Desktop.
touch tracking YOLO
import cv2, time, numpy as np, threading
from flask import Flask, Response
from picamera2 import Picamera2
from ultralytics import YOLO
from pythonosc.udp_client import SimpleUDPClient
# --- OSC output configuration ---
TARGET_IP="192.168.0.121"  # OSC destination host
TARGET_PORT=9000  # OSC destination port
OSC_ADDR="/fingertips"  # OSC address fingertip coordinates are published on
CLASS_NAME="fingertip"  # only detections of this YOLO class are kept
CONF_THR=0.35  # minimum detection confidence passed to model.predict
# --- Tracker state and tuning ---
tracks={}  # track id -> {"centroid","box","conf","missed","trail","color"}
next_id=0  # next track id to hand out when a new detection appears
max_dist=60  # max centroid distance (px) for matching a detection to a track
max_missed=10  # frames a track may go unmatched before it is deleted
trail_len=60  # max number of points kept in a track's trail polyline
def assign(tracks, cents, max_dist=60):
    """Greedily match existing tracks to detection centroids by distance.

    Repeatedly pairs the globally closest (track, centroid) couple until the
    closest remaining distance exceeds ``max_dist`` — at that point no valid
    match can remain, so the loop breaks (the original implementation spent
    the remaining iterations on `continue` to the same effect).

    Args:
        tracks: mapping of track id -> dict with at least a "centroid" key.
        cents: sequence of (x, y) detection centroids.
        max_dist: maximum centroid distance (px) for a valid match;
            defaults to the module-level tuning value of 60.

    Returns:
        (pairs, unmatched_track_ids, unmatched_det_indices) where ``pairs``
        maps track id -> detection index.
    """
    track_ids = list(tracks.keys())
    cent_arr = np.asarray(cents, dtype=float)
    if not track_ids or cent_arr.size == 0:
        return {}, set(track_ids), set(range(len(cent_arr)))

    prev = np.array([tracks[t]["centroid"] for t in track_ids], dtype=float)
    # Pairwise Euclidean distance matrix: rows = tracks, cols = detections.
    dist = np.linalg.norm(prev[:, None, :] - cent_arr[None, :, :], axis=2)

    pairs = {}
    used_t, used_c = set(), set()
    for _ in range(min(len(track_ids), len(cent_arr))):
        i, j = divmod(dist.argmin(), dist.shape[1])
        if dist[i, j] > max_dist:
            # The global minimum is already too far: nothing else can match.
            break
        pairs[track_ids[i]] = j
        used_t.add(i)
        used_c.add(j)
        # Poison the matched row/column so they are never picked again.
        dist[i, :] = 1e9
        dist[:, j] = 1e9

    unmatched_tracks = {track_ids[i] for i in range(len(track_ids)) if i not in used_t}
    unmatched_dets = {j for j in range(len(cent_arr)) if j not in used_c}
    return pairs, unmatched_tracks, unmatched_dets
# --- Camera setup: 320x320 RGB preview with fixed exposure/gain ---
picam2=Picamera2()
cfg=picam2.create_preview_configuration(main={"format":"RGB888","size":(320,320)})
picam2.configure(cfg)
picam2.set_controls({"ExposureTime":30000,"AnalogueGain":1.0})
picam2.start()
time.sleep(0.3)  # brief pause before the first capture
# Detection model (TFLite export) and OSC client.
model=YOLO("best_float32_320.tflite", task="detect")
client=SimpleUDPClient(TARGET_IP,TARGET_PORT)
# Shared state between the worker thread and the MJPEG generator.
latest_jpeg=None  # most recent encoded frame; access guarded by `lock`
lock=threading.Lock()
# Background-subtraction parameters used by worker().
ALPHA=0.001  # running-average learning rate for the background model
TH=1  # threshold applied to the foreground difference image
K=cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(3,3))  # used only by the commented-out morphology step
bg=None  # float32 background image; seeded from the first captured frame
app=Flask(__name__)
def worker():
    """Capture frames, detect fingertips, update tracks, publish OSC + MJPEG.

    Runs forever; intended to be started as a daemon thread. Mutates the
    module-level ``tracks``, ``bg``, ``next_id`` and ``latest_jpeg`` state.
    Pipeline per frame: background subtraction -> YOLO detection ->
    greedy track assignment -> OSC send + annotated JPEG for streaming.
    """
    global next_id, latest_jpeg, bg
    while True:
        frame = picam2.capture_array()
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        if bg is None:
            # Seed the running-average background with the first frame.
            bg = gray.astype(np.float32)
        # Foreground = |current - background|; background adapts slowly (ALPHA).
        fg = cv2.absdiff(gray, cv2.convertScaleAbs(bg))
        cv2.accumulateWeighted(gray, bg, ALPHA)
        fg = cv2.GaussianBlur(fg, (5, 5), 0)
        _, th = cv2.threshold(fg, TH, 255, cv2.THRESH_TOZERO)
        # Contrast-stretch the foreground to full range before inference.
        stretched = cv2.normalize(th, None, 0, 255, cv2.NORM_MINMAX)
        inp = cv2.merge([stretched] * 3)  # model expects a 3-channel image

        result = model.predict(source=inp, imgsz=320, conf=CONF_THR, verbose=False)[0]
        names = result.names if hasattr(result, "names") else getattr(model, "names", {})
        boxes = result.boxes
        xyxy = np.zeros((0, 4))
        conf = np.zeros((0,))
        if boxes is not None and len(boxes) > 0:
            xyxy = boxes.xyxy.cpu().numpy()
            cls = boxes.cls.cpu().numpy().astype(int) if boxes.cls is not None else np.zeros(len(xyxy), dtype=int)
            conf = boxes.conf.cpu().numpy() if boxes.conf is not None else np.ones(len(xyxy))
            if names and CLASS_NAME in names.values():
                # Keep only detections whose class name matches CLASS_NAME.
                keep = [i for i, c in enumerate(cls) if names.get(c, "") == CLASS_NAME]
                xyxy = xyxy[keep]
                conf = conf[keep]

        # Age every track; matched tracks are reset to 0 below.
        for tid in list(tracks.keys()):
            tracks[tid]["missed"] += 1
        H, W = inp.shape[:2]
        cents = [((b[0] + b[2]) / 2.0, (b[1] + b[3]) / 2.0) for b in xyxy]
        pairs, _lost, u_dets = assign(tracks, cents)

        # Update matched tracks with their new detection.
        for tid, j in pairs.items():
            t = tracks[tid]
            t["centroid"] = cents[j]
            t["box"] = xyxy[j]
            t["conf"] = float(conf[j])
            t["missed"] = 0
            t["trail"].append(tuple(map(int, cents[j])))
            if len(t["trail"]) > trail_len:
                t["trail"].pop(0)
        # Spawn a new track (with a random display color) per unmatched detection.
        for j in u_dets:
            color = tuple(int(c) for c in np.random.randint(64, 256, 3))
            tracks[next_id] = {"centroid": cents[j], "box": xyxy[j],
                               "conf": float(conf[j]), "missed": 0,
                               "trail": [tuple(map(int, cents[j]))], "color": color}
            next_id += 1
        # Drop tracks that have gone unmatched for too long.
        for tid in list(tracks.keys()):
            if tracks[tid]["missed"] > max_missed:
                del tracks[tid]

        # Draw the annotated preview and publish coordinates over OSC.
        vis = cv2.cvtColor(stretched, cv2.COLOR_GRAY2BGR)
        if len(tracks) == 0:
            client.send_message(OSC_ADDR, [-1.0, -1.0])  # sentinel: nothing tracked
        for tid, t in tracks.items():
            x1, y1, x2, y2 = t["box"].astype(int)
            cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
            # Radius scales with box size (was named `r`, shadowing the YOLO result).
            radius = max(2, int(0.05 * max(1, min(x2 - x1, y2 - y1))))
            cv2.circle(vis, (cx, cy), radius, t["color"], 1)
            client.send_message(OSC_ADDR, [float(cx) / W, float(cy) / H])
            if len(t["trail"]) > 1:
                pts = np.array(t["trail"], dtype=np.int32).reshape(-1, 1, 2)
                cv2.polylines(vis, [pts], False, t["color"], 1)
            cv2.putText(vis, f"{tid}", (x1, max(12, y1 - 4)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, t["color"], 1, cv2.LINE_AA)

        ok, buf = cv2.imencode(".jpg", vis, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
        if ok:
            with lock:
                latest_jpeg = buf.tobytes()
def mjpeg():
    """Yield an endless multipart MJPEG stream of the most recent frame.

    Reads the worker-produced ``latest_jpeg`` under ``lock`` and emits it as a
    `multipart/x-mixed-replace` part; sleeps briefly between polls. The module
    already imports ``time`` at the top, so the redundant local
    ``import time as _t`` was removed.
    """
    boundary = b"--frame\r\nContent-Type: image/jpeg\r\n\r\n"
    while True:
        with lock:
            frame = latest_jpeg
        if frame is not None:
            yield boundary + frame + b"\r\n"
        time.sleep(0.01)  # avoid spinning when no new frame is available
@app.route("/")
def video():
    """Serve the MJPEG preview stream at the site root."""
    stream = mjpeg()
    return Response(stream, mimetype="multipart/x-mixed-replace; boundary=frame")
if __name__ == "__main__":
    # Detection runs in a background daemon thread; Flask serves the stream.
    capture_thread = threading.Thread(target=worker, daemon=True)
    capture_thread.start()
    app.run(host="0.0.0.0", port=8000, threaded=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment