Skip to content

Instantly share code, notes, and snippets.

@zilista
Created January 13, 2020 13:29
Show Gist options
  • Save zilista/88c5597202515edc89c3f72c84995c17 to your computer and use it in GitHub Desktop.
Save zilista/88c5597202515edc89c3f72c84995c17 to your computer and use it in GitHub Desktop.
import random
from requests_html import HTMLSession
from queue import Queue
from threading import Lock
from concurrent.futures import ThreadPoolExecutor
# API keys passed to domain_availability(); the values here are placeholders.
LIST_KEY_AVAILABILITY = ['xxxxxxxxxxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxxxxxxx']
# Held by the worker threads while they append to the shared CSV output files.
lock = Lock()
def domain_availability(domain, list_key):
    """Query the WhoisXML domain-availability API for ``domain``.

    The keys in ``list_key`` are tried in random order, each at most once,
    until one request comes back with HTTP status 200.

    Args:
        domain: Domain name to look up.
        list_key: Sequence of API keys to try.

    Returns:
        The first response whose status code is 200, or ``None`` when
        ``list_key`` is empty or every attempt failed (non-200 status,
        timeout, or any other request error).
    """
    keys = list(list_key)
    if not keys:
        # No keys to try: report failure the same way as exhausted keys.
        return None
    # Shuffle a copy so that every key gets exactly one attempt; picking a
    # random key per iteration could retry a bad key and skip a good one.
    random.shuffle(keys)

    url_api = 'https://domain-availability-api.whoisxmlapi.com/api/v1'
    with HTMLSession() as session:
        for key in keys:
            try:
                # ``params`` lets the HTTP library URL-encode key and domain.
                response = session.get(
                    url_api,
                    params={'apiKey': key, 'domainName': domain},
                    timeout=10,
                )
            except Exception:
                # Best effort: a failed request just moves on to the next key.
                continue
            if response.status_code == 200:
                return response
    return None
def get_info_domain_worker(qu):
    """Drain ``qu`` of domain names and record the availability of each one.

    For every domain the availability API is queried through
    ``domain_availability``. Domains reported as ``AVAILABLE`` are appended
    to ``domain_available.csv``; domains for which no usable answer was
    obtained are appended to ``domain_recheck.csv``. The worker returns as
    soon as the queue has no more items.

    Args:
        qu: ``queue.Queue`` of domain-name strings shared between workers.
    """
    # Function-scope import: the module itself only imports ``Queue``.
    from queue import Empty

    while True:
        try:
            # Non-blocking get: a blocking get() followed by an empty() check
            # lets a worker wait forever once another worker has taken the
            # last item.
            domain = qu.get_nowait()
        except Empty:
            break

        needs_recheck = False
        try:
            response = domain_availability(domain, LIST_KEY_AVAILABILITY)
            if response is None:
                # Every API key failed for this domain.
                needs_recheck = True
            else:
                info = response.json()['DomainInfo']
                if info:
                    domain_resp = info['domainName']
                    domain_status = info['domainAvailability']
                    print(domain_resp, domain_status)
                    if domain_status == 'AVAILABLE':
                        with lock:
                            with open('domain_available.csv', 'a') as file:
                                file.write(f'{domain_resp}\t{domain_status}\n')
        except Exception:
            # Best effort: malformed JSON, missing keys or I/O trouble must
            # not kill the worker; the domain is queued for a later recheck.
            needs_recheck = True

        if needs_recheck:
            print(domain, 'Ответ не получен. Сервис вернул "None", проверь токены и повтори проверку')
            with lock:
                with open('domain_recheck.csv', 'a') as file:
                    file.write(f'{domain}\n')
def main(input_path='domain_not_response.csv', worker_count=5):
    """Check every domain listed in ``input_path`` with a pool of threads.

    Args:
        input_path: Text file with one domain name per line. Surrounding
            whitespace is stripped and blank lines are skipped.
        worker_count: Maximum number of worker threads to start.
    """
    domain_qu = Queue()
    with open(input_path, 'r') as file:
        for line in file:
            domain = line.strip()
            if domain:  # a blank line would be sent to the API as an empty name
                domain_qu.put(domain)

    if domain_qu.empty():
        # Nothing to check, so do not start any workers.
        return

    # No point in starting more threads than there are domains.
    pool_size = min(worker_count, domain_qu.qsize())
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        for _ in range(pool_size):
            executor.submit(get_info_domain_worker, domain_qu)
# Run the checker only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment