Created
April 24, 2025 04:24
-
-
Save soodoku/d23e32ed9749dcd7a0e8a7ce80c75b5a to your computer and use it in GitHub Desktop.
List all Dataverse repos.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import json | |
import requests | |
import sys | |
from urllib.parse import quote | |
def parse_arguments(argv=None):
    """Parse command line arguments.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads the process command line, which is
            the original behaviour; passing a list makes the parser usable
            from other code and from tests.

    Returns:
        argparse.Namespace with the attributes ``base_url``, ``api_token``,
        ``user``, ``output`` (``None`` when omitted) and ``verbose`` (bool).

    Raises:
        SystemExit: raised by argparse when a required option is missing or
            an unrecognised option is supplied.
    """
    parser = argparse.ArgumentParser(description="List Dataverse datasets owned by a user and convert to JSON")
    parser.add_argument("-b", "--base-url", required=True, help="Base URL of the Dataverse installation (e.g. https://demo.dataverse.org)")
    parser.add_argument("-t", "--api-token", required=True, help="API token of the user")
    parser.add_argument("-u", "--user", required=True, help="Username of the dataset owner")
    parser.add_argument("-o", "--output", help="Output JSON file (default: stdout)")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    return parser.parse_args(argv)
def _first_description(entries):
    """Return the first non-empty ``dsDescriptionValue`` text in *entries*, or ""."""
    if not isinstance(entries, list):
        return ""
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        value_field = entry.get('dsDescriptionValue')
        if isinstance(value_field, dict) and value_field.get('value'):
            return value_field['value']
    return ""


def _extract_dataset_info(dataset_data, base_url, dataset_id):
    """Build the title/description/link/id record from one dataset payload."""
    fields = (
        dataset_data.get('latestVersion', {})
        .get('metadataBlocks', {})
        .get('citation', {})
        .get('fields', [])
    )
    # The title defaults to an empty string so that a dataset without a
    # `title` field never leaks the raw metadata field list into the output.
    info = {
        "title": "",
        "description": "",
        "link": f"{base_url}/dataset.xhtml?persistentId={dataset_id}",
        "id": dataset_id,
    }
    for field in fields:
        if not isinstance(field, dict):
            continue
        type_name = field.get('typeName')
        if type_name == 'title':
            info["title"] = field.get('value', '')
        elif type_name == 'dsDescription':
            text = _first_description(field.get('value', []))
            if text:
                info["description"] = text
    return info


def get_user_datasets(base_url, api_token, username, verbose=None):
    """Get all datasets owned by a user.

    Runs one search request against ``{base_url}/api/search`` and then one
    detail request per returned item to read its citation metadata.

    Args:
        base_url: Base URL of the Dataverse installation; a trailing slash
            is ignored.
        api_token: API token sent in the ``X-Dataverse-key`` header.
        username: Value sent as the ``author_string`` query parameter.
        verbose: When true, progress messages are printed to stderr. When
            ``None`` (the default) the flag is taken from a module-level
            ``args`` namespace if one exists, otherwise it is off; this keeps
            existing three-argument callers working.

    Returns:
        A list of dicts with the keys ``title``, ``description``, ``link``
        and ``id``, one per dataset whose detail request returned HTTP 200.

    Raises:
        requests.exceptions.RequestException: if the search request fails or
            returns an error status, or a detail request cannot be sent.
    """
    if verbose is None:
        # Backward compatibility: main() publishes the parsed command line as
        # a module-level `args`; fall back to quiet mode when it is absent
        # instead of failing with a NameError.
        verbose = bool(getattr(globals().get("args"), "verbose", False))

    base_url = base_url.rstrip("/")
    headers = {"X-Dataverse-key": api_token}

    # Using the search API to find datasets by user
    # NOTE(review): `author_string` does not appear among the documented
    # Search API parameters - confirm the server really filters on it.
    search_url = f"{base_url}/api/search?q=*&type=dataset&sort=name&order=asc&per_page=1000&author_string={quote(username)}"
    if verbose:
        print(f"Searching for datasets: {search_url}", file=sys.stderr)
    response = requests.get(search_url, headers=headers)
    response.raise_for_status()

    search_data = response.json().get('data', {})
    total_count = search_data.get('total_count', 0)
    items = search_data.get('items', [])
    if verbose:
        print(f"Found {total_count} datasets", file=sys.stderr)
    if isinstance(total_count, int) and total_count > len(items):
        # Only a single page is requested, so make the truncation visible
        # rather than silently returning a partial list.
        print(
            f"Warning: only {len(items)} of {total_count} datasets were returned; results are truncated",
            file=sys.stderr,
        )

    datasets = []
    for item in items:
        dataset_id = item.get('global_id')
        if not dataset_id:
            continue
        if verbose:
            print(f"Processing dataset: {dataset_id}", file=sys.stderr)

        # Get detailed metadata for each dataset; `params` lets requests
        # URL-encode the persistent identifier.
        dataset_response = requests.get(
            f"{base_url}/api/datasets/:persistentId/",
            headers=headers,
            params={"persistentId": dataset_id},
        )
        if dataset_response.status_code != 200:
            print(f"Error retrieving dataset {dataset_id}: {dataset_response.text}", file=sys.stderr)
            continue

        dataset_data = dataset_response.json().get('data', {})
        datasets.append(_extract_dataset_info(dataset_data, base_url, dataset_id))
    return datasets
def main():
    """Run the command line tool.

    Parses the command line, collects the user's datasets and emits a JSON
    document with the keys ``username``, ``base_url``, ``count`` and
    ``datasets``, either to the file given with ``--output`` or to stdout.
    Any failure is reported on stderr and ends the process with status 1.
    """
    # get_user_datasets() reads this module-level namespace for its verbose
    # flag, so the parsed options have to be published globally.
    global args
    args = parse_arguments()

    try:
        found = get_user_datasets(args.base_url, args.api_token, args.user)
        payload = {
            "username": args.user,
            "base_url": args.base_url,
            "count": len(found),
            "datasets": found
        }

        if not args.output:
            # No output file requested: the JSON goes to stdout.
            print(json.dumps(payload, indent=2))
        else:
            with open(args.output, 'w') as out_file:
                json.dump(payload, out_file, indent=2)
            print(f"Output written to {args.output}", file=sys.stderr)
    except Exception as err:
        # HTTP/network errors raised by requests derive from Exception too,
        # so one handler reports them exactly like every other failure.
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment