Created
July 14, 2021 01:16
-
-
Save heyseus1/e63c548c792f7d6dfee2d59b218a915a to your computer and use it in GitHub Desktop.
requires Okta OIDC and refresh token with proper scopes. skips deactivated and suspended users
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import requests | |
import csv | |
import os | |
import json | |
import pandas as pd | |
scope = "okta.users.read okta.groups.read okta.groups.manage okta.users.manage okta.logs.read okta.events.read okta.trustedOrigins.manage okta.trustedOrigins.read" | |
client_id = os.environ['OKTA_CLIENT_ID'] | |
client_secret = os.environ['OKTA_SECRET_ID'] | |
authorization = os.environ['OKTA_AUTH'] | |
refresh_token = os.environ['OKTA_REFRESH_TOKEN'] | |
redirect_uri = "https://oauth.pstmn.io/v1/browser-callback" | |
domain = input('enter subdomain:' ) | |
class Events(): | |
def okta_token(self): | |
grant_type = "refresh_token" | |
url = f"https://{domain}.okta.com/oauth2/v1/token" | |
headers = { | |
'Accept': 'application/json', | |
'Content-Type': 'application/x-www-form-urlencoded' | |
} | |
payload=f"grant_type={grant_type}&scope={scope}&redirect_uri={redirect_uri}&Authorization={authorization}&refresh_token={refresh_token}&client_id={client_id}&client_secret={client_secret}" | |
response = requests.request("POST", url, headers=headers, data=payload) | |
json = response.json() | |
access_token = json.get("access_token") | |
scope_allowed = json.get("scope") | |
expires_in = json.get("expires_in") | |
return access_token | |
def call_group(self): | |
search_url = f"https://{domain}.okta.com/api/v1/groups?q=everyone&limit=1" | |
response = requests.get(search_url, | |
headers={'Accept': 'application/json', | |
'authorization': 'Bearer '+ Events().okta_token()}).json() | |
for data in response: | |
group_id = data['id'] | |
Events().call_group_users_endpoint(group_id) | |
def call_group_users_endpoint(self, group_id): | |
group_users_endpoint = f'https://{domain}.okta.com/api/v1/groups/{group_id}/users?limit=1000' | |
url = group_users_endpoint.format(group_id) | |
email_data = [] | |
fname_data = [] | |
lname_data = [] | |
status_data = [] | |
login_data = [] | |
okta_id_data = [] | |
try: | |
response = requests.get(url, headers={'Acccept': 'application/json', | |
'authorization': 'Bearer '+ Events().okta_token()}) | |
response.raise_for_status() | |
users = response.json() | |
links = response.links | |
while 'next' in links: | |
url = links['next']['url'] | |
response = requests.get(url, headers={'Acccept': 'application/json', | |
'authorization': 'Bearer '+ Events().okta_token()}) | |
response.raise_for_status() | |
next_users = response.json() | |
users += next_users | |
links = response.links | |
except Exception as e: | |
error = "get_all_users failed with exception {}".format(e) | |
print(error) | |
for group_data in users: | |
if group_data['status'] == 'SUSPENDED': | |
pass | |
elif group_data['status'] == 'DEPROVISIONED': | |
pass | |
else: | |
group_emails = group_data['profile']['email'] | |
email_data.append(group_emails) | |
first_name = group_data['profile']['firstName'] | |
fname_data.append(first_name) | |
last_name = group_data['profile']['lastName'] | |
lname_data.append(last_name) | |
user_status = group_data['status'] | |
status_data.append(user_status) | |
login = group_data['profile']['login'] | |
login_data.append(login) | |
user_okta_id = group_data['id'] | |
okta_id_data.append(user_okta_id) | |
d = {'Email': email_data, 'First Name': fname_data, 'Last Name': lname_data, 'Status': status_data, 'Login': login_data, 'id': okta_id_data} | |
df = pd.DataFrame(data=d) | |
print(df) | |
csv_name = Events().call_group_endpoint(group_id)['profile']['name'] | |
df.to_csv("okta_user_output.csv", index=False) | |
def call_group_endpoint(self, group_id): | |
group_endpoint = f'https://{domain}.okta.com/api/v1/groups/{group_id}' | |
url = group_endpoint.format(group_id) | |
return requests.get(url, | |
headers={'Acccept': 'application/json', | |
'authorization': 'Bearer '+ Events().okta_token()}).json() | |
if __name__ == "__main__": | |
Events().call_group() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment