Created
July 5, 2015 05:35
-
-
Save hamann/e0d79903b48c3e498bcf to your computer and use it in GitHub Desktop.
s3 multipart upload
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Db24.AWS.S3 do | |
require Logger | |
alias Db24.AWS.S3.MultipartUploadState, as: Mp | |
@multipart_min_chunk_size 5242880 # 5MB | |
defmodule MultipartUploadState do
  # Accumulator threaded through a multipart upload: the S3 upload id,
  # the next part number (`count`), collected `{part, etag}` pairs, an
  # error marker, and the target bucket/key stored as charlists so they
  # can be handed straight to erlcloud.
  defstruct upload_id: nil,
            count: 1,
            error: nil,
            etags: [],
            file: nil,
            bucket: nil,
            key: nil

  # Builds a fresh state for `file`, converting bucket and key to charlists.
  def new(file, bucket, key) do
    %__MODULE__{
      file: file,
      bucket: to_charlist(bucket),
      key: to_charlist(key)
    }
  end
end
# Uploads the whole of `file` to the configured bucket under `key` as a
# single public-read PUT. Raises (via the `{:ok, _}` match) when the
# file cannot be read.
def upload(file, key) when is_binary(file) and is_binary(key) do
  {:ok, contents} = File.read(file)

  :erlcloud_s3.put_object(
    to_charlist(bucket),
    to_charlist(key),
    contents,
    [acl: :public_read],
    erlcloud_config
  )
end
# Streams `file` in @multipart_min_chunk_size chunks and uploads each as
# one part of an S3 multipart upload, then completes (or aborts) the
# upload depending on the final accumulated state.
def upload_multipart(file, key) when is_binary(file) and is_binary(key) do
  file
  |> File.stream!([], @multipart_min_chunk_size)
  |> Enum.reduce(Mp.new(file, bucket, key), &upload_part(&1, &2, 0))
  |> handle_multipart_result(0)
end
# Uploads one chunk as a multipart part, driving the state machine in `mp`.
#
# Clause order matters:
#   1. a state that already carries an error is passed through untouched,
#   2. the retry budget (5 attempts) is enforced before anything else,
#   3. a missing upload id triggers `start_multipart` first,
#   4. otherwise the chunk is uploaded and its etag recorded.
#
# NOTE(review): previously the `upload_id: nil` clause was matched before
# the retry-limit and error clauses, so repeated `start_multipart`
# failures recursed forever instead of giving up after 5 tries, and an
# already-errored state could restart a fresh multipart upload.

# An earlier part failed for good: skip the remaining chunks.
defp upload_part(_binary, %Mp{error: error} = mp, _current_tries) when not is_nil(error), do: mp

# Retry budget exhausted: mark the state as failed so the caller aborts.
defp upload_part(_binary, %Mp{} = mp, current_tries) when current_tries >= 5 do
  Logger.error("Aborting multipart upload for #{mp.file} after #{current_tries} tries")
  %{mp | error: "too many failures"}
end

# No upload in progress yet: start one, then upload this chunk.
defp upload_part(binary, %Mp{upload_id: nil} = mp, current_tries) do
  set_default_config

  case :erlcloud_s3.start_multipart(mp.bucket, mp.key, [acl: :public_read], [], erlcloud_config) do
    {:ok, [uploadId: upload_id]} ->
      upload_part(binary, %{mp | upload_id: upload_id}, current_tries)

    _error ->
      upload_part(binary, mp, current_tries + 1)
  end
end

# Upload the chunk; on success record its etag and bump the part counter,
# on failure retry (the retry-limit clause above stops the recursion).
defp upload_part(binary, %Mp{upload_id: upload_id} = mp, current_tries)
     when is_binary(binary) and is_list(upload_id) do
  case :erlcloud_s3.upload_part(mp.bucket, mp.key, upload_id, mp.count, binary) do
    {:ok, [{:etag, etag}]} ->
      Logger.info("Successful multipart upload for #{mp.file}, part #{mp.count}")
      %{mp | etags: [{mp.count, etag} | mp.etags], count: mp.count + 1}

    error ->
      IO.inspect(error)
      upload_part(binary, mp, current_tries + 1)
  end
end
# Finalizes a multipart upload: aborts when the accumulated state carries
# an error, completes it otherwise. Each S3 call is retried, giving up
# with `{:error, "too many failures"}` after 5 attempts.
defp handle_multipart_result(_mp, current_tries) when current_tries >= 5 do
  {:error, "too many failures"}
end

defp handle_multipart_result(%Mp{error: error} = mp, current_tries) when not is_nil(error) do
  Logger.warn("Aborting multipart upload for id #{mp.upload_id}")

  # `with` falls through returning `:ok` on success; any other value is
  # logged and the abort is retried.
  with failure when failure != :ok <-
         :erlcloud_s3.abort_multipart(mp.bucket, mp.key, mp.upload_id) do
    IO.inspect(failure)
    handle_multipart_result(mp, current_tries + 1)
  end
end

defp handle_multipart_result(%Mp{} = mp, current_tries) do
  Logger.info("Finalizing multipart upload for id #{mp.upload_id}")

  with failure when failure != :ok <-
         :erlcloud_s3.complete_multipart(mp.bucket, mp.key, mp.upload_id, Enum.reverse(mp.etags)) do
    IO.inspect(failure)
    handle_multipart_result(mp, current_tries + 1)
  end
end
# Lists the objects of `bucket_name`, falling back to the configured
# bucket when no name is given.
# TODO: AWS returns at most 1000 keys per response; paginate with
# `marker` to retrieve them all.
def list_objects(bucket_name \\ nil)

def list_objects(bucket_name) when is_binary(bucket_name) do
  :erlcloud_s3.list_objects(to_charlist(bucket_name), erlcloud_config)
end

def list_objects(_) do
  :erlcloud_s3.list_objects(to_charlist(bucket), erlcloud_config)
end
# Builds an erlcloud config record (credentials + endpoint as charlists)
# from this module's application environment.
# Uses Access (`config[:key]`) instead of the long-deprecated `Dict`
# module; behavior on keyword lists is identical.
def erlcloud_config do
  config = aws_config

  :erlcloud_s3.new(
    to_charlist(config[:access_key_id]),
    to_charlist(config[:secret_access_key]),
    to_charlist(config[:endpoint])
  )
end
# TODO: this implementation is not optimal because we make a single
# request per key, but :erlcloud_s3.delete_objects_batch returns
# 'Access denied' (cause unknown), so for now this is sufficient.

# Deletes every object in the configured bucket, one request per key,
# returning the list of per-key delete results.
# Uses Access/`Enum` instead of the long-deprecated `Dict` module; the
# erlcloud response and its `:contents` entries are keyword-shaped
# proplists, so `[:key]` access is equivalent.
def delete_all_objects do
  list_objects()[:contents]
  |> Enum.map(fn content -> content[:key] end)
  |> Enum.map(&delete_object/1)
end
# Deletes a single object from the configured bucket. Accepts the key
# either as an Elixir string (converted to a charlist) or as a charlist.
def delete_object(key) when is_binary(key) do
  key
  |> to_charlist()
  |> delete_object()
end

def delete_object(key) when is_list(key) do
  set_default_config
  :erlcloud_s3.delete_object(to_charlist(bucket), key)
end
# Stores the erlcloud config in the calling process's dictionary under
# :aws_config — presumably where erlcloud looks it up when no explicit
# config argument is passed (TODO confirm against erlcloud docs).
# `Process.put/2` is the idiomatic wrapper for `:erlang.put/2`.
def set_default_config do
  Process.put(:aws_config, erlcloud_config)
end
# Config accessors backed by this module's application environment.
# Uses Access (`config[:key]`) instead of the long-deprecated `Dict`
# module; as a bonus, `nil[:key]` returns nil, so these no longer crash
# when the :db24 environment for this module is unset.
def endpoint,
  do: aws_config[:endpoint]

def asset_host,
  do: aws_config[:asset_host]

def bucket,
  do: aws_config[:bucket]

def aws_config,
  do: Application.get_env(:db24, __MODULE__)
# Public URL for `filename` on the configured asset host.
# Fixes a broken body: it previously emitted the literal text
# "#(unknown)" ("#(" is not interpolation) and never used `filename`.
def url(filename) do
  "#{asset_host}/#{filename}"
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment