This script downloads the Wikipedia page views data set and uploads it to S3 concurrently as it downloads, streaming each file instead of buffering it whole. The concurrency level is configurable through semaphores.
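The core pattern is an asyncio.Semaphore shared by all tasks: asyncio.gather schedules every file at once, and the semaphore caps how many are actually processed at the same time. A minimal sketch of just that pattern, where worker, run_all and the limit of 5 are illustrative placeholders rather than part of the script below:

import asyncio

async def worker(semaphore: asyncio.Semaphore, item: int) -> int:
    # Only as many workers as the semaphore allows enter this block at once.
    async with semaphore:
        await asyncio.sleep(0.1)  # stand-in for a download/decompress/upload step
        return item * 2

async def run_all() -> None:
    semaphore = asyncio.Semaphore(5)  # the configurable concurrency limit
    results = await asyncio.gather(*[worker(semaphore, i) for i in range(20)])
    print(results)

asyncio.run(run_all())

The full script: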
import asyncio
import zlib
from typing import List, Tuple

from aiobotocore.session import AioSession
from aiohttp_retry import ExponentialRetry, RetryClient
from tqdm import tqdm

# ##### PARAMETERIZED PART #######
YEAR = 2015
MONTHS = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}
S3_BUCKET = "<my-s3-bucket>"
S3_FOLDER = "benchmark/wikipedia_pageviews/"
AWS_PROFILE = "<my-aws>"
SCAN_FILE_NAMES_CONCURRENCY = 3
PROCESSOR_CONCURRENCY = 5
###################################

TEN_MB = 10485760

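# Uploads a single part of an S3 multipart upload. S3 part numbers are 1-based, and
# the ETag returned for each part has to be passed back to complete_multipart_upload.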
async def upload_chunk(client, chunk, upload_id, chunk_number, key):
    part_number = chunk_number + 1
    resp = await client.upload_part(
        Bucket=S3_BUCKET,
        Body=chunk,
        UploadId=upload_id,
        PartNumber=part_number,
        Key=key
    )
    return {
        'PartNumber': part_number,
        'ETag': resp['ETag']
    }

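# Checks whether a single key already exists under the prefix. (Not called from
# main(); FileProcessor.load_existing_files below caches the whole listing instead.)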
async def load_existing_files(s3_client, key):
    try:
        response = await s3_client.list_objects_v2(Bucket=S3_BUCKET, Prefix=S3_FOLDER)
        for obj in response['Contents']:
            if key == obj['Key']:
                return True
        return False  # no keys match
    except KeyError:
        return False  # no keys found
    except Exception as e:
        # Handle or log other exceptions such as bucket doesn't exist
        return e

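# Coordinates the per-file download/decompress/upload pipeline. The `positions` map
# hands out distinct tqdm row positions so concurrent progress bars don't overwrite
# each other. Note that list_objects_v2 returns at most 1,000 keys per call, so a
# paginator would be needed if the prefix held more objects than that.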
class FileProcessor(object):

    def __init__(self, concurrency: int) -> None:
        self.semaphore = asyncio.Semaphore(concurrency)
        self.download_semaphore = asyncio.Semaphore(concurrency * 3)
        self.concurrency = concurrency
        self.positions = {i + 1: True for i in range(concurrency)}
        self.existing_files = set()

    async def load_existing_files(self, s3_client):
        response = await s3_client.list_objects_v2(Bucket=S3_BUCKET, Prefix=S3_FOLDER)
        for obj in response['Contents']:
            self.existing_files.add(obj['Key'])

    def _get_available_position(self):
        position = next(iter([position for position, available in self.positions.items() if available]), None)
        self.positions[position] = False
        return position

    def _release_position(self, position: int):
        self.positions[position] = True

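    # Streams one gzipped dump file from dumps.wikimedia.org, decompresses it on the
    # fly, and re-uploads it to S3 as a multipart upload, accumulating roughly 10 MB
    # of compressed input per part.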
    async def download_and_upload_file(self, session: RetryClient, s3_client, month: int, file: str) -> Tuple[int, str, bool, str]:
        key = f"{S3_FOLDER}{YEAR}/{month:02}/{file.split('.')[0]}"
        if key not in self.existing_files:
            async with self.download_semaphore:
                position = self._get_available_position()
                create_multipart_upload_resp = await s3_client.create_multipart_upload(
                    Bucket=S3_BUCKET,
                    Key=key,
                )
                upload_id = create_multipart_upload_resp['UploadId']
                part_info = {
                    'Parts': []
                }
                try:
                    async with session.get(f"https://dumps.wikimedia.org/other/pagecounts-raw/{YEAR}/{YEAR}-{month:02}/{file}") as response:
                        if response.status != 200:
                            return month, file, False, f"Failed to download: Status Code: {response.status}"
                        else:
                            async with self.semaphore:
                                size = int(response.headers.get('content-length', 0)) or None
                                progressbar = tqdm(
                                    desc=f"Download, extract and then upload {file} to S3", total=size, position=position, leave=False,
                                )
                                remaining = size
                                index = 0
                                accumulator: bytes = bytes()
                                # 16 + MAX_WBITS tells zlib to expect a gzip header on the stream.
                                d = zlib.decompressobj(16 + zlib.MAX_WBITS)
                                async for chunk in response.content.iter_chunked(3000):
                                    remaining -= len(chunk)
                                    accumulator += chunk
                                    # Upload a part once ~10 MB of compressed data has accumulated, but hold a
                                    # sub-10 MB tail back so it lands in the final part (S3 requires every part
                                    # except the last to be at least 5 MiB). At end of stream, flush whatever is
                                    # left, even if it is smaller than 10 MB.
                                    if (remaining == 0 and accumulator) or (len(accumulator) > TEN_MB and remaining > TEN_MB):
                                        decompressed = d.decompress(accumulator)
                                        info = await upload_chunk(s3_client, decompressed, upload_id, index, key)
                                        part_info['Parts'].append(info)
                                        accumulator = bytes()
                                        index += 1
                                    progressbar.update(len(chunk))
                                d.flush()
                                if index == 0:
                                    return month, file, False, f"Nothing to download. Content: {await response.text()}"
                                else:
                                    list_parts_resp = await s3_client.list_parts(
                                        Bucket=S3_BUCKET,
                                        Key=key,
                                        UploadId=upload_id
                                    )
                                    if len(list_parts_resp['Parts']) == index:
                                        await s3_client.complete_multipart_upload(
                                            Bucket=S3_BUCKET,
                                            Key=key,
                                            UploadId=upload_id,
                                            MultipartUpload=part_info
                                        )
                                        return month, file, True, ""
                                    else:
                                        await s3_client.abort_multipart_upload(
                                            Bucket=S3_BUCKET,
                                            Key=key,
                                            UploadId=upload_id
                                        )
                                        return month, file, False, "Failed to upload to S3"
                finally:
                    self._release_position(position)
        # The file is already in S3 from a previous run; report it as successful so it
        # does not show up in the unprocessed list.
        return month, file, True, ""

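# Discovers the data files for a given month by reading the file names out of the
# month's md5sums.txt listing.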
async def find_files(semaphore: asyncio.Semaphore, session: RetryClient, month: int) -> List[Tuple[int, str]]:
    async with semaphore:
        async with session.get(f"https://dumps.wikimedia.org/other/pagecounts-raw/{YEAR}/{YEAR}-{month:02}/md5sums.txt") as response:
            payload = await response.text("utf-8")
            lines = payload.split("\n")
            parts = [line.split(" ") for line in lines]
            files = [(month, p[1]) for p in parts if len(p) == 2]
            return files

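# Collects the file names for every configured month, then downloads and uploads them
# all concurrently, printing whatever could not be processed at the end.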
async def main():
    scan_hash_limit = asyncio.Semaphore(SCAN_FILE_NAMES_CONCURRENCY)
    retry_options = ExponentialRetry(attempts=10, start_timeout=2)
    async with RetryClient(raise_for_status=False, retry_options=retry_options) as session:
        results = await asyncio.gather(*[find_files(scan_hash_limit, session, month) for month in MONTHS])
        results_flat = [(month, file) for files in results for month, file in files]
        boto_session = AioSession(profile=AWS_PROFILE)
        async with boto_session.create_client('s3', region_name='eu-west-1') as s3_client:
            processor = FileProcessor(concurrency=PROCESSOR_CONCURRENCY)
            await processor.load_existing_files(s3_client)
            results = await asyncio.gather(*[processor.download_and_upload_file(session, s3_client, month, file) for month, file in results_flat])
            unprocessed_files = [f"{month} {file} {error_message}" for month, file, successful, error_message in results if not successful]
            print(f"Here is the list of unprocessed files ({len(unprocessed_files)}/{len(results)})")
            print("\n".join(unprocessed_files))


asyncio.run(main())