Example of using Scraper API with aiohttp
import os
import logging
import asyncio

import aiohttp
import requests


class ScraperAPI:
    def __init__(
        self,
        api_key: str | None = None,
        waiting_time_coef: int = 2,
        exp_backoff_coef: float = 1.0,
        num_of_concurrent_requests: int = 5,
    ):
        self._api_key = api_key or os.environ['SCRAPER_API_KEY']
        self._waiting_time = waiting_time_coef
        self._exp_backoff = exp_backoff_coef
        self._num_of_conc_req = num_of_concurrent_requests
        self._logger = logging.getLogger(__name__)

    async def scrape_urls(self, urls: list[str]) -> dict[str, list[tuple]]:
        status_urls = self._get_status_urls(urls)
        res = {
            'success': [],
            'failed': []
        }
        responses = await self._get_data_from_status_urls(status_urls)
        for response in responses.values():
            if response['status'] == 'finished':
                res['success'].append((response['url'], response['response']))
            else:
                # Failed jobs may not carry a 'response' payload, so keep the whole job-status dict
                res['failed'].append((response['url'], response))
        if len(res['failed']) > 0:
            self._logger.warning(
                f'Failed to scrape {len(res["failed"])} urls. URLs:\n'
                f'{[(url, job["status"]) for url, job in res["failed"]]}'
            )
        return res

    async def _get_data_from_status_urls(
        self,
        status_urls: list[str],
        res: dict | None = None,
        current_backoff: float = 1.0,
    ) -> dict[str, dict]:
        # Avoid a mutable default argument; results accumulate across recursive calls
        res = res if res is not None else {}
        waiting_time = max(
            self._waiting_time * current_backoff,
            self._waiting_time * current_backoff * len(status_urls) / self._num_of_conc_req
        )
        self._logger.info(f'Waiting for {waiting_time} sec')
        # asyncio.sleep instead of time.sleep so the event loop is not blocked
        await asyncio.sleep(waiting_time)
        responses = await self._get_responses_from_status_urls(status_urls)
        running_jobs = [i for i in responses if i['status'] == 'running']
        completed_jobs = [i for i in responses if i['status'] != 'running']
        res.update({i['url']: i for i in completed_jobs})
        if len(running_jobs) > 0:
            self._logger.info(f'{len(running_jobs)} jobs are still running')
            return await self._get_data_from_status_urls(
                [i['statusUrl'] for i in running_jobs],
                res,
                current_backoff * self._exp_backoff
            )
        else:
            self._logger.info(f'All {len(res)} jobs are completed')
            return res

    async def _get_responses_from_status_urls(self, urls: list[str]) -> list[dict]:
        async with aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(limit_per_host=self._num_of_conc_req)
        ) as session:
            return await asyncio.gather(
                *[self._get_response_from_status_url(url, session) for url in urls]
            )

    async def _get_response_from_status_url(self, url: str, session: aiohttp.ClientSession) -> dict:
        async with session.get(url) as response:
            return await response.json()

    def _get_status_urls(self, urls: list[str]) -> list[str]:
        # Submit a batch job to ScraperAPI's async endpoint and collect one status URL per input URL
        scraping_jobs = requests.post(
            url='https://async.scraperapi.com/batchjobs',
            json={'apiKey': self._api_key, 'urls': urls}
        )
        return [i['statusUrl'] for i in scraping_jobs.json()]
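
A minimal usage sketch, assuming the SCRAPER_API_KEY environment variable is set; the URLs below are placeholders for the pages you actually want to scrape.

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    async def main():
        # Placeholder URLs; replace with real targets
        scraper = ScraperAPI(num_of_concurrent_requests=5)
        results = await scraper.scrape_urls([
            'https://example.com/page-1',
            'https://example.com/page-2',
        ])
        print(f'Scraped {len(results["success"])} urls, {len(results["failed"])} failed')

    asyncio.run(main())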