touch tracking YOLO — fingertip detection and tracking on a Raspberry Pi camera, with an MJPEG preview stream (Flask) and OSC output of normalized fingertip coordinates.
import cv2, time, numpy as np, threading
from flask import Flask, Response
from picamera2 import Picamera2
from ultralytics import YOLO
from pythonosc.udp_client import SimpleUDPClient

# OSC output: normalized fingertip coordinates are sent to this host/port.
TARGET_IP="192.168.0.121"
TARGET_PORT=9000
OSC_ADDR="/fingertips"
CLASS_NAME="fingertip"   # YOLO class to keep
CONF_THR=0.35            # minimum detection confidence

# Tracker state and parameters.
tracks={}                # track id -> {"centroid","box","conf","missed","trail","color"}
next_id=0
max_dist=60              # max centroid distance (px) to match a detection to a track
max_missed=10            # frames a track may go undetected before it is dropped
trail_len=60             # number of trail points kept per track

def assign(tracks, cents):
    """Greedy nearest-neighbour matching of existing tracks to new centroids.
    Returns (matched pairs, unmatched track ids, unmatched detection indices)."""
    T=list(tracks.keys()); C=np.array(cents,float)
    if len(T)==0 or len(C)==0: return {}, set(T), set(range(len(C)))
    M=np.array([tracks[t]["centroid"] for t in T], float)
    D=np.linalg.norm(M[:,None,:]-C[None,:,:], axis=2)   # pairwise distance matrix
    pairs,used_c,used_t={},set(),set()
    for _ in range(min(len(T),len(C))):
        i,j=divmod(D.argmin(),D.shape[1])               # closest remaining (track, detection) pair
        if i in used_t or j in used_c or D[i,j]>max_dist: D[i,j]=1e9; continue
        pairs[T[i]]=j; used_t.add(i); used_c.add(j); D[i,:]=1e9; D[:,j]=1e9
    return pairs,{T[i] for i in range(len(T)) if i not in used_t},{j for j in range(len(C)) if j not in used_c}

# Camera: 320x320 RGB preview with fixed exposure/gain so the background model stays stable.
picam2=Picamera2()
cfg=picam2.create_preview_configuration(main={"format":"RGB888","size":(320,320)})
picam2.configure(cfg)
picam2.set_controls({"ExposureTime":30000,"AnalogueGain":1.0})
picam2.start()
time.sleep(0.3)

model=YOLO("best_float32_320.tflite", task="detect")   # TFLite fingertip detector
client=SimpleUDPClient(TARGET_IP,TARGET_PORT)

latest_jpeg=None             # most recent annotated JPEG, shared with the MJPEG stream
lock=threading.Lock()

# Background-subtraction parameters.
ALPHA=0.001                  # running-average learning rate
TH=1                         # foreground threshold
K=cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(3,3))   # kernel for the (disabled) morphology step
bg=None                      # running background estimate (float32)

app=Flask(__name__)

def worker():
    global next_id, latest_jpeg, bg
    while True:
        frame=picam2.capture_array()
        gray=cv2.cvtColor(frame,cv2.COLOR_RGB2GRAY)
        if bg is None:
            bg=gray.astype(np.float32)
        # Background subtraction: difference against a slowly adapting running average.
        fg=cv2.absdiff(gray, cv2.convertScaleAbs(bg))
        cv2.accumulateWeighted(gray, bg, ALPHA)
        fg=cv2.GaussianBlur(fg,(5,5),0)
        _,th=cv2.threshold(fg, TH, 255, cv2.THRESH_TOZERO)
        # th=cv2.morphologyEx(th, cv2.MORPH_OPEN, K, iterations=1)
        stretched=cv2.normalize(th,None,0,255,cv2.NORM_MINMAX)   # contrast-stretch the foreground
        # stretched,_=remove_dominant_gray(stretched, tol=5)
        inp=cv2.merge([stretched]*3)                             # detector expects a 3-channel image
        r=model.predict(source=inp,imgsz=320,conf=CONF_THR,verbose=False)[0]
        names=r.names if hasattr(r,"names") else getattr(model,"names",{})
        boxes=r.boxes
        xyxy=np.zeros((0,4)); conf=np.zeros((0,))
        if boxes is not None and len(boxes)>0:
            xyxy=boxes.xyxy.cpu().numpy()
            cls=boxes.cls.cpu().numpy().astype(int) if boxes.cls is not None else np.zeros(len(xyxy),dtype=int)
            conf=boxes.conf.cpu().numpy() if boxes.conf is not None else np.ones(len(xyxy))
            if names and CLASS_NAME in names.values():
                keep=[i for i,c in enumerate(cls) if names.get(c,"")==CLASS_NAME]
                xyxy=xyxy[keep]; conf=conf[keep]
        # Tracking: age every track, match detections, spawn new tracks, drop stale ones.
        for tid in list(tracks.keys()):
            tracks[tid]["missed"]+=1
        H,W=inp.shape[:2]
        cents=[((b[0]+b[2])/2.0, (b[1]+b[3])/2.0) for b in xyxy]
        pairs,u_tracks,u_dets=assign(tracks,cents)
        for tid,j in pairs.items():
            tracks[tid]["centroid"]=cents[j]
            tracks[tid]["box"]=xyxy[j]
            tracks[tid]["conf"]=float(conf[j])
            tracks[tid]["missed"]=0
            tracks[tid]["trail"].append(tuple(map(int,cents[j])))
            if len(tracks[tid]["trail"])>trail_len: tracks[tid]["trail"].pop(0)
        for j in u_dets:
            color=tuple(int(c) for c in np.random.randint(64,256,3))
            tracks[next_id]={"centroid":cents[j],"box":xyxy[j],"conf":float(conf[j]),"missed":0,"trail":[tuple(map(int,cents[j]))],"color":color}
            next_id+=1
        for tid in list(tracks.keys()):
            if tracks[tid]["missed"]>max_missed:
                del tracks[tid]
        # Draw the tracks and send each centroid over OSC as normalized (x, y);
        # (-1, -1) is sent when nothing is tracked.
        vis=cv2.cvtColor(stretched,cv2.COLOR_GRAY2BGR)
        if len(tracks)==0:
            client.send_message(OSC_ADDR,[-1.0,-1.0])
        for tid,t in tracks.items():
            x1,y1,x2,y2=t["box"].astype(int)
            cx,cy=(x1+x2)//2,(y1+y2)//2
            radius=max(2,int(0.05*max(1,min(x2-x1,y2-y1))))
            cv2.circle(vis,(cx,cy),radius,t["color"],1)
            client.send_message(OSC_ADDR,[float(cx)/W,float(cy)/H])
            if len(t["trail"])>1:
                pts=np.array(t["trail"],dtype=np.int32).reshape(-1,1,2)
                cv2.polylines(vis,[pts],False,t["color"],1)
            cv2.putText(vis,f"{tid}",(x1,max(12,y1-4)),cv2.FONT_HERSHEY_SIMPLEX,0.4,t["color"],1,cv2.LINE_AA)
        ok,buf=cv2.imencode(".jpg",vis,[int(cv2.IMWRITE_JPEG_QUALITY),95])
        if ok:
            with lock:
                latest_jpeg=buf.tobytes()

def mjpeg():
    # Multipart MJPEG generator fed from the worker thread's latest frame.
    import time as _t
    while True:
        with lock:
            b=latest_jpeg
        if b is not None:
            yield b"--frame\r\nContent-Type: image/jpeg\r\n\r\n"+b+b"\r\n"
        _t.sleep(0.01)

@app.route("/")
def video():
    return Response(mjpeg(),mimetype="multipart/x-mixed-replace; boundary=frame")

if __name__=="__main__":
    threading.Thread(target=worker,daemon=True).start()
    app.run(host="0.0.0.0",port=8000,threaded=True)   # MJPEG preview at http://<pi>:8000/
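
On the receiving side, the script publishes each tracked fingertip as a normalized [x, y] pair on the /fingertips address, and [-1.0, -1.0] when nothing is tracked. Below is a minimal receiver sketch, assuming python-osc is installed on the target machine and that port 9000 matches TARGET_PORT above; the handler name is illustrative only.

# Minimal OSC receiver sketch for the /fingertips messages (assumes python-osc on the receiver).
from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_server import BlockingOSCUDPServer

def on_fingertip(address, x, y):
    # (-1, -1) means no fingertip is currently tracked.
    if x < 0:
        print("no touch")
    else:
        print(f"touch at {x:.3f}, {y:.3f}")   # coordinates normalized to 0..1

dispatcher = Dispatcher()
dispatcher.map("/fingertips", on_fingertip)
# Listen on the port the tracker sends to (TARGET_PORT above).
BlockingOSCUDPServer(("0.0.0.0", 9000), dispatcher).serve_forever()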