Skip to content

Instantly share code, notes, and snippets.

@ghing
Created April 25, 2013 19:35
Show Gist options
  • Save ghing/5462485 to your computer and use it in GitHub Desktop.
Save ghing/5462485 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
## The equivalent of:
## "Working with the Skeleton"
## in the OpenNI user guide.
"""
This shows how to identify when a new user is detected, look for a pose for
that user, calibrate the users when they are in the pose, and track them.
Specifically, it prints out the location of the users' head,
as they are tracked.
"""
import os
import json
import gevent
from gevent.pywsgi import WSGIServer
import geventwebsocket
from openni import (Context, UserGenerator, SkeletonJoint,
CALIBRATION_STATUS_OK, SKEL_PROFILE_ALL)
# Pose to use to calibrate the user
pose_to_use = 'Psi'
ctx = Context()
ctx.init()
# Create the user generator
user = UserGenerator()
user.create(ctx)
# Obtain the skeleton & pose detection capabilities
skel_cap = user.skeleton_cap
pose_cap = user.pose_detection_cap
# Declare the callbacks
def new_user(src, id):
print "1/4 User {} detected. Looking for pose..." .format(id)
pose_cap.start_detection(pose_to_use, id)
def pose_detected(src, pose, id):
print "2/4 Detected pose {} on user {}. Requesting calibration..." .format(pose,id)
pose_cap.stop_detection(id)
skel_cap.request_calibration(id, True)
def calibration_start(src, id):
print "3/4 Calibration started for user {}." .format(id)
def calibration_complete(src, id, status):
if status == CALIBRATION_STATUS_OK:
print "4/4 User {} calibrated successfully! Starting to track." .format(id)
skel_cap.start_tracking(id)
else:
print "ERR User {} failed to calibrate. Restarting process." .format(id)
new_user(user, id)
def lost_user(src, id):
print "--- User {} lost." .format(id)
# Register them
user.register_user_cb(new_user, lost_user)
pose_cap.register_pose_detected_cb(pose_detected)
skel_cap.register_c_start_cb(calibration_start)
skel_cap.register_c_complete_cb(calibration_complete)
# Set the profile
skel_cap.set_profile(SKEL_PROFILE_ALL)
# Start generating
ctx.start_generating_all()
print "0/4 Starting to detect users. Press Ctrl-C to exit."
def echo(environ, start_response):
websocket = environ.get("wsgi.websocket")
if websocket is None:
return http_handler(environ, start_response)
try:
while True:
# Update to next frame
ctx.wait_and_update_all()
# Extract head position of each tracked user
for id in user.users:
if skel_cap.is_tracking(id):
for joint_name, joint in SkeletonJoint.names.iteritems():
if skel_cap.is_joint_active(joint):
pos = skel_cap.get_joint_position(id, joint)
joint_event_msg = {
'user': id,
'joint': joint_name,
'x': pos.point[0],
'y': pos.point[1],
'z': pos.point[0]
}
websocket.send(json.dumps(joint_event_msg))
websocket.close()
except geventwebsocket.WebSocketError, ex:
print "{0}: {1}".format(ex.__class__.__name__, ex)
def http_handler(environ, start_response):
if environ["PATH_INFO"].strip("/") == "version":
start_response("200 OK", [])
return [agent]
else:
start_response("400 Bad Request", [])
return ["WebSocket connection is expected here."]
if __name__ == '__main__':
path = os.path.dirname(geventwebsocket.__file__)
agent = "gevent-websocket/%s" % (geventwebsocket.__version__)
print "Running %s from %s" % (agent, path)
WSGIServer(("", 8000), echo, handler_class=geventwebsocket.WebSocketHandler).serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment