@guzmonne
Created January 24, 2019 20:06
Download multiple files from S3 in multiple threads
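In short: producer processes list the objects under each S3 prefix and put the listings on a shared queue, while consumer processes pull those chunks and hand them to object_worker for download. Despite the title, the workers are multiprocessing processes rather than threads. Minimal, hypothetical sketches of the helper modules the script imports are included after the listing.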
import os
import math
from datetime import datetime, timedelta
from multiprocessing import Process, Queue, Lock, Manager
import boto3
from tqdm import tqdm
from utils import chunks, Logger, clear, tprint
from observations_1 import get_all_objects, object_worker, HOURS, MINUTES
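# Note: `utils` and `observations_1` are local helper modules that are not
# included in this gist; a hypothetical sketch of the helpers they are
# assumed to provide appears after the script.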
PRODUCERS = 2
CONSUMERS = 40
logger = Logger()


def producer(queue, lock, chunk, position):
    """Gets file lists from S3."""
    with lock:
        tprint(f'P{position} ({os.getpid()}): starting')
    for prefix in tqdm(chunk, position=position, leave=False):
        queue.put(get_all_objects(
            prefix=prefix,
            recursive=True
        ))
        with lock:
            tprint(
                f'P{position} ({os.getpid()}): {queue.qsize()} chunks remain')
    with lock:
        tprint(f'P{position} ({os.getpid()}): exiting')


def consumer(queue, cache, lock, position):
    """Gets files and stores them in the DB."""
    with lock:
        tprint(f'C{position} ({os.getpid()}): starting')
    # Run indefinitely
    while True:
        chunk = queue.get()
        with lock:
            tprint(
                f'C{position} ({os.getpid()}): {queue.qsize()} chunks remain')
        object_worker(chunk, position, cache)


if __name__ == '__main__':
    clear()
    client = boto3.client('s3')
    s3 = boto3.resource('s3')
    s3_prefixes = []
    current_date = datetime.strptime(
        '2018-12-28 00:00:00', '%Y-%m-%d %H:%M:%S')
    end_date = datetime.strptime('2018-12-29 00:00:00', '%Y-%m-%d %H:%M:%S')
    while end_date > current_date:
        current_date = current_date + timedelta(days=1)
        for hour in HOURS:
            # Prefixes look like meraki/raw/dt=2018-12-29T00/tenant=tata/
            s3_prefixes.append(
                f"meraki/raw/dt={current_date:%Y-%m-%d}T{hour}/tenant=tata/")
    s3_prefixes_chunks = list(
        chunks(s3_prefixes, math.ceil(len(s3_prefixes) / PRODUCERS)))
    manager = Manager()
    cache = manager.dict()
    queue = Queue()
    lock = Lock()
    producers = []
    consumers = []
    # Create producer processes
    for producer_index, chunk in enumerate(s3_prefixes_chunks):
        producers.append(Process(target=producer,
                                 args=(queue, lock, chunk, producer_index)))
    # Create consumer processes
    for consumer_index in range(CONSUMERS):
        p = Process(target=consumer, args=(
            queue, cache, lock, consumer_index + PRODUCERS + 1))
        # This is critical! The consumer function has an infinite loop,
        # which means it would never exit unless we set daemon to True.
        p.daemon = True
        consumers.append(p)
    # Start the producers and consumers. Each Process object is launched
    # as an independent OS process when start() is called.
    for p in producers:
        p.start()
    for c in consumers:
        c.start()
    # Like threading, join() synchronizes the parent with the producers;
    # the daemon consumers are terminated when the parent exits.
    for p in producers:
        p.join()
    tprint('Parent process exiting...')
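
The gist does not include the `utils` and `observations_1` modules, so the script is not runnable as published. The sketch below is one plausible implementation of the imported helpers, not the original code: the bucket name BUCKET, the two-digit HOURS/MINUTES lists, the list-of-keys shape returned by get_all_objects, and object_worker recording sizes into the shared cache are all assumptions made for illustration (the Logger class is imported but unused in the visible code, so it is omitted).

# Hypothetical sketch of the helpers imported from `utils` and `observations_1`.
# BUCKET and the behavior of object_worker are assumptions, not the original code.
import os
import sys
from datetime import datetime

import boto3

BUCKET = 'my-bucket'  # assumption: the real bucket name is not in the gist
HOURS = [f'{h:02d}' for h in range(24)]
MINUTES = [f'{m:02d}' for m in range(60)]


def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def clear():
    """Clear the terminal screen."""
    os.system('cls' if os.name == 'nt' else 'clear')


def tprint(message):
    """Print a message prefixed with a timestamp."""
    sys.stdout.write(f'[{datetime.now():%H:%M:%S}] {message}\n')
    sys.stdout.flush()


def get_all_objects(prefix, recursive=True):
    """List every object key under `prefix` using the list_objects_v2 paginator."""
    client = boto3.client('s3')
    paginator = client.get_paginator('list_objects_v2')
    kwargs = {'Bucket': BUCKET, 'Prefix': prefix}
    if not recursive:
        kwargs['Delimiter'] = '/'
    keys = []
    for page in paginator.paginate(**kwargs):
        for obj in page.get('Contents', []):
            keys.append(obj['Key'])
    return keys


def object_worker(chunk, position, cache):
    """Download each key in `chunk` and record it in the shared cache.

    `position` is the tqdm slot in the original script; it is unused here.
    """
    client = boto3.client('s3')
    for key in chunk:
        if key in cache:
            continue
        body = client.get_object(Bucket=BUCKET, Key=key)['Body'].read()
        cache[key] = len(body)  # assumption: the real worker writes to a DB

With sketches like these saved as utils.py and observations_1.py next to the script, the gist runs end to end against whatever bucket BUCKET points at.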