Last active
March 24, 2019 08:34
-
-
Save Informatic/3d1fde7d04722831411b3d660673647b to your computer and use it in GitHub Desktop.
Minimal janus-gateway REST API python client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Reference:
# https://janus.conf.meetecho.com/docs/rest.html
# https://janus.conf.meetecho.com/docs/streaming.html
import requests | |
import json | |
import pprint | |
import random | |
import string | |
def gen_random_key(length, chars=string.ascii_letters+string.digits):
    """Return a random string of ``length`` characters drawn from ``chars``.

    Characters are picked independently with ``random.choice``, so the
    result is not cryptographically secure; do not use it for secrets.

    Args:
        length: Number of characters to generate.  Zero or a negative
            value yields the empty string.
        chars: Sequence of candidate characters.  Defaults to ASCII letters
            and digits.

    Returns:
        The generated string.

    Raises:
        IndexError: If ``chars`` is empty and ``length`` is positive.
    """
    return ''.join(random.choice(chars) for _ in range(length))
class Janus(object):
    """Minimal client for the janus-gateway REST (HTTP) API.

    An instance wraps one ``requests.Session`` and, once
    ``acquire_session()`` has been called, one Janus session id.  The same
    ``transaction`` string is sent with every request made through the
    instance.
    """

    def __init__(self, base_url, transaction=None, timeout=None):
        """Prepare the HTTP session; no request is sent yet.

        Args:
            base_url: Root URL of the Janus REST endpoint.  Request paths
                are appended to it verbatim, so it should not end in '/'.
            transaction: Transaction identifier to send with every request.
                When ``None`` a random 16-character key is generated.
            timeout: Value passed as ``timeout`` to every HTTP call.  The
                default ``None`` keeps the previous behaviour of waiting
                indefinitely for the server.
        """
        self.s = requests.Session()
        self.base_url = base_url
        self.session_id = None
        self.timeout = timeout
        self.transaction = transaction
        if self.transaction is None:
            self.transaction = gen_random_key(16)

    def request(self, url, data):
        """POST ``data`` as a JSON-encoded body to ``base_url + url``.

        The decoded JSON reply is printed as a debugging aid.

        Args:
            url: Path appended to ``base_url`` ('' for the root endpoint).
            data: JSON-serialisable request payload.

        Returns:
            The ``requests`` response object.
        """
        resp = self.s.post('%s%s' % (self.base_url, url),
                           data=json.dumps(data),
                           timeout=self.timeout)
        print(resp.json())
        return resp

    def acquire_session(self):
        """Create a Janus session and remember its id in ``session_id``.

        Raises:
            KeyError: If the reply has no ``data.id`` field.
        """
        reply = self.request('', {
            'janus': 'create',
            'transaction': self.transaction,
        }).json()
        self.session_id = reply['data']['id']

    def acquire_plugin(self, name):
        """Attach to the plugin called ``name`` and return the handle id.

        Requires ``acquire_session()`` to have been called first.

        Raises:
            KeyError: If the reply has no ``data.id`` field.
        """
        return self.janus_call('attach', {
            'plugin': name,
        })['data']['id']

    def janus_call(self, method, payload):
        """Send a session-level request and return the decoded JSON reply.

        Args:
            method: Value for the ``janus`` field of the request.
            payload: Extra fields merged into the request.  Keys named
                ``janus`` or ``transaction`` override the defaults.
        """
        data = {
            'janus': method,
            'transaction': self.transaction,
        }
        data.update(payload)
        return self.request('/%d' % self.session_id, data).json()

    def plugin_call(self, handle, payload):
        """Send ``payload`` as a message body to a plugin handle.

        Args:
            handle: Handle id as returned by ``acquire_plugin()``.
            payload: Plugin-specific request body.

        Returns:
            The ``plugindata.data`` part of the decoded reply.

        Raises:
            KeyError: If the reply carries no ``plugindata.data``.
        """
        return self.request('/%d/%d' % (self.session_id, handle), {
            'janus': 'message',
            'body': payload,
            'transaction': self.transaction,
        }).json()['plugindata']['data']

    def poll(self):
        """GET the session endpoint and return the decoded JSON reply.

        NOTE(review): presumably this is Janus' long-poll for events, so a
        configured ``timeout`` should exceed the server's long-poll
        interval - confirm against the gateway configuration.
        """
        return self.s.get('%s/%d' % (self.base_url, self.session_id),
                          timeout=self.timeout).json()
if __name__ == '__main__':
    # Demo against a local gateway: open a session, attach to the streaming
    # plugin, destroy every mountpoint it lists, then create a new RTP one.
    j = Janus('http://localhost:8088/janus')
    j.acquire_session()
    handle = j.acquire_plugin('janus.plugin.streaming')
    admin_key = 'testing'

    # Remove each existing mountpoint, printing it next to the plugin reply.
    for s in j.plugin_call(handle, {'request': 'list'})['list']:
        pprint.pprint([s, j.plugin_call(handle, {
            'request': 'destroy',
            # 'admin_key': admin_key,
            'id': s['id'],
        })])

    print(j.poll())

    print(j.plugin_call(handle, {
        'request': 'create',
        'admin_key': admin_key,
        'type': 'rtp',
        'name': 'teststream',
        'description': 'teststream',
        'video': True,
        'videoport': 2137,
        'videopt': 96,
        'videortpmap': 'H264/90000',
        # The backslash is written as an explicit escape ('\\;'); the bare
        # '\;' used before is an invalid escape sequence that Python warns
        # about.  The bytes sent are unchanged.
        # NOTE(review): the backslash looks like config-file escaping and
        # may not be wanted in an API request - confirm and drop if so.
        'videofmtp': 'profile-level-id=42e01f\\;packetization-mode=1',
        'videobufferkf': True,
    }))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment