Created
October 25, 2023 23:51
-
-
Save GarryOne/0965529c8aa2f0d3ee3d870fdd48e3bf to your computer and use it in GitHub Desktop.
Migrating Users from One Auth0 Tenant to Another in Python: Handling Large Payloads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gzip | |
import json | |
import time | |
import requests | |
export_management_token = "YOUR_MANAGEMENT_TOKEN" # Replace with your token | |
domain = "YOUR_SOURCE_TENANT_DOMAIN" # Replace with your source tenant's domain | |
connection_id = "YOUR_CONNECTION_ID" # Replace with your connection ID | |
export_url = f'https://{domain}/api/v2/jobs/users-exports' | |
export_payload = { | |
'connection_id': connection_id, # Replace with your connection ID | |
'format': 'json', | |
'fields': [ | |
{"name": "user_id"}, | |
{"name": "given_name"}, | |
{"name": "family_name"}, | |
{"name": "nickname"}, | |
{"name": "name"}, | |
{"name": "email"}, | |
{"name": "email_verified"}, | |
{"name": "picture"}, | |
{"name": "identities[0].connection"}, | |
{"name": "created_at"}, | |
{"name": "updated_at"}, | |
{"name": "password_hash"}, | |
{"name": "username"}, | |
{"name": "custom_password_hash"} | |
] | |
} | |
headers = { | |
'Authorization': f'Bearer {export_management_token}', | |
'Content-Type': 'application/json' | |
} | |
export_response = requests.post(export_url, data=json.dumps(export_payload), headers=headers) | |
export_data = export_response.json() | |
# Print the decoded data (optional) | |
print(export_data) | |
job_id = export_data.get('id') | |
connection_name = export_data.get('connection') | |
# Print the initial response | |
print(export_data) | |
# Check the job status until completion or error | |
while True: | |
time.sleep(1) | |
# Check job status | |
url = f"https://{domain}/api/v2/jobs/{job_id}" | |
response = requests.get(url, headers=headers) | |
status_data = response.json() | |
print(status_data) # Print the status | |
# If the "location" field is in the response or if the job has failed, stop checking | |
if "location" in status_data or status_data.get('status') in ['completed', 'failed']: | |
break | |
# If the "location" field exists, download the file | |
if "location" in status_data: | |
download_url = status_data['location'] | |
response = requests.get(download_url, stream=True) | |
# Get the filename from the Content-Disposition header or use a default name | |
compressed_filename = response.headers.get('content-disposition', '').split('filename=')[-1].strip('"') | |
if not compressed_filename: | |
compressed_filename = f'./scripts/users-{connection_name}.gz' # Default name if none found | |
with open(compressed_filename, 'wb') as compressed_file: | |
for chunk in response.iter_content(chunk_size=8192): | |
compressed_file.write(chunk) | |
print(f"File downloaded and saved as {compressed_filename}") | |
# Decompress the file | |
decompressed_ndjson_filename = compressed_filename.replace('.gz', '.ndjson') | |
with gzip.open(compressed_filename, 'rt') as gz_file: | |
with open(decompressed_ndjson_filename, 'w') as out_file: | |
out_file.write(gz_file.read()) | |
print(f"File decompressed and saved as {decompressed_ndjson_filename}") | |
# Convert NDJSON to JSON | |
with open(decompressed_ndjson_filename, 'r') as ndjson_file: | |
ndjson_data = ndjson_file.readlines() | |
json_data = [json.loads(item) for item in ndjson_data] | |
json_filename = decompressed_ndjson_filename.replace('.ndjson', '.json') | |
with open(json_filename, 'w') as json_file: | |
json.dump(json_data, json_file, indent=4) | |
print(f"Converted NDJSON to JSON and saved as {json_filename}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment