Skip to content

Instantly share code, notes, and snippets.

@zzuummaa
Last active August 17, 2020 08:50
Show Gist options
  • Save zzuummaa/976afa45938f4b28028422e0fe2ff4c9 to your computer and use it in GitHub Desktop.
Save zzuummaa/976afa45938f4b28028422e0fe2ff4c9 to your computer and use it in GitHub Desktop.
import asyncio
import csv
import time
import aiohttp
# Target that every request is sent to.
URL = 'https://www.youtube.com/'
# Upper bound on how many requests get_pages() launches together.
MAX_CLIENTS = 200
# One connector shared by all requests, created with a limit of 60.
connector = aiohttp.TCPConnector(limit=60)

# Count the CSV rows up front so progress output can show a total.
with open('open80.csv', newline='\n') as csvfile:
    reader = csv.reader(csvfile)
    address_count = sum(1 for _ in reader)
async def fetch_async(address):
    """Issue one GET request to ``URL`` and report how it ended.

    The request uses the shared module-level ``connector`` and a total
    timeout of 5 seconds. The response body is read in full before the
    status is reported.

    Args:
        address: The CSV row this request was scheduled for. It is not
            used when building the request; every call targets ``URL``.

    Returns:
        The HTTP status code of the response, or ``0`` when the request
        timed out.

    NOTE(review): only ``asyncio.TimeoutError`` is handled here; any other
    exception raised by the request propagates to the caller.
    """
    limit = aiohttp.ClientTimeout(total=5)
    try:
        async with aiohttp.request('GET', URL, timeout=limit, connector=connector) as reply:
            # Drain the body so the whole transfer counts against the timeout.
            await reply.read()
            outcome = reply.status
    except asyncio.TimeoutError:
        outcome = 0
    return outcome
async def get_pages():
    """Run one ``fetch_async`` call per row of ``open80.csv`` and tally results.

    At most ``MAX_CLIENTS`` requests are in flight at any time. When the
    limit is reached, the function waits for at least one request to finish
    before scheduling the next row. Every row is scheduled exactly once and
    every scheduled request is awaited before the function returns, so the
    final histogram covers the whole file.

    While requests complete, a progress line is printed at most once per
    second. The histogram (status value -> number of occurrences) is printed
    once at the end.

    Raises:
        Any exception raised by ``fetch_async`` is re-raised when its result
        is collected.
    """
    status_gist = {}  # value returned by fetch_async -> occurrences
    in_flight = set()  # tasks that have been started but not yet tallied
    requests_count = 0
    last_time = time.monotonic()
    # asyncio.wait() rejects an empty set, so never wait with a limit below 1.
    limit = max(1, MAX_CLIENTS)

    async def collect_finished():
        """Wait for at least one in-flight request and tally all finished ones."""
        nonlocal in_flight, requests_count, last_time
        done, in_flight = await asyncio.wait(
            in_flight, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            status = task.result()
            status_gist[status] = status_gist.get(status, 0) + 1
            requests_count += 1
        # Throttle progress output to roughly one line per second.
        cur_time = time.monotonic()
        if cur_time - last_time > 1:
            last_time = cur_time
            print("%d/%d, %d%%, %s" % (requests_count, address_count, 100 * requests_count / address_count, status_gist))

    with open('open80.csv', newline='\n') as csvfile:
        for row in csv.reader(csvfile):
            # Free a slot first so the current row is never skipped.
            if len(in_flight) >= limit:
                await collect_finished()
            in_flight.add(asyncio.ensure_future(fetch_async(row)))

    # Drain whatever is still running once the file has been read.
    while in_flight:
        await collect_finished()

    print(status_gist)
# Script entry: obtain the current event loop, run get_pages() on it until it
# finishes, then report completion.
# Disabled alternative that would install a ProactorEventLoop instead:
# ioloop = asyncio.ProactorEventLoop()
# asyncio.set_event_loop(ioloop)
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(get_pages())
print("done")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment