Last active
January 26, 2023 19:59
-
-
Save petitchamp/5222e252df5367ed7724f848175a9e22 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from requests import post | |
from queue import Queue | |
from logger import logger | |
from datetime import datetime | |
NoCaptchaTaskProxyless = "NoCaptchaTaskProxyless" | |
SERVER = 'https://api.yescaptcha.com' | |
def handle_response_error(response, exit_program: bool): | |
if response.get('errorId') == 1: | |
logger.error(f'YesCaptcha service error: {response.get("errorCode")}, {response.get("errorDescription")}') | |
if exit_program: | |
exit(1) | |
return False | |
return True | |
''' | |
This code create multiple tasks (default number is 3) on the yes captcha server to solve google recaptcha, so that code waits less time for a recaptcha result | |
Example: | |
recaptcha = QueuedRecaptchaV2(client_key, website_key, website_url, NoCaptchaTaskProxyless) | |
recaptcha.set_queue_size(5) | |
gresp = recaptcha.get_recaptcha_response() | |
''' | |
class QueuedRecaptchaV2: | |
create_task_url = f"{SERVER}/createTask" | |
get_result_url = f'{SERVER}/getTaskResult' | |
get_balance_url = f'{SERVER}/getBalance' | |
def __init__(self, client_key, website_key, website_url, task_type): | |
self.client_key = client_key | |
self.website_key = website_key | |
self.website_url = website_url | |
self.task_type = task_type | |
self.queue_size = 3 | |
self.recaptcha_key = Queue(self.queue_size) | |
self.task_id_list = [] | |
# Check account balance | |
try: | |
result = post( | |
QueuedRecaptchaV2.get_balance_url, | |
json={"clientKey": self.client_key}, | |
verify=True | |
).json() | |
# Check error | |
handle_response_error(result, True) | |
balance = result.get("balance") | |
if balance < 3000: | |
logger.critical(f'Not enough balance left: {balance} points') | |
exit(1) | |
except Exception as e: | |
logger.error(e) | |
def set_queue_size(self, size: int): | |
self.queue_size = size | |
self.recaptcha_key = Queue(self.queue_size) | |
def _create_task(self) -> str: | |
""" | |
第一步:创建任务并获取taskId | |
:param | |
:return taskId : string 创建成功的任务ID | |
""" | |
data = { | |
"clientKey": self.client_key, | |
"task": { | |
"websiteURL": self.website_url, | |
"websiteKey": self.website_key, | |
"type": self.task_type | |
} | |
} | |
try: | |
result = post(QueuedRecaptchaV2.create_task_url, json=data, verify=True).json() | |
# Check error | |
if handle_response_error(result, False): | |
taskId = result.get("taskId") | |
if taskId: | |
return taskId | |
else: | |
return '' | |
except Exception as e: | |
logger.error(e) | |
def _get_response(self, task_id: str) -> str: | |
""" | |
第二步:使用taskId获取response | |
:param task_id: string | |
:return response: string 识别结果 | |
""" | |
# 循环请求识别结果,3秒请求一次 | |
try: | |
data = { | |
"clientKey": self.client_key, | |
"taskId": task_id | |
} | |
result = post(QueuedRecaptchaV2.get_result_url, json=data, verify=True).json() | |
gresponse = '' | |
if handle_response_error(result, False): | |
solution = result.get('solution', {}) | |
if solution: | |
gresponse = solution.get('gRecaptchaResponse') | |
return gresponse | |
except Exception as e: | |
logger.error(e) | |
def get_recaptcha_response(self): | |
if self.recaptcha_key.empty(): | |
# no recaptcha key in the queue | |
if len(self.task_id_list) == 0: | |
# no task submitted, create some task | |
for ii in range(self.queue_size): | |
task_id = self._create_task() | |
if task_id: | |
self.task_id_list.append(task_id) | |
# task submitted, get result | |
times = 0 | |
while times < 120 and self.task_id_list: | |
for task_id in self.task_id_list: | |
recaptcha_response = self._get_response(task_id) | |
if recaptcha_response: | |
self.recaptcha_key.put((recaptcha_response, datetime.now().timestamp())) | |
self.task_id_list.remove(task_id) | |
times += 3 | |
time.sleep(3) | |
return self.recaptcha_key.get_nowait()[0] | |
else: | |
recaptcha_response, timestamp = self.recaptcha_key.get_nowait() | |
if self.recaptcha_key.qsize() == 1: | |
for ii in range(self.queue_size - 1): | |
task_id = self._create_task() | |
if task_id: | |
self.task_id_list.append(task_id) | |
return recaptcha_response, timestamp |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment