Last active
July 15, 2020 17:36
-
-
Save hashbrowncipher/69f4ce9f8bfa6d5bb88d9c2562e7529d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from aiohttp.client_exceptions import ClientError | |
import asyncio | |
from asyncio import Queue | |
from aiohttp.resolver import AsyncResolver | |
import json | |
import os | |
import random | |
from blake3 import blake3 | |
from hashlib import md5 | |
from socket import socketpair | |
from socket import AF_UNIX | |
from socket import SOCK_DGRAM | |
from socket import SHUT_WR | |
import sys | |
from multiprocessing import Process | |
from time import perf_counter | |
import aiohttp | |
from boto3 import Session | |
from botocore.awsrequest import AWSRequest | |
from botocore.auth import SigV4Auth | |
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
from cryptography.hazmat.backends import default_backend | |
# Shared crypto state and benchmark configuration (one copy per process).
backend = default_backend()
# AES-256 key generated fresh per process; used only to synthesize
# pseudo-random upload bodies, never for real secrecy.
aes = algorithms.AES(os.urandom(32))
MB = 1024 * 1024
# A megabyte of plaintext zeroes; encrypting it yields raw keystream.
MB_ZEROES = b'\x00' * MB
# Size of each uploaded S3 object: 128 MiB.
CHUNK_SIZE = 128 * MB
CHUNK_ZEROES = b'\x00' * CHUNK_SIZE
# Total number of PUT jobs fanned out across all worker processes.
JOB_COUNT = 4096
# Number of worker processes forked by main().
CONCURRENCY = 8
# Target S3 bucket name, taken from the command line.
BUCKET = sys.argv[1]
class RandomPayload(aiohttp.Payload):
    """aiohttp payload that streams ``limit`` bytes of pseudo-random data.

    The bytes are AES-CTR keystream: a buffer of zeroes encrypted under the
    module-level ``aes`` key with a per-payload random IV. Data is produced
    1 MiB at a time so the full chunk is never held in memory, and a blake3
    hash is computed over it to mimic a real client's checksumming cost.
    """

    def __init__(self, limit):
        # Fresh IV per payload so every upload body is unique.
        self._iv = os.urandom(16)
        self._limit = limit
        super().__init__(self._iv)

    async def write(self, writer):
        cipher = Cipher(aes, modes.CTR(self._iv), backend=backend)
        encryptor = cipher.encryptor()
        remaining = self._limit
        # update_into() requires the output buffer to be at least
        # len(data) + block_size - 1 bytes, hence the extra 16.
        buf = bytearray(MB + 16)
        hasher = blake3()
        while remaining > 0:
            # Clamp the final chunk: the original `limit -= MB` loop never
            # terminated (and wrote excess data) when `limit` was not an
            # exact multiple of MB.
            take = min(MB, remaining)
            encryptor.update_into(MB_ZEROES[:take], buf)
            view = memoryview(buf)[:take]
            hasher.update(view)
            await writer.write(view)
            remaining -= take
        # Digest computed for cost parity only; the value is deliberately
        # discarded.
        hasher.digest()
# Resolve AWS credentials once at import time; reused for every signed request.
credentials = Session().get_credentials()
def make_request(to, region="us-east-1"):
    """Build a SigV4-signed S3 PUT request description for object key ``to``.

    Returns a plain dict of ``method``/``headers``/``url`` so the request can
    be JSON-serialized and shipped to a worker process over a socket.

    ``region`` was previously hard-coded; it defaults to ``us-east-1`` so
    existing callers are unchanged.
    """
    headers = {
        "Host": f"{BUCKET}.s3.amazonaws.com",
        "Content-Length": str(CHUNK_SIZE),
        # Declare the payload unsigned: bodies are generated on the fly in
        # the workers, so their hash cannot be known at signing time.
        "X-Amz-Content-SHA256": "UNSIGNED-PAYLOAD",
    }
    request = AWSRequest(
        method="PUT",
        url=f"https://{BUCKET}.s3.amazonaws.com/{to}",
        headers=headers,
    )
    SigV4Auth(credentials, "s3", region).add_auth(request)
    prepared = request.prepare()
    return dict(method=prepared.method, headers=dict(prepared.headers), url=prepared.url)
async def do_request(session, job, counts):
    """Issue one signed PUT described by ``job``; return True on HTTP 200.

    Transport errors and timeouts are tallied into the shared ``counts``
    dict and reported as failure (False) so the caller can retry the job.
    """
    url = job["url"]
    try:
        resp = await session.request(
            job["method"],
            url,
            headers=job["headers"],
            data=RandomPayload(CHUNK_SIZE),
        )
        async with resp:
            if resp.status != 200:
                print(resp)
            # The original had an unreachable `return True` after this line;
            # removed.
            return resp.status == 200
    except ClientError as ex:
        # Count per-exception-type occurrences keyed by the exception class.
        counts[type(ex)] = counts.get(type(ex), 0) + 1
        print(f"{type(ex)} {url}")
    except asyncio.TimeoutError:
        counts["timeout"] += 1
        # print(f"{type(ex)} {url}")
    return False
class RandomAsyncResolver(AsyncResolver):
    """DNS resolver that returns its answers in randomized order.

    Shuffling the resolved addresses spreads new connections across all of
    the endpoint's IPs instead of always dialing the first answer.
    """

    async def resolve(self, *args, **kwargs):
        addresses = await super().resolve(*args, **kwargs)
        random.shuffle(addresses)
        return addresses
async def async_worker(q, loop, count, identifier):
    """Consume jobs from ``q`` forever, retrying each PUT until it succeeds.

    ``count`` is a mutable dict of progress counters shared with the other
    workers in this process; ``identifier`` is currently unused. This
    coroutine never returns normally.
    """
    # NOTE(review): total=3s is very tight for a 128 MiB upload — presumably
    # deliberate for this stress test (forcing retries); confirm before reuse.
    timeout = aiohttp.ClientTimeout(sock_read=1, total=3)
    conn = aiohttp.TCPConnector(
        resolver=RandomAsyncResolver(),
        loop=loop,
        # Re-resolve on every connection so the shuffled DNS answers keep
        # spreading load across endpoint IPs.
        use_dns_cache=False
    )
    async with aiohttp.ClientSession(
        connector=conn,
        loop=loop,
        timeout=timeout
    ) as session:
        while True:
            job = await q.get()
            count["got"] += 1
            # Retry the same job until do_request() reports success.
            while not await do_request(session, job, count):
                pass
            count["done"] += 1
            count["active"] = count["got"] - count["done"]
            q.task_done()
async def dispatcher(q, loop, sock, counts):
    """Feed jobs from the parent's datagram socket into the local queue.

    Each datagram is one JSON-encoded job dict; an empty datagram is the
    shutdown sentinel. After dispatch finishes, waits for the queue to drain,
    then prints this process's counters and the second field of
    /proc/<pid>/schedstat (Linux-only scheduler statistics).
    """
    while True:
        payload = await loop.sock_recv(sock, 16384)
        if not payload:
            # Empty datagram: the parent is done sending jobs.
            break
        job = json.loads(payload)
        await q.put(job)
    print("Finished dispatch")
    # Block until every queued job has been task_done()'d by a worker.
    await q.join()
    pid = os.getpid()
    print((pid, counts))
    print(open(f"/proc/{pid}/schedstat").read().split()[1])
async def async_parent(loop, sock, identifier):
    """Per-process entry point: run one dispatcher plus six upload workers.

    Returns when the first coroutine completes — normally the dispatcher,
    since the workers loop forever.
    """
    counts = dict(got=0, done=0, timeout=0, payload=0, os=0)
    # maxsize=1 keeps backpressure on the dispatcher so pending jobs stay in
    # the kernel socket buffer rather than piling up in user space.
    q = Queue(1, loop=loop)
    workers = [
        async_worker(q, loop, counts, identifier)
        for i in range(6)
    ]
    done, pending = await asyncio.wait([
        dispatcher(q, loop, sock, counts), *workers
    ], return_when=asyncio.FIRST_COMPLETED)
    print(done)
def worker(sock, identifier):
    """Process target: drive async_parent() on this process's event loop."""
    event_loop = asyncio.get_event_loop()
    # The socket must be non-blocking for loop.sock_recv().
    sock.setblocking(False)
    event_loop.run_until_complete(async_parent(event_loop, sock, identifier))
def main():
    """Fan PUT jobs out to worker processes over a Unix datagram socketpair.

    The parent signs all requests (cheap) and ships them as JSON datagrams;
    the children generate and upload the 128 MiB bodies (expensive).
    """
    s1, s2 = socketpair(AF_UNIX, SOCK_DGRAM)
    processes = [Process(target=worker, args=(s1, i)) for i in range(CONCURRENCY)]
    for p in processes:
        p.start()
    # JOB_COUNT objects of CHUNK_SIZE each: 4096 x 128 MiB = 512 GiB total.
    # (A stale comment here previously claimed "1024 files -> 128 GiB".)
    for i in range(JOB_COUNT):
        payload = json.dumps(make_request(f"{sys.argv[2]}/{i:04}")).encode("utf8")
        s2.send(payload)
    # One empty datagram per process: each dispatcher's shutdown sentinel.
    for i in range(CONCURRENCY):
        s2.send(b"")
    s2.close()
    # Explicitly wait for the workers to drain their queues and exit.
    for p in processes:
        p.join()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment