Skip to content

Instantly share code, notes, and snippets.

@heyseus1
Created July 14, 2021 01:16
Show Gist options
  • Save heyseus1/e63c548c792f7d6dfee2d59b218a915a to your computer and use it in GitHub Desktop.
Save heyseus1/e63c548c792f7d6dfee2d59b218a915a to your computer and use it in GitHub Desktop.
Requires an Okta OIDC application and a refresh token with the proper scopes. Skips deactivated (deprovisioned) and suspended users.
#!/usr/bin/python
import requests
import csv
import os
import json
import pandas as pd
# Space-separated OAuth scopes sent with the refresh-token request.
scope = "okta.users.read okta.groups.read okta.groups.manage okta.users.manage okta.logs.read okta.events.read okta.trustedOrigins.manage okta.trustedOrigins.read"

# Credentials come from the environment; a missing variable raises KeyError
# here, at start-up, before any request is made.
client_id = os.environ['OKTA_CLIENT_ID']
client_secret = os.environ['OKTA_SECRET_ID']
authorization = os.environ['OKTA_AUTH']
refresh_token = os.environ['OKTA_REFRESH_TOKEN']

# Redirect URI sent along with the token request.
redirect_uri = "https://oauth.pstmn.io/v1/browser-callback"

# The subdomain is interpolated into the host of every request URL
# (https://{domain}.okta.com), so surrounding whitespace typed at the prompt
# is removed to keep those URLs valid.
domain = input('enter subdomain:' ).strip()
class Events():
    """Export the active members of an Okta group to a CSV file.

    Every request targets ``https://{domain}.okta.com`` and relies on the
    module-level settings ``domain``, ``scope``, ``client_id``,
    ``client_secret``, ``authorization``, ``refresh_token`` and
    ``redirect_uri``.
    """

    # Seconds allowed for each HTTP call; requests waits indefinitely unless
    # a timeout is passed.
    REQUEST_TIMEOUT = 30
    # Users whose status is listed here are left out of the export.
    SKIPPED_STATUSES = frozenset({'SUSPENDED', 'DEPROVISIONED'})
    # Column order of the printed table and of the CSV file.
    CSV_COLUMNS = ['Email', 'First Name', 'Last Name', 'Status', 'Login', 'id']
    # File the export is written to.
    OUTPUT_FILE = "okta_user_output.csv"

    def okta_token(self):
        """Exchange the configured refresh token for a new access token.

        Returns:
            str: the ``access_token`` value of the token endpoint response.

        Raises:
            requests.HTTPError: the token endpoint returned an error status.
            RuntimeError: the response did not contain an ``access_token``.
        """
        url = f"https://{domain}.okta.com/oauth2/v1/token"
        headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/x-www-form-urlencoded',
        }
        # Passing a mapping lets requests form-encode every value; the scope
        # list contains spaces and the redirect URI contains "://", which a
        # hand-built string would send unescaped.
        payload = {
            'grant_type': 'refresh_token',
            'scope': scope,
            'redirect_uri': redirect_uri,
            # NOTE(review): sent as a form field, exactly as before; this
            # looks like it was meant to be an HTTP header - confirm.
            'Authorization': authorization,
            'refresh_token': refresh_token,
            'client_id': client_id,
            'client_secret': client_secret,
        }
        response = requests.post(url, headers=headers, data=payload,
                                 timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        access_token = response.json().get("access_token")
        if not access_token:
            # Fail with a clear message instead of a TypeError later when the
            # token is concatenated into the bearer header.
            raise RuntimeError(
                "Okta token response did not contain an access_token")
        return access_token

    def _auth_headers(self):
        """Return the API request headers, fetching the token only once.

        The access token is requested on first use and then reused for the
        lifetime of this instance, so paginated calls do not trigger a token
        refresh per page.
        """
        token = getattr(self, '_access_token', None)
        if token is None:
            token = self.okta_token()
            self._access_token = token
        return {'Accept': 'application/json',
                'authorization': 'Bearer ' + token}

    def call_group(self):
        """Search for the group matching ``everyone`` and export its users.

        Raises:
            requests.HTTPError: the group search returned an error status.
        """
        search_url = f"https://{domain}.okta.com/api/v1/groups?q=everyone&limit=1"
        response = requests.get(search_url, headers=self._auth_headers(),
                                timeout=self.REQUEST_TIMEOUT)
        # An error payload is a JSON object, not a list of groups; stop here
        # rather than iterating over its keys.
        response.raise_for_status()
        for group in response.json():
            self.call_group_users_endpoint(group['id'])

    def call_group_users_endpoint(self, group_id):
        """Fetch all users of a group and write the active ones to CSV.

        Pages are followed through the ``next`` entry of ``response.links``
        until none is left. Users whose status is in ``SKIPPED_STATUSES`` are
        dropped; the rest are printed as a DataFrame and saved to
        ``OUTPUT_FILE``.

        The export is best-effort: a failure while paging is printed and the
        users collected so far are still written. If nothing could be fetched
        at all, no file is written.

        Args:
            group_id: Okta identifier of the group whose members are exported.
        """
        url = f'https://{domain}.okta.com/api/v1/groups/{group_id}/users?limit=1000'
        users = []
        try:
            while url:
                response = requests.get(url, headers=self._auth_headers(),
                                        timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
                users += response.json()
                url = response.links.get('next', {}).get('url')
        except Exception as e:
            # Deliberately broad: report the problem and keep whatever pages
            # were already downloaded.
            error = "get_all_users failed with exception {}".format(e)
            print(error)
            if not users:
                return

        rows = [self._user_row(user) for user in users
                if user['status'] not in self.SKIPPED_STATUSES]
        df = pd.DataFrame(rows, columns=self.CSV_COLUMNS)
        print(df)
        # The group name used to be looked up here but was never used; the
        # output file name is fixed.
        df.to_csv(self.OUTPUT_FILE, index=False)

    @staticmethod
    def _user_row(user):
        """Map one user record onto the CSV columns.

        Profile attributes absent from the record become empty cells instead
        of aborting the whole export.
        """
        profile = user.get('profile') or {}
        return {
            'Email': profile.get('email'),
            'First Name': profile.get('firstName'),
            'Last Name': profile.get('lastName'),
            'Status': user['status'],
            'Login': profile.get('login'),
            'id': user['id'],
        }

    def call_group_endpoint(self, group_id):
        """Return the decoded JSON body of the group lookup for ``group_id``.

        The HTTP status is not checked, so an error payload is returned to
        the caller as-is.
        """
        url = f'https://{domain}.okta.com/api/v1/groups/{group_id}'
        response = requests.get(url, headers=self._auth_headers(),
                                timeout=self.REQUEST_TIMEOUT)
        return response.json()
# Run the export only when executed as a script, not when imported.
if __name__ == "__main__":
    Events().call_group()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment