Download multiple files from S3 with multiple processes (producer/consumer)
import os
import math
from datetime import datetime, timedelta
from multiprocessing import Process, Queue, Lock, Manager

import boto3
from tqdm import tqdm

from utils import chunks, Logger, clear, tprint
from observations_1 import get_all_objects, object_worker, HOURS, MINUTES

PRODUCERS = 2
CONSUMERS = 40

logger = Logger()
def producer(queue, lock, chunk, position):
    """Gets file lists from S3."""
    with lock:
        tprint(f'P{position} ({os.getpid()}): starting')
    for prefix in tqdm(chunk, position=position, leave=False):
        queue.put(get_all_objects(
            prefix=prefix,
            recursive=True
        ))
        with lock:
            tprint(
                f'P{position} ({os.getpid()}): {queue.qsize()} chunks remain')
    with lock:
        tprint(f'P{position} ({os.getpid()}): exiting')
def consumer(queue, cache, lock, position):
    """Gets files and stores them in the DB."""
    with lock:
        tprint(f'C{position} ({os.getpid()}): starting')
    # Run indefinitely
    while True:
        chunk = queue.get()
        with lock:
            tprint(
                f'C{position} ({os.getpid()}): {queue.qsize()} chunks remain')
        object_worker(chunk, position, cache)
if __name__ == '__main__':
    clear()
    client = boto3.client('s3')
    s3 = boto3.resource('s3')
    s3_prefixes = []
    current_date = datetime.strptime(
        '2018-12-28 00:00:00', '%Y-%m-%d %H:%M:%S')
    end_date = datetime.strptime('2018-12-29 00:00:00', '%Y-%m-%d %H:%M:%S')
    while end_date > current_date:
        current_date = current_date + timedelta(days=1)
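        # Note: the date is advanced *before* the prefixes are built, so the
        # prefixes cover the day(s) after the start date, not the start date
        # itself.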
        for hour in HOURS:
            s3_prefixes.append(
                f"meraki/raw/{current_date:dt=%Y-%m-%dT}{hour}/tenant=tata/")
    s3_prefixes_chunks = list(
        chunks(s3_prefixes, math.ceil(len(s3_prefixes) / PRODUCERS)))
    manager = Manager()
    cache = manager.dict()
    queue = Queue()
    lock = Lock()
    producers = []
    consumers = []
    # Create producer processes
    for producer_index, chunk in enumerate(s3_prefixes_chunks):
        producers.append(Process(target=producer,
                                 args=(queue, lock, chunk, producer_index)))
    # Create consumer processes
    for consumer_index in range(CONSUMERS):
        p = Process(target=consumer, args=(
            queue, cache, lock, consumer_index + PRODUCERS + 1))
        # This is critical! The consumer function has an infinite loop,
        # which means it will never exit unless we set daemon to True.
        p.daemon = True
        consumers.append(p)
    # Start the producers and consumers.
    # multiprocessing launches a new, independent process for each Process object.
    for p in producers:
        p.start()
    for c in consumers:
        c.start()
    # As with threading, join() synchronizes our program: it blocks until
    # every producer has finished.
    for p in producers:
        p.join()
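    # Caveat: joining the producers only guarantees that every chunk was
    # enqueued; the daemonized consumers are killed as soon as the parent
    # exits, so any chunks still on the queue at that point go unprocessed.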
    tprint('Parent process exiting...')
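The script leans on a small `utils` module that isn't included in the gist. A minimal sketch of what those helpers could look like, assuming `chunks` does size-based list slicing and `tprint` is a timestamped print (the real implementations may differ):

import os
from datetime import datetime


def chunks(items, size):
    """Yield successive size-sized slices of items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]


def clear():
    """Clear the terminal screen."""
    os.system('cls' if os.name == 'nt' else 'clear')


def tprint(message):
    """Print a message prefixed with the current time."""
    print(f'[{datetime.now():%H:%M:%S}] {message}')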
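Likewise, `get_all_objects` and `object_worker` come from a separate `observations_1` module that is not shown. Assuming `get_all_objects` returns the list of object keys under a prefix, a hypothetical version could be built on boto3's `list_objects_v2` paginator (the bucket name here is a placeholder):

import boto3


def get_all_objects(prefix, recursive=True, bucket='example-bucket'):
    """List all object keys under prefix (hypothetical reimplementation)."""
    client = boto3.client('s3')
    paginator = client.get_paginator('list_objects_v2')
    kwargs = {'Bucket': bucket, 'Prefix': prefix}
    if not recursive:
        # Stop at the next '/' level instead of descending the whole tree.
        kwargs['Delimiter'] = '/'
    keys = []
    for page in paginator.paginate(**kwargs):
        keys.extend(obj['Key'] for obj in page.get('Contents', []))
    return keys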