Created
April 25, 2013 19:35
-
-
Save ghing/5462485 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
## The equivalent of: | |
## "Working with the Skeleton" | |
## in the OpenNI user guide. | |
""" | |
This shows how to identify when a new user is detected, look for a pose for | |
that user, calibrate the users when they are in the pose, and track them. | |
Specifically, it prints out the location of the users' head, | |
as they are tracked. | |
""" | |
import os | |
import json | |
import gevent | |
from gevent.pywsgi import WSGIServer | |
import geventwebsocket | |
from openni import (Context, UserGenerator, SkeletonJoint, | |
CALIBRATION_STATUS_OK, SKEL_PROFILE_ALL) | |
# Pose to use to calibrate the user | |
pose_to_use = 'Psi' | |
ctx = Context() | |
ctx.init() | |
# Create the user generator | |
user = UserGenerator() | |
user.create(ctx) | |
# Obtain the skeleton & pose detection capabilities | |
skel_cap = user.skeleton_cap | |
pose_cap = user.pose_detection_cap | |
# Declare the callbacks | |
def new_user(src, id): | |
print "1/4 User {} detected. Looking for pose..." .format(id) | |
pose_cap.start_detection(pose_to_use, id) | |
def pose_detected(src, pose, id): | |
print "2/4 Detected pose {} on user {}. Requesting calibration..." .format(pose,id) | |
pose_cap.stop_detection(id) | |
skel_cap.request_calibration(id, True) | |
def calibration_start(src, id): | |
print "3/4 Calibration started for user {}." .format(id) | |
def calibration_complete(src, id, status): | |
if status == CALIBRATION_STATUS_OK: | |
print "4/4 User {} calibrated successfully! Starting to track." .format(id) | |
skel_cap.start_tracking(id) | |
else: | |
print "ERR User {} failed to calibrate. Restarting process." .format(id) | |
new_user(user, id) | |
def lost_user(src, id): | |
print "--- User {} lost." .format(id) | |
# Register them | |
user.register_user_cb(new_user, lost_user) | |
pose_cap.register_pose_detected_cb(pose_detected) | |
skel_cap.register_c_start_cb(calibration_start) | |
skel_cap.register_c_complete_cb(calibration_complete) | |
# Set the profile | |
skel_cap.set_profile(SKEL_PROFILE_ALL) | |
# Start generating | |
ctx.start_generating_all() | |
print "0/4 Starting to detect users. Press Ctrl-C to exit." | |
def echo(environ, start_response): | |
websocket = environ.get("wsgi.websocket") | |
if websocket is None: | |
return http_handler(environ, start_response) | |
try: | |
while True: | |
# Update to next frame | |
ctx.wait_and_update_all() | |
# Extract head position of each tracked user | |
for id in user.users: | |
if skel_cap.is_tracking(id): | |
for joint_name, joint in SkeletonJoint.names.iteritems(): | |
if skel_cap.is_joint_active(joint): | |
pos = skel_cap.get_joint_position(id, joint) | |
joint_event_msg = { | |
'user': id, | |
'joint': joint_name, | |
'x': pos.point[0], | |
'y': pos.point[1], | |
'z': pos.point[0] | |
} | |
websocket.send(json.dumps(joint_event_msg)) | |
websocket.close() | |
except geventwebsocket.WebSocketError, ex: | |
print "{0}: {1}".format(ex.__class__.__name__, ex) | |
def http_handler(environ, start_response): | |
if environ["PATH_INFO"].strip("/") == "version": | |
start_response("200 OK", []) | |
return [agent] | |
else: | |
start_response("400 Bad Request", []) | |
return ["WebSocket connection is expected here."] | |
if __name__ == '__main__': | |
path = os.path.dirname(geventwebsocket.__file__) | |
agent = "gevent-websocket/%s" % (geventwebsocket.__version__) | |
print "Running %s from %s" % (agent, path) | |
WSGIServer(("", 8000), echo, handler_class=geventwebsocket.WebSocketHandler).serve_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment