Last active
May 8, 2025 21:20
-
-
Save luisfergromo/c0d091ad8c835bef990697f9c333f1cd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from google.colab import userdata | |
| import requests | |
| from datetime import datetime, timedelta | |
| import json | |
| from concurrent.futures import ThreadPoolExecutor | |
| from tqdm import tqdm | |
| # Configuración | |
| CANVAS_API_URL = "https://educacioncontinuaiteso.instructure.com" | |
| API_KEY = userdata.get('token') | |
| MAX_WORKERS = 5 # Ajustar según límites de tu API | |
| # Headers | |
| headers = {'Authorization': f'Bearer {API_KEY}'} | |
| def get_all_users(): | |
| """Obtiene lista de todos los usuarios""" | |
| url = f"{CANVAS_API_URL}/api/v1/accounts/1/users" | |
| users = [] | |
| while url: | |
| response = requests.get(url, headers=headers, params={'per_page': 100}) | |
| if response.status_code != 200: | |
| print(f"Error obteniendo usuarios: {response.status_code}") | |
| break | |
| users.extend(response.json()) | |
| url = response.links.get('next', {}).get('url') | |
| return users | |
| def get_user_page_views(user_id, start_time=None, end_time=None): | |
| """Obtiene page views para un usuario específico""" | |
| params = {'per_page': 100} | |
| if start_time: | |
| params['start_time'] = start_time.strftime('%Y-%m-%dT%H:%M:%SZ') | |
| if end_time: | |
| params['end_time'] = end_time.strftime('%Y-%m-%dT%H:%M:%SZ') | |
| url = f"{CANVAS_API_URL}/api/v1/users/{user_id}/page_views" | |
| all_views = [] | |
| while url: | |
| response = requests.get(url, headers=headers, params=params) | |
| if response.status_code != 200: | |
| print(f"Error para usuario {user_id}: {response.status_code}") | |
| break | |
| all_views.extend(response.json()) | |
| url = response.links.get('next', {}).get('url') | |
| params = {} # Los parámetros ya están en la URL de paginación | |
| return all_views | |
| def main(): | |
| print("Obteniendo lista de usuarios...") | |
| users = get_all_users() | |
| print(f"Total de usuarios encontrados: {len(users)}") | |
| # Configurar rango de fechas si es necesario | |
| end_time = datetime.utcnow() | |
| start_time = end_time - timedelta(days=1) # Últimos 30 días | |
| all_page_views = [] | |
| print("\nRecolectando page views...") | |
| with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: | |
| futures = [] | |
| for user in tqdm(users, desc="Encolando solicitudes"): | |
| futures.append(executor.submit( | |
| get_user_page_views, | |
| user['id'], | |
| start_time, | |
| end_time | |
| )) | |
| for future in tqdm(futures, desc="Procesando usuarios", total=len(users)): | |
| try: | |
| user_views = future.result() | |
| all_page_views.extend(user_views) | |
| except Exception as e: | |
| print(f"\nError procesando usuario: {e}") | |
| # Guardar resultados | |
| output_file = 'all_page_views.json' | |
| with open(output_file, 'w') as f: | |
| json.dump(all_page_views, f, indent=2) | |
| print(f"\n✅ Proceso completado!") | |
| print(f"📊 Total de page views recolectados: {len(all_page_views)}") | |
| print(f"💾 Datos guardados en: {output_file}") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment