Last active
June 12, 2026 17:25
-
-
Save jay0lee/5c8632cb8883268cbadd2ea8106b7d49 to your computer and use it in GitHub Desktop.
Demo script to manage Gemini Enterprise User Licenses
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| gemini_licenses.py - A CLI tool to manage Google Cloud Gemini Enterprise Licenses | |
| This script interacts with the Google Cloud Discovery Engine API to manage user | |
| licenses for Gemini Enterprise. It supports listing subscriptions, listing | |
| assigned licenses, batch assigning/removing licenses, and syncing state from a CSV. | |
| Prerequisites: | |
| 1. Python 3.7+ | |
| 2. Install required packages: | |
| $ pip install google-api-python-client google-auth | |
| 3. Authenticate with Google Cloud Application Default Credentials (ADC): | |
| $ gcloud auth application-default login | |
| 4. Ensure your authenticated identity has the necessary IAM permissions | |
| (e.g., roles/discoveryengine.admin) on the target project. | |
| Usage Examples: | |
| # 1. View UserStore licensing configuration (auto-assignment settings) | |
| $ python3 gemini_licenses.py --project 123456789012 --location us get-config | |
| # 2. View available Gemini Enterprise subscriptions in your project | |
| $ python3 gemini_licenses.py --project 123456789012 --location us list-subscriptions | |
| # 3. View currently assigned users | |
| $ python3 gemini_licenses.py --project 123456789012 --location us list | |
| # 4. Assign a license to specific users (auto-selects subscription if only 1 exists) | |
| $ python3 gemini_licenses.py --project 123456789012 --location us assign \ | |
| --emails alice@example.com bob@example.com | |
| # 5. Remove a license from a user (changes state to UNASSIGNED) | |
| $ python3 gemini_licenses.py --project 123456789012 --location us remove \ | |
| --emails charlie@example.com | |
| # 6. Remove a license and completely purge the user record | |
| $ python3 gemini_licenses.py --project 123456789012 --location us remove \ | |
| --emails dave@example.com --delete-user-record | |
| # 7. Sync licenses to match a CSV file (assigns missing, removes extras) | |
| $ python3 gemini_licenses.py --project 123456789012 --location us sync \ | |
| --csv employees.csv --column "Email Address" | |
| """ | |
| import argparse | |
| import sys | |
| import time | |
| import json | |
| import httplib2 | |
| import logging | |
| import csv | |
| import google.auth | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| # Mute the annoying httplib2 timeout warnings | |
| logging.getLogger('google_auth_httplib2').setLevel(logging.ERROR) | |
| # The API uses 'default_user_store' for the top-level user store parent under Gemini Enterprise | |
| USER_STORE_ID = 'default_user_store' | |
| def enable_debug_logging(): | |
| """Enables raw HTTP wire-level logging for the googleapiclient library.""" | |
| httplib2.debuglevel = 4 | |
| def chunked_list(lst, chunk_size): | |
| """Yields successive chunks from a list.""" | |
| if chunk_size < 1: | |
| chunk_size = 500 # Fallback to default if someone passes 0 or negative | |
| for i in range(0, len(lst), chunk_size): | |
| yield lst[i:i + chunk_size] | |
| def resolve_project_number(project_id_or_number): | |
| """Checks if the input is a number. If not, resolves it via the Resource Manager API.""" | |
| if project_id_or_number.isdigit(): | |
| return project_id_or_number | |
| print(f"[*] Notice: You provided Project ID '{project_id_or_number}'.") | |
| print(" The Discovery Engine API requires the numeric Project Number for mutations.") | |
| print(" Looking up the Project Number...") | |
| try: | |
| credentials, _ = google.auth.default( | |
| scopes=['https://www.googleapis.com/auth/cloud-platform.read-only'] | |
| ) | |
| rm_client = build('cloudresourcemanager', 'v1', credentials=credentials) | |
| project_info = rm_client.projects().get(projectId=project_id_or_number).execute() | |
| project_number = project_info['projectNumber'] | |
| print(f"[+] Found Project Number: {project_number}") | |
| print(f" Tip: Pass '--project {project_number}' next time to save an API call!\n") | |
| return project_number | |
| except HttpError as e: | |
| print(f"\n[!] Failed to look up project number for '{project_id_or_number}'.") | |
| print(f" Error: {e.reason}") | |
| print(" Please manually pass your 12-digit Project Number using the --project flag.") | |
| sys.exit(1) | |
| except Exception as e: | |
| print(f"\n[!] Unexpected error resolving project number: {e}") | |
| sys.exit(1) | |
| def get_discovery_client(project_number, location): | |
| """Obtains ADC and builds the Discovery Engine API client with least privilege scope.""" | |
| credentials, _ = google.auth.default( | |
| scopes=['https://www.googleapis.com/auth/discoveryengine.serving.readwrite'] | |
| ) | |
| credentials = credentials.with_quota_project(project_number) | |
| if location.lower() == 'global': | |
| endpoint = "https://discoveryengine.googleapis.com" | |
| else: | |
| endpoint = f"https://{location.lower()}-discoveryengine.googleapis.com" | |
| client = build( | |
| 'discoveryengine', | |
| 'v1', | |
| credentials=credentials, | |
| client_options={'api_endpoint': endpoint} | |
| ) | |
| return client | |
| def process_lro(client, op_data): | |
| """Processes a Long-Running Operation (LRO), handling immediate completion or polling.""" | |
| def handle_result(data): | |
| if 'error' in data: | |
| print(f"\n[!] Operation Failed: {data['error'].get('message', 'Unknown Error')}") | |
| return False | |
| res_data = data.get('response', {}) | |
| if 'errorSamples' in res_data and res_data['errorSamples']: | |
| print("\n[!] Operation completed, but some licenses failed:") | |
| for err in res_data['errorSamples']: | |
| print(f" - {err.get('message', err)}") | |
| return False | |
| else: | |
| print("\n[+] Operation chunk completed successfully!") | |
| return True | |
| if op_data.get('done'): | |
| return handle_result(op_data) | |
| operation_name = op_data.get('name') | |
| if not operation_name: | |
| print("\n[!] Operation started, but no operation name was returned to track.") | |
| return False | |
| print(f"Waiting for operation to complete: {operation_name.split('/')[-1]} ", end="", flush=True) | |
| retries = 0 | |
| while True: | |
| time.sleep(3) | |
| try: | |
| op_data = client.projects().locations().operations().get( | |
| name=operation_name | |
| ).execute() | |
| if op_data.get('done'): | |
| print(" Done!") | |
| return handle_result(op_data) | |
| print(".", end="", flush=True) | |
| except HttpError as e: | |
| if e.resp.status == 404: | |
| retries += 1 | |
| if retries <= 5: | |
| print("?", end="", flush=True) | |
| continue | |
| else: | |
| print("\n[!] Operation polling failed. The operation ID was not found after 15 seconds.") | |
| raise | |
| else: | |
| raise | |
| def get_user_store_config(client, project_number, location): | |
| """READ: Fetches and displays the UserStore configuration.""" | |
| name = f"projects/{project_number}/locations/{location}/userStores/{USER_STORE_ID}" | |
| print(f"Fetching UserStore configuration from: {name}") | |
| try: | |
| response = client.projects().locations().userStores().get( | |
| name=name | |
| ).execute() | |
| except HttpError as e: | |
| if e.resp.status == 404: | |
| print(f"\n[!] The userStore '{USER_STORE_ID}' was not found. Are you in the correct location?") | |
| return | |
| raise | |
| print("-" * 75) | |
| print(f"User Store Name: {response.get('name', 'Unknown')}") | |
| print(f"Display Name: {response.get('displayName', 'N/A')}") | |
| print("-" * 75) | |
| default_lic = response.get('defaultLicenseConfig', '') | |
| if default_lic: | |
| sub_id = default_lic.split('/')[-1] | |
| print(f"Default Subscription ID : {sub_id}") | |
| print(f"Default Subscription Full Path : {default_lic}") | |
| else: | |
| print("Default Subscription : None Configured") | |
| auto_register = response.get('enableLicenseAutoRegister', False) | |
| auto_update = response.get('enableExpiredLicenseAutoUpdate', False) | |
| print(f"Auto-Register New Users : {auto_register}") | |
| print(f"Auto-Update Expired Licenses : {auto_update}") | |
| print("-" * 75) | |
| def get_all_licenses(client, project_number, location): | |
| """Internal helper to fetch all licenses, handling pagination automatically.""" | |
| parent = f"projects/{project_number}/locations/{location}/userStores/{USER_STORE_ID}" | |
| licenses = [] | |
| page_token = None | |
| while True: | |
| response = client.projects().locations().userStores().userLicenses().list( | |
| parent=parent, | |
| pageSize=50, | |
| pageToken=page_token | |
| ).execute() | |
| licenses.extend(response.get('userLicenses', [])) | |
| page_token = response.get('nextPageToken') | |
| if not page_token: | |
| break | |
| return licenses | |
| def fetch_subscriptions(client, project_number, location): | |
| """Internal helper to pull the raw subscription payload from the API.""" | |
| parent = f"projects/{project_number}/locations/{location}" | |
| try: | |
| response = client.projects().locations().licenseConfigs().list( | |
| parent=parent | |
| ).execute() | |
| return response.get('licenseConfigs', []) | |
| except HttpError as e: | |
| if e.resp.status == 404: | |
| return [] | |
| raise | |
| def resolve_subscription_id(client, project_number, location, provided_id): | |
| """Validates or auto-discovers the subscription ID based on current availability.""" | |
| if provided_id: | |
| return provided_id | |
| print("No --subscription-id provided. Checking available subscriptions...") | |
| configs = fetch_subscriptions(client, project_number, location) | |
| if not configs: | |
| print("\n[!] Error: No subscriptions/license configs found for this project and location.") | |
| sys.exit(1) | |
| sub_ids = [config.get('name', '').split('/')[-1] for config in configs] | |
| if len(sub_ids) == 1: | |
| selected_id = sub_ids[0] | |
| print(f"[+] Auto-selected the only available subscription: {selected_id}\n") | |
| return selected_id | |
| print(f"\n[!] Error: Multiple subscriptions found ({', '.join(sub_ids)}).") | |
| print("Please explicitly specify which one to use by passing the --subscription-id flag.") | |
| sys.exit(1) | |
| def list_subscriptions(client, project_number, location): | |
| """READ: Lists the subscription License Configs distributed to this project.""" | |
| print(f"Fetching subscriptions (License Configs) from: projects/{project_number}/locations/{location}") | |
| configs = fetch_subscriptions(client, project_number, location) | |
| if not configs: | |
| print("No subscriptions/license configurations distributed to this project.") | |
| return | |
| print("-" * 75) | |
| for config in configs: | |
| name = config.get('name', 'Unknown') | |
| sub_id = name.split('/')[-1] | |
| active_count = config.get('activeLicenseCount', '0') | |
| total_count = config.get('totalLicenseCount', 'Unknown') | |
| state = config.get('state', 'UNKNOWN') | |
| print(f"Subscription ID: {sub_id}") | |
| print(f"State: {state} | Assigned: {active_count}/{total_count}") | |
| print("-" * 75) | |
| def list_licenses(client, project_number, location): | |
| """READ: Prints currently assigned user licenses.""" | |
| print(f"Fetching licenses for: projects/{project_number}/locations/{location}/userStores/{USER_STORE_ID}") | |
| licenses = get_all_licenses(client, project_number, location) | |
| if not licenses: | |
| print("No licenses found.") | |
| return | |
| print("-" * 50) | |
| for lic in licenses: | |
| name = lic.get('name', 'Unknown') | |
| state = lic.get('licenseAssignmentState', 'UNKNOWN_STATE') | |
| email = lic.get('userPrincipal', name.split('/')[-1]) | |
| print(f"User: {email} | State: {state}") | |
| print("-" * 50) | |
| print(f"Total assignments: {len(licenses)}") | |
| def batch_update_licenses(client, project_number, location, subscription_id, assigns, removes, delete_record, chunk_size): | |
| """CORE MUTATION: Handles both assigns and removes, chunking the payloads to ensure stability.""" | |
| if not assigns and not removes: | |
| print("No users provided to update.") | |
| return | |
| parent = f"projects/{project_number}/locations/{location}/userStores/{USER_STORE_ID}" | |
| license_config_name = f"projects/{project_number}/locations/{location}/licenseConfigs/{subscription_id}" if subscription_id else "" | |
| all_operations = [] | |
| for email in assigns: | |
| all_operations.append({ | |
| "userPrincipal": email, | |
| "licenseConfig": license_config_name, | |
| "licenseAssignmentState": "ASSIGNED" | |
| }) | |
| for email in removes: | |
| all_operations.append({ | |
| "userPrincipal": email, | |
| "licenseAssignmentState": "UNASSIGNED" | |
| }) | |
| chunks = list(chunked_list(all_operations, chunk_size)) | |
| total_chunks = len(chunks) | |
| action_text = "Removing records" if delete_record else "Unassigning licenses" | |
| if removes: | |
| print(f"Executing batch update: {len(assigns)} to assign, {len(removes)} to {action_text.lower()}...") | |
| else: | |
| print(f"Executing batch update: {len(assigns)} to assign...") | |
| for idx, chunk in enumerate(chunks, 1): | |
| if total_chunks > 1: | |
| print(f"\nProcessing Chunk {idx} of {total_chunks} ({len(chunk)} users)...") | |
| body = { | |
| "deleteUnassignedUserLicenses": delete_record, | |
| "inlineSource": { | |
| "userLicenses": chunk, | |
| "updateMask": "userPrincipal,licenseConfig,licenseAssignmentState" | |
| } | |
| } | |
| response = client.projects().locations().userStores().batchUpdateUserLicenses( | |
| parent=parent, body=body | |
| ).execute() | |
| success = process_lro(client, response) | |
| # Stop processing future chunks if a quota/billing error trips the current chunk | |
| if not success and total_chunks > 1: | |
| print("\n[!] Halting remaining chunks due to failure in the current batch.") | |
| break | |
| def sync_licenses(client, project_number, location, subscription_id, csv_path, column_name, delete_record, chunk_size): | |
| """SYNC: Reconciles currently assigned users against a CSV source of truth.""" | |
| target_emails = set() | |
| try: | |
| with open(csv_path, 'r', encoding='utf-8') as f: | |
| reader = csv.DictReader(f) | |
| if column_name not in reader.fieldnames: | |
| print(f"Error: Column '{column_name}' not found in CSV. Available columns: {', '.join(reader.fieldnames)}") | |
| sys.exit(1) | |
| for row in reader: | |
| email = row[column_name].strip() | |
| if email: | |
| target_emails.add(email) | |
| except FileNotFoundError: | |
| print(f"Error: Could not find CSV file at '{csv_path}'") | |
| sys.exit(1) | |
| print("Fetching current license state from Gemini Enterprise...") | |
| current_licenses = get_all_licenses(client, project_number, location) | |
| current_emails = set() | |
| for lic in current_licenses: | |
| if lic.get('licenseAssignmentState') == 'ASSIGNED': | |
| name = lic.get('name', '') | |
| email = lic.get('userPrincipal', name.split('/')[-1]) | |
| current_emails.add(email) | |
| to_assign = target_emails - current_emails | |
| to_remove = current_emails - target_emails | |
| print("-" * 50) | |
| print(f"CSV Target Users : {len(target_emails)}") | |
| print(f"Currently Active : {len(current_emails)}") | |
| print(f"Delta -> Assign : {len(to_assign)}") | |
| print(f"Delta -> Remove : {len(to_remove)}") | |
| print("-" * 50) | |
| if not to_assign and not to_remove: | |
| print("Licenses are already in sync. No changes needed.") | |
| return | |
| batch_update_licenses(client, project_number, location, subscription_id, list(to_assign), list(to_remove), delete_record, chunk_size) | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Manage Gemini Enterprise Licenses via CLI") | |
| parser.add_argument('--project', help='Google Cloud Project ID or Number (defaults to ADC project)') | |
| parser.add_argument('--location', default='global', | |
| help='Location of the license (default: global. e.g., us, eu, asia)') | |
| parser.add_argument('--chunk-size', type=int, default=500, | |
| help='Number of licenses to process per batch API call (default: 500)') | |
| parser.add_argument('--debug', action='store_true', help='Enable raw HTTP request logging') | |
| subparsers = parser.add_subparsers(dest='command', help='Commands') | |
| subparsers.required = True | |
| subparsers.add_parser('get-config', help='Get UserStore licensing configuration') | |
| subparsers.add_parser('list-subscriptions', help='List subscriptions distributed to the project') | |
| subparsers.add_parser('list', help='List all user licenses') | |
| assign_parser = subparsers.add_parser('assign', help='Assign license to one or more users') | |
| assign_parser.add_argument('--subscription-id', help='Your Gemini Enterprise Subscription ID (optional if only 1 exists)') | |
| assign_parser.add_argument('--emails', nargs='+', required=True, help='Space-separated list of user emails') | |
| remove_parser = subparsers.add_parser('remove', help='Remove license from one or more users') | |
| remove_parser.add_argument('--emails', nargs='+', required=True, help='Space-separated list of user emails') | |
| remove_parser.add_argument('--delete-user-record', action='store_true', | |
| help='Fully delete the user from the userStore after removing the license (Unassign and delete)') | |
| sync_parser = subparsers.add_parser('sync', help='Sync assigned licenses with a CSV file') | |
| sync_parser.add_argument('--subscription-id', help='Your Gemini Enterprise Subscription ID (optional if only 1 exists)') | |
| sync_parser.add_argument('--csv', required=True, help='Path to the CSV file containing the source of truth') | |
| sync_parser.add_argument('--column', required=True, help='The exact name of the CSV column containing emails') | |
| sync_parser.add_argument('--delete-user-record', action='store_true', | |
| help='Fully delete removed users from the userStore (Unassign and delete)') | |
| args = parser.parse_args() | |
| if args.debug: | |
| enable_debug_logging() | |
| _, adc_project = google.auth.default() | |
| raw_project = args.project or adc_project | |
| if not raw_project: | |
| print("Error: Could not determine Project ID. Provide --project flag or configure your ADC environment.") | |
| sys.exit(1) | |
| project_number = resolve_project_number(raw_project) | |
| client = get_discovery_client(project_number, args.location) | |
| try: | |
| if args.command == 'get-config': | |
| get_user_store_config(client, project_number, args.location) | |
| elif args.command == 'list-subscriptions': | |
| list_subscriptions(client, project_number, args.location) | |
| elif args.command == 'list': | |
| list_licenses(client, project_number, args.location) | |
| elif args.command == 'assign': | |
| sub_id = resolve_subscription_id(client, project_number, args.location, args.subscription_id) | |
| batch_update_licenses(client, project_number, args.location, sub_id, | |
| assigns=args.emails, removes=[], delete_record=False, chunk_size=args.chunk_size) | |
| elif args.command == 'remove': | |
| batch_update_licenses(client, project_number, args.location, subscription_id=None, | |
| assigns=[], removes=args.emails, delete_record=args.delete_user_record, chunk_size=args.chunk_size) | |
| elif args.command == 'sync': | |
| sub_id = resolve_subscription_id(client, project_number, args.location, args.subscription_id) | |
| sync_licenses(client, project_number, args.location, sub_id, | |
| args.csv, args.column, args.delete_user_record, args.chunk_size) | |
| except HttpError as e: | |
| print(f"\nAPI Request Error ({e.resp.status}): {e.reason}") | |
| try: | |
| error_details = json.loads(e.content.decode('utf-8')) | |
| print(f"Details: {json.dumps(error_details, indent=2)}") | |
| except Exception: | |
| print(f"Raw Content: {e.content}") | |
| sys.exit(1) | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment