# Minimal janus-gateway REST API python client
#
# Source: GitHub gist Informatic/3d1fde7d04722831411b3d660673647b
# (by @Informatic, last active March 24, 2019 08:34).
# Reference:
# https://janus.conf.meetecho.com/docs/rest.html
# https://janus.conf.meetecho.com/docs/streaming.html
import requests
import json
import pprint
import random
import string
def gen_random_key(length, chars=string.ascii_letters+string.digits):
    """Return a random string of *length* characters drawn from *chars*.

    Args:
        length: Number of characters to generate; ``0`` (or a negative
            value) yields an empty string.
        chars: Sequence of candidate characters. Defaults to ASCII
            letters and digits.

    Returns:
        The generated string.

    Note:
        Uses the ``random`` module, so the result is not suitable as a
        security token.
    """
    return ''.join(random.choice(chars) for _ in range(length))
class Janus(object):
    """Minimal client for the janus-gateway REST API.

    Holds a ``requests.Session``, the base URL of the gateway, the id of
    the Janus session (once acquired) and the transaction string that is
    attached to every request sent by this client.
    """

    def __init__(self, base_url, transaction=None):
        """Store connection settings and pick a transaction string.

        Args:
            base_url: Root URL of the Janus REST endpoint. Session and
                handle ids are appended to it as ``/<id>``, so it should
                not end with a slash.
            transaction: Transaction string sent with every request. A
                random 16-character key is generated when omitted.
        """
        self.s = requests.Session()
        self.base_url = base_url
        # Filled in by acquire_session(); the session-scoped calls below
        # format it with %d and therefore fail while it is still None.
        self.session_id = None
        self.transaction = transaction
        if self.transaction is None:
            self.transaction = gen_random_key(16)

    def request(self, url, data):
        """POST *data* as a JSON document to ``base_url + url``.

        The decoded reply is printed to stdout as a debugging aid.

        Args:
            url: Path appended verbatim to the base URL (may be empty).
            data: JSON-serialisable request body.

        Returns:
            The response object returned by the underlying session.
        """
        resp = self.s.post('%s%s' % (self.base_url, url),
                           data=json.dumps(data))
        print(resp.json())
        return resp

    def acquire_session(self):
        """Create a Janus session and remember its id in ``session_id``.

        Raises:
            KeyError: If the reply carries no ``data``/``id`` entry.
        """
        self.session_id = self.request('', {
            'janus': 'create',
            'transaction': self.transaction,
        }).json()['data']['id']

    def acquire_plugin(self, name):
        """Attach to plugin *name* and return the resulting handle id.

        Args:
            name: Plugin identifier, e.g. ``'janus.plugin.streaming'``.

        Returns:
            The ``data.id`` value of the reply.

        Raises:
            KeyError: If the reply carries no ``data``/``id`` entry.
        """
        return self.janus_call('attach', {
            'plugin': name,
        })['data']['id']

    def janus_call(self, method, payload):
        """Send a session-level request and return the decoded reply.

        Args:
            method: Value for the ``janus`` field of the request.
            payload: Extra fields merged into the request. They are
                applied last, so they override ``janus`` and
                ``transaction`` if present.

        Returns:
            The decoded JSON reply.
        """
        data = {
            'janus': method,
            'transaction': self.transaction,
        }
        data.update(payload)
        return self.request('/%d' % self.session_id, data).json()

    def plugin_call(self, handle, payload):
        """Send a ``message`` request to a plugin handle.

        Args:
            handle: Numeric handle id, as returned by acquire_plugin().
            payload: Value sent as the ``body`` of the message.

        Returns:
            The ``plugindata.data`` value of the reply.

        Raises:
            KeyError: If the reply carries no ``plugindata``/``data``
                entry.
        """
        return self.request('/%d/%d' % (self.session_id, handle), {
            'janus': 'message',
            'body': payload,
            'transaction': self.transaction,
        }).json()['plugindata']['data']

    def poll(self):
        """GET the session URL and return the decoded JSON reply."""
        return self.s.get('%s/%d' % (self.base_url, self.session_id)).json()
if __name__ == '__main__':
    # Demo against a local gateway: open a session, attach to the
    # streaming plugin, destroy every mountpoint it lists, then create a
    # fresh RTP video mountpoint named 'teststream'.
    j = Janus('http://localhost:8088/janus')
    j.acquire_session()
    handle = j.acquire_plugin('janus.plugin.streaming')
    admin_key = 'testing'

    for stream in j.plugin_call(handle, {'request': 'list'})['list']:
        pprint.pprint([stream, j.plugin_call(handle, {
            'request': 'destroy',
            # 'admin_key': admin_key,
            'id': stream['id'],
        })])

    # NOTE(review): the original indentation was lost; polling once after
    # the loop (rather than per destroyed stream) is assumed -- confirm.
    print(j.poll())

    print(j.plugin_call(handle, {
        'request': 'create',
        'admin_key': admin_key,
        'type': 'rtp',
        'name': 'teststream',
        'description': 'teststream',
        'video': True,
        'videoport': 2137,
        'videopt': 96,
        'videortpmap': 'H264/90000',
        # The backslash is written as an explicit '\\' so the literal no
        # longer relies on the invalid escape sequence '\;'; the string
        # sent is unchanged (backslash followed by ';').
        # NOTE(review): the backslash looks like config-file escaping and
        # may be unnecessary over the REST API -- verify.
        'videofmtp': 'profile-level-id=42e01f\\;packetization-mode=1',
        'videobufferkf': True,
    }))
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment