Skip to content

Instantly share code, notes, and snippets.

@kenoir
Created September 20, 2019 10:42
Show Gist options
  • Save kenoir/1cd4642027e1575abfa6f5f507e4a02b to your computer and use it in GitHub Desktop.
Save kenoir/1cd4642027e1575abfa6f5f507e4a02b to your computer and use it in GitHub Desktop.
# chunk([1,2,3,4,5,6,7,8,9], 3)
# [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
def chunk(l, n):
return [l[i:i + n] for i in range(0, len(l), n)]
def get_queue(client, queue_name):
response = client.get_queue_url(
QueueName=queue_name
)
return response['QueueUrl']
def send_message_batch(sqs_client, queue_url, batch, maximum_batch_size = 10):
print(f"Preparing to send batch of {len(batch)} messages")
assert(
len(batch) < maximum_batch_size,
"Message batches must be less than 10"
)
response = sqs_client.send_message_batch(
QueueUrl=queue_url,
Entries=batch
)
all_ids = [o['Id'] for o in batch]
success_ids = [s['Id'] for s in response['Successful']]
failure_ids = set(all_ids) - set(success_ids)
print(f"Sent batch of {len(all_ids)} messages.")
print(f"Success: {len(success_ids)}, Failure: {len(failure_ids)}")
return {
'success': success_ids,
'failure': failure_ids,
}
def send_messages(client, target_queue_name, message_bodies):
maximum_batch_size = 10
queue_url = get_queue(client, target_queue_name)
print(f"Sending to {queue_url}")
def _create_message(message_body):
return {
'Id': str(uuid.uuid1()),
'MessageBody': json.dumps(message_body),
}
def _send_message_batch(batch):
return send_message_batch(client, queue_url, batch, maximum_batch_size)
messages = [_create_message(message_body) for message_body in message_bodies]
message_batches = chunk(messages, maximum_batch_size)
print(f"Prepared {len(message_batches)} batch from {len(messages)} messages")
results = parallel(_send_message_batch, message_batches)
return {
'messages': messages,
'results': results,
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment