Skip to content

Instantly share code, notes, and snippets.

@leedm777
Created May 30, 2013 20:18
Show Gist options
  • Save leedm777/5680822 to your computer and use it in GitHub Desktop.
Save leedm777/5680822 to your computer and use it in GitHub Desktop.
Hello, RESTful Asterisk API
#!/usr/bin/env python
#
# Hello, REST API!
#
# Requires requests and websocket-client (which may be installed via pip)
#
# You need something in the dialplan to send a channel to the app.
#
# exten => 7000,1,Stasis(hello)
# same => n,Hangup()
#
import websocket
import sys
import json
import urllib
import requests
# "host:port" of the server to talk to; build_url() and ws_url() interpolate
# this value into the HTTP and WebSocket URLs they produce.
host = "localhost:8088"
#host = "dleenux.digium.internal:8088"
def build_url(*path, **params):
    """Build an HTTP URL under the server's ``stasis`` root.

    The positional arguments are appended, in order, as path segments after
    ``http://<host>/stasis``.  Keyword arguments are URL-encoded into a query
    string; when there are none, the URL is returned without a ``?`` suffix.
    """
    segments = ["http://%s" % host, 'stasis']
    segments.extend(path)
    url = '/'.join(segments)
    query = urllib.urlencode(params)
    if query:
        return "%s?%s" % (url, query)
    return url
def ws_url(app):
    """Return the WebSocket URL for *app*: ``ws://<host>/ws?app=<app>``.

    The application name is URL-encoded before being placed in the query
    string.
    """
    base = "ws://%s/ws" % host
    query = urllib.urlencode({'app': app})
    return base + "?" + query
class Channel(object):
    """Minimal client for the REST operations on a single channel.

    Every method issues one HTTP request through ``requests`` against a URL
    produced by ``build_url`` and prints the resulting response object.
    Nothing is returned to the caller.
    """

    def __init__(self, id):
        # The parameter name shadows the builtin ``id`` but is kept as-is so
        # that existing keyword callers keep working.
        self.id = id

    def answer(self):
        """POST to ``channels/<id>/answer`` and print the response."""
        r = requests.post(build_url('channels', self.id, 'answer'))
        print(r)

    def continue_to_dialplan(self):
        """POST to ``channels/<id>/continue`` and print the response."""
        r = requests.post(build_url('channels', self.id, 'continue'))
        print(r)

    def hangup(self):
        """Send DELETE to ``channels/<id>`` and print the response."""
        r = requests.delete(build_url('channels', self.id))
        print(r)

    def play(self, media):
        """POST to ``channels/<id>/play`` with *media* as a query parameter.

        Prints the response's ``Location`` header followed by the response
        itself.  When the response has no ``Location`` header (for instance
        if the request was rejected), ``None`` is printed in its place.
        """
        r = requests.post(build_url('channels', self.id, 'play', media=media))
        # Use .get() rather than indexing: a response without a Location
        # header must not raise KeyError and abort the caller's event loop.
        print(r.headers.get('Location'))
        print(r)
def handle_event(msg):
    """Pretty-print one decoded event and react to ``stasis_start``.

    The event is always dumped as sorted, 2-space-indented JSON.  If it has
    a ``stasis_start`` key, the channel's ``uniqueid`` is read from it and
    that channel is answered, asked to play ``sound:hello-world``, and then
    continued in the dialplan.
    """
    pretty = json.dumps(msg, sort_keys=True,
                        indent=2, separators=(',', ': '))
    print(pretty)

    # Events other than stasis_start are only logged.
    if "stasis_start" not in msg:
        return

    channel_id = msg["stasis_start"]["channel"]["uniqueid"]
    print("New channel: %s" % channel_id)
    channel = Channel(channel_id)
    channel.answer()
    channel.play('sound:hello-world')
    channel.continue_to_dialplan()
def main(argv):
    """Connect to the ``hello`` app's WebSocket and dispatch its events.

    Each received message is decoded from JSON and passed to
    ``handle_event`` until ``recv()`` returns ``None``, the user interrupts
    with Ctrl-C, or the connection is reported closed.  The socket is closed
    on every exit path, including unexpected exceptions, which still
    propagate to the caller.

    *argv* is accepted for the conventional entry-point signature but is not
    used.  Returns ``None``.
    """
    ws = websocket.create_connection(
        ws_url("hello"),
        # A list (not a set literal) is the form websocket-client documents
        # for extra handshake headers.
        header=["Sec-WebSocket-Protocol: stasis"])
    try:
        print("Ready.")
        msg_str = ws.recv()
        while msg_str is not None:
            handle_event(json.loads(msg_str))
            msg_str = ws.recv()
    except KeyboardInterrupt:
        print("*** Closing")
    except websocket.WebSocketConnectionClosedException:
        print("*** Closed")
    finally:
        # Previously skipped when any other exception (bad JSON, HTTP
        # failure in a handler, ...) escaped the loop, leaking the socket.
        ws.close()
# Script entry point: run main() and exit with its result, or 0 when it
# returns a falsy value such as None.
if __name__ == "__main__":
    exit_status = main(sys.argv) or 0
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment