Last active
August 22, 2017 18:17
-
-
Save tmkasun/9bc44a271d9330d4d2c4393153216446 to your computer and use it in GitHub Desktop.
WSO2 API Manager get all subscriptions available for APIs created by given provider (user)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests, base64 | |
configs = { | |
'credentials': { | |
'username': 'admin', | |
'password': 'admin', | |
}, | |
'host': 'localhost', | |
'port': '9443', | |
'gateway_port': '8243', | |
'scheme': 'https' | |
} | |
host = configs['host'] | |
port = configs['port'] | |
scheme = configs['scheme'] | |
def get_application(username, password): | |
print("INFO: Creating new OAuth application(Dynamic Client Registration)") | |
endpoint = "{}://{}:{}".format(scheme, host, port) + "/client-registration/v0.11/register" | |
b64string = base64.b64encode("{}:{}".format(username, password).encode()) | |
json_data = { | |
"callbackUrl": "www.wso2.org", | |
"clientName": "testing_app", | |
"tokenScope": "Production", | |
"owner": username, | |
"grantType": "password refresh_token", | |
"saasApp": True | |
} | |
headers = { | |
'Authorization': "Basic {}".format(b64string.decode()) | |
} | |
apps_response = requests.post(endpoint, headers=headers, json=json_data, verify=False) | |
if not apps_response.ok: | |
return False | |
app = apps_response.json() | |
return app | |
def get_access_token(app, username, password): | |
print("INFO: Generating access token for newly created/existing OAuth application (Publisher REST API) ") | |
port = configs['gateway_port'] # Gateway SSL port | |
endpoint = "{}://{}:{}".format(scheme, host, port) + "/token" | |
b64string = base64.b64encode("{}:{}".format(app['clientId'], app['clientSecret']).encode()) | |
form_data = { | |
'grant_type': "password", | |
"scope": "apim:api_view apim:subscription_view", | |
"username": username, | |
"password": password | |
} | |
headers = { | |
'Authorization': "Basic {}".format(b64string.decode()) | |
} | |
token_response = requests.post(endpoint, headers=headers, data=form_data, verify=False) | |
if not token_response.ok: | |
return False | |
tokens = token_response.json() | |
return tokens | |
def get_apis(query, access_token): | |
print("INFO: Get all APIs filtered by given query params") | |
endpoint = "{}://{}:{}".format(scheme, host, port) + "/api/am/publisher/v0.11/apis" | |
if query: | |
endpoint = "{}?query={}".format(endpoint, query) | |
headers = { | |
'Authorization': "Bearer {}".format(access_token) | |
} | |
apis_response = requests.get(endpoint, headers=headers, verify=False) | |
if not apis_response.ok: | |
return False | |
resources = apis_response.json() | |
return resources['list'] | |
def get_subscriptions(api, access_token): | |
print("INFO: Get all subscriptions for the given API UUID") | |
api_id = api['id'] | |
endpoint = "{}://{}:{}".format(scheme, host, port) + "/api/am/publisher/v0.11/subscriptions?apiId=" + api_id | |
headers = { | |
'Authorization': "Bearer {}".format(access_token) | |
} | |
subscriptions_response = requests.get(endpoint, headers=headers, verify=False) | |
if not subscriptions_response.ok: | |
return False | |
resources = subscriptions_response.json() | |
return resources['list'] | |
def get_subscriptions_by_provider(provider_name): | |
print("INFO: Get all subscriptions which are there for the APIs created by given `provider` user") | |
username = configs['credentials']['username'] | |
password = configs['credentials']['password'] | |
app = get_application(username, password) | |
tokens = get_access_token(app, username, password) | |
apis = get_apis("provider:{}".format(provider_name), tokens['access_token']) | |
aggregate_subs = [] | |
for api in apis: | |
api_subscriptions = get_subscriptions(api, tokens['access_token']) | |
aggregate_subs += api_subscriptions | |
return aggregate_subs | |
def main(): | |
print("Sample Client for get subscriptions by provider") | |
provider_name = "admin" | |
all_subscriptions = get_subscriptions_by_provider(provider_name) | |
print("\nINFO: There are {} subscriptions for the user {} as follows:\n".format(len(all_subscriptions), | |
provider_name)) | |
for subs in all_subscriptions: | |
print(subs) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment