Created
April 28, 2026 13:29
-
-
Save MikyPo/5bb5f01cd9e934ad5ac83584186c4308 to your computer and use it in GitHub Desktop.
sampling in api yametrica
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import json | |
| import time | |
| import requests | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| from dotenv import load_dotenv | |
| from io import StringIO | |
| # Настройки | |
| load_dotenv('../.env') | |
| COUNTER_ID = os.getenv('COUNTER_ID') | |
| API_TOKEN = os.getenv('API_TOKEN') | |
| DATE_1 = '2026-03-01' | |
| DATE_2 = '2026-03-05' | |
| LIMIT = 20000 # Лимит строк в одном запросе | |
| API_URL = 'https://api-metrika.yandex.net/stat/v1/data' | |
| ACRCY = 'full' # Точность данных | |
| output_dir = 'data' | |
| os.makedirs(output_dir, exist_ok=True) | |
| headers = {'Authorization': f'OAuth {API_TOKEN}'} | |
| # Проверяет метаданные на наличие семплирования и возвращает список предупреждений | |
| def check_sampling_metadata(metadata: dict, date_str: str) -> list: | |
| warnings = [] | |
| if metadata.get('sampled'): | |
| share = metadata.get('sample_share', 1.0) | |
| warnings.append( | |
| f"⚠️ [{date_str}] Данные семплированы: использовано {share*100:.1f}% исходных данных" | |
| ) | |
| if share < 1.0 and ACRCY == 'full': | |
| warnings.append( | |
| f" 🔴 Критично: запрошено accuracy='full', но выборка = {share*100:.1f}%" | |
| ) | |
| if metadata.get('total_rows_rounded'): | |
| warnings.append(f"⚠️ [{date_str}] Общее количество строк округлено — точность снижена") | |
| if metadata.get('passing') in ('cut_by_limit', 'cut_by_memory'): | |
| warnings.append(f"⚠️ [{date_str}] Данные обрезаны сервером: причина = {metadata.get('passing')}") | |
| return warnings | |
| # Загружает данные за один день в формате JSON с поддержкой пагинации | |
| # Возвращает tuple: (DataFrame с данными, dict с метаданными) | |
| def fetch_day_data_json(date_str: str) -> tuple[pd.DataFrame, dict]: | |
| all_rows = [] | |
| offset = 1 | |
| request_count = 0 | |
| # Параметры запроса | |
| base_params = { | |
| 'ids': COUNTER_ID, | |
| 'date1': date_str, | |
| 'date2': date_str, | |
| 'metrics': 'ym:s:visits', | |
| 'dimensions': 'ym:s:externalRefererDomain,ym:s:date', | |
| 'limit': LIMIT, | |
| 'accuracy': ACRCY, | |
| 'include_undefined': True, | |
| 'format': 'json' | |
| } | |
| while True: | |
| params = {**base_params, 'offset': offset} | |
| request_count += 1 | |
| try: | |
| response = requests.get(API_URL, params=params, headers=headers, timeout=30) | |
| if response.status_code != 200: | |
| print(f"❌ HTTP {response.status_code}: {response.text[:200]}") | |
| break | |
| payload = response.json() | |
| # Извлекаем метаданные о семплировании (только из первого запроса) | |
| if request_count == 1: | |
| metadata = { | |
| 'sampled': payload.get('sampled', False), | |
| 'sample_share': payload.get('sample_share', 1.0), | |
| 'sample_size': payload.get('sample_size'), | |
| 'sample_space': payload.get('sample_space'), | |
| 'total_rows_rounded': payload.get('total_rows_rounded', False), | |
| 'passing': payload.get('passing'), # 'ok', 'cut_by_limit', etc. | |
| 'total_rows': payload.get('total_rows'), | |
| 'request_date': datetime.now().isoformat() | |
| } | |
| # Было семплирование? | |
| print(f"📊 [{date_str}] sampled={metadata['sampled']} | share={metadata['sample_share']:.2%} | total_rows={metadata['total_rows']}") | |
| # Логируем предупреждения о семплировании | |
| for warning in check_sampling_metadata(metadata, date_str): | |
| print(warning) | |
| # Извлекаем строки данных | |
| data = payload.get('data', []) | |
| if not data: | |
| break # Нет больше данных — выходим из цикла пагинации | |
| all_rows.extend(data) | |
| print(f"Загружено строк: {len(all_rows):,} (offset={offset})") | |
| # Проверяем, есть ли ещё данные для загрузки | |
| if len(data) < LIMIT: | |
| break # Получили последнюю порцию данных | |
| offset += LIMIT | |
| time.sleep(0.3) # Небольшая пауза, чтобы не превысить rate limits | |
| except requests.exceptions.RequestException as e: | |
| print(f"❌ Ошибка сети для {date_str}, offset={offset}: {e}") | |
| break | |
| except json.JSONDecodeError as e: | |
| print(f"❌ Ошибка парсинга JSON для {date_str}: {e}") | |
| print(f" Ответ сервера: {response.text[:300]}") | |
| break | |
| # Формируем DataFrame | |
| df = pd.DataFrame(all_rows) if all_rows else pd.DataFrame() | |
| # Если метаданные не были получены (например, при ошибке), возвращаем заглушку | |
| metadata = metadata if 'metadata' in locals() else {'error': 'No metadata received'} | |
| return df, metadata | |
| # Основной цикл выгрузки | |
| print("🚀 Начинаем выгрузку данных в формате JSON...") | |
| # Создаём диапазон дат | |
| start_date = datetime.strptime(DATE_1, '%Y-%m-%d') | |
| end_date = datetime.strptime(DATE_2, '%Y-%m-%d') | |
| date_range = [start_date + timedelta(days=x) for x in range((end_date - start_date).days + 1)] | |
| print(f"📅 Период: {DATE_1} → {DATE_2} (всего {len(date_range)} дней)") | |
| # Словарь для сбора общей статистики по семплированию | |
| sampling_summary = {} | |
| for current_date in date_range: | |
| date_str = current_date.strftime('%Y-%m-%d') | |
| print(f"\n📥 Загружаем данные за {date_str}...") | |
| df_day, metadata = fetch_day_data_json(date_str) | |
| # Сохраняем информацию о семплировании для сводного отчёта | |
| sampling_summary[date_str] = { | |
| 'sampled': metadata.get('sampled'), | |
| 'sample_share': metadata.get('sample_share'), | |
| 'total_rows': metadata.get('total_rows'), | |
| 'rows_loaded': len(df_day) | |
| } | |
| # Сохраняем данные дня | |
| if not df_day.empty: | |
| filename = f"{date_str}_yametrika.json" | |
| filepath = os.path.join(output_dir, filename) | |
| # Сохраняем данные в JSON (чтобы не терять структуру) | |
| df_day.to_json(filepath, orient='records', force_ascii=False, indent=2) | |
| print(f"Данные сохранены: {filepath} ({len(df_day):,} строк)") | |
| # Опционально: сохранить также в CSV для удобства анализа | |
| csv_path = filepath.replace('.json', '.csv') | |
| df_day.to_csv(csv_path, index=False, encoding='utf-8-sig') | |
| print(f"CSV-копия: {csv_path}") | |
| else: | |
| print(f"⚠️ Нет данных за {date_str} или ошибка загрузки") | |
| # Сохраняем метаданные дня | |
| meta_filename = f"{date_str}_metadata.json" | |
| meta_filepath = os.path.join(output_dir, meta_filename) | |
| with open(meta_filepath, 'w', encoding='utf-8') as f: | |
| json.dump(metadata, f, indent=2, ensure_ascii=False) | |
| print(f"Метаданные: {meta_filepath}") | |
| # Небольшая пауза между днями | |
| time.sleep(0.5) | |
| # 📊 Финальный отчёт | |
| print("📊 Отчёт") | |
| print("-"*60) | |
| sampled_days = [d for d, m in sampling_summary.items() if m.get('sampled')] | |
| if sampled_days: | |
| print(f"⚠️ Семплирование применено в {len(sampled_days)} из {len(date_range)} дней:") | |
| for date in sampled_days: | |
| info = sampling_summary[date] | |
| print(f" • {date}: выборка {info['sample_share']*100:.1f}%, " | |
| f"загружено {info['rows_loaded']:,} из ~{info['total_rows']:,}") | |
| else: | |
| print("✅ Все данные загружены без семплирования (100% точность)") | |
| # Сохраняем сводку по семплированию | |
| summary_path = os.path.join(output_dir, 'sampling_summary.json') | |
| with open(summary_path, 'w', encoding='utf-8') as f: | |
| json.dump(sampling_summary, f, indent=2, ensure_ascii=False) | |
| print(f"\n💾 Сводка сохранена: {summary_path}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment