Created
May 30, 2013 20:18
-
-
Save leedm777/5680822 to your computer and use it in GitHub Desktop.
Hello, RESTful Asterisk API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Hello, REST API! | |
# | |
# Requires requests and websocket-client (which may be installed via pip) | |
# | |
# You need something in the dialplan to send a channel to the app. | |
# | |
# exten => 7000,1,Stasis(hello) | |
# same => n,Hangup() | |
# | |
import websocket | |
import sys | |
import json | |
import urllib | |
import requests | |
host = "localhost:8088" | |
#host = "dleenux.digium.internal:8088" | |
def build_url(*path, **params): | |
path = ["http://%s" % host, 'stasis'] + list(path) | |
path = '/'.join(path) | |
params = urllib.urlencode(params) | |
if not params: | |
return path | |
return "%s?%s" % (path, params) | |
def ws_url(app): | |
path = ["ws://%s" % host, 'ws'] | |
path = '/'.join(path) | |
params=urllib.urlencode({'app': app}) | |
return "%s?%s" % (path, params) | |
class Channel(object): | |
def __init__(self, id): | |
self.id = id | |
def answer(self): | |
r = requests.post(build_url('channels', self.id, 'answer')) | |
print r | |
def continue_to_dialplan(self): | |
r = requests.post(build_url('channels', self.id, 'continue')) | |
print r | |
def hangup(self): | |
r = requests.delete(build_url('channels', self.id)) | |
print r | |
def play(self, media): | |
r = requests.post(build_url('channels', self.id, 'play', media=media)) | |
print r.headers['Location'] | |
print r | |
def handle_event(msg): | |
print json.dumps(msg, sort_keys=True, | |
indent=2, separators=(',', ': ')) | |
if "stasis_start" in msg: | |
id = msg["stasis_start"]["channel"]["uniqueid"] | |
print "New channel: %s" % id | |
c = Channel(id) | |
c.answer() | |
c.play('sound:hello-world') | |
c.continue_to_dialplan() | |
def main(argv): | |
ws = websocket.create_connection(ws_url("hello"), | |
header={"Sec-WebSocket-Protocol: stasis"}) | |
try: | |
print "Ready." | |
msg_str = ws.recv() | |
while msg_str is not None: | |
msg_json = json.loads(msg_str) | |
handle_event(msg_json) | |
msg_str = ws.recv() | |
except KeyboardInterrupt: | |
print "*** Closing" | |
except websocket.WebSocketConnectionClosedException: | |
print "*** Closed" | |
ws.close() | |
if __name__ == "__main__": | |
sys.exit(main(sys.argv) or 0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment