Skip to content

Instantly share code, notes, and snippets.

@hamann
Created July 5, 2015 05:35
Show Gist options
  • Save hamann/e0d79903b48c3e498bcf to your computer and use it in GitHub Desktop.
Save hamann/e0d79903b48c3e498bcf to your computer and use it in GitHub Desktop.
s3 multipart upload
defmodule Db24.AWS.S3 do
require Logger
alias Db24.AWS.S3.MultipartUploadState, as: Mp
@multipart_min_chunk_size 5242880 # 5MB
defmodule MultipartUploadState do
defstruct upload_id: nil,
count: 1,
error: nil,
etags: [],
file: nil,
bucket: nil,
key: nil
def new(file, bucket, key) do
%MultipartUploadState{
file: file,
bucket: bucket |> to_char_list,
key: key |> to_char_list
}
end
end
def upload(file, key) when is_binary(file) and is_binary(key) do
{:ok, binary} = File.read(file)
t_key = key |> to_char_list
t_bucket = bucket |> to_char_list
:erlcloud_s3.put_object(t_bucket, t_key, binary, [acl: :public_read], erlcloud_config)
end
def upload_multipart(file, key) when is_binary(file) and is_binary(key) do
mp = Mp.new(file, bucket, key)
result = File.stream!(file, [], @multipart_min_chunk_size)
|> Enum.reduce(mp, fn(binary, mp) ->
upload_part(binary, mp, _current_tries = 0)
end)
handle_multipart_result(result, 0)
end
defp upload_part(binary, %Mp{upload_id: nil} = mp, current_tries) do
set_default_config
case :erlcloud_s3.start_multipart(mp.bucket, mp.key, [acl: :public_read], [], erlcloud_config) do
{:ok, [uploadId: upload_id]} ->
mp = mp |> Map.put(:upload_id, upload_id)
upload_part(binary, mp, current_tries)
_error ->
upload_part(binary, mp, current_tries + 1)
end
end
defp upload_part(_binary, %Mp{error: error} = mp, _current_tries) when not is_nil(error), do: mp
defp upload_part(_binary, %Mp{} = mp, current_tries) when current_tries >= 5 do
Logger.error("Aborting multipart upload for #{mp.file} after #{current_tries} tries")
mp |> Map.put(:error, "too many failures")
end
defp upload_part(binary, %Mp{upload_id: upload_id} = mp, current_tries) when is_binary(binary) and is_list(upload_id) do
case :erlcloud_s3.upload_part(mp.bucket, mp.key, upload_id, mp.count, binary) do
{:ok, [{:etag, etag}]} ->
Logger.info("Successful multipart upload for #{mp.file}, part #{mp.count}")
mp
|> Map.put(:etags, [{mp.count, etag} | mp.etags])
|> Map.put(:count, mp.count + 1)
error ->
IO.inspect(error)
upload_part(binary, mp, current_tries + 1)
end
end
defp handle_multipart_result(_mp, current_tries) when current_tries >= 5 do
{:error, "too many failures"}
end
defp handle_multipart_result(%Mp{error: error} = mp, current_tries) when not is_nil(error) do
Logger.warn("Aborting multipart upload for id #{mp.upload_id}")
case :erlcloud_s3.abort_multipart(mp.bucket, mp.key, mp.upload_id) do
:ok -> :ok
error ->
IO.inspect(error)
handle_multipart_result(mp, current_tries + 1)
end
end
defp handle_multipart_result(%Mp{} = mp, current_tries) do
Logger.info("Finalizing multipart upload for id #{mp.upload_id}")
case :erlcloud_s3.complete_multipart(mp.bucket, mp.key, mp.upload_id, Enum.reverse(mp.etags)) do
:ok -> :ok
error ->
IO.inspect(error)
handle_multipart_result(mp, current_tries + 1)
end
end
# TODO aws returns max 1000 keys
# we have to use `marker` to retrieve all
def list_objects(bucket_name \\ nil)
def list_objects(bucket_name) when is_binary(bucket_name) do
bucket_name |> to_char_list |> :erlcloud_s3.list_objects(erlcloud_config)
end
def list_objects(_) do
bucket |> to_char_list |> :erlcloud_s3.list_objects(erlcloud_config)
end
def erlcloud_config do
access_key_id = aws_config |> Dict.get(:access_key_id) |> to_char_list
secret_access_key = aws_config |> Dict.get(:secret_access_key) |> to_char_list
endpoint = aws_config |> Dict.get(:endpoint) |> to_char_list
:erlcloud_s3.new(
access_key_id,
secret_access_key,
endpoint
)
end
# TODO this implementation is not optimal, because we make a single request
# for each key. but :erlcloud_s3.delete_objects_batch returns 'Access denied'
# and don't know why, so for now this should be sufficient
def delete_all_objects do
keys = list_objects
|> Dict.get(:contents)
|> Enum.map(fn(content) -> Dict.get(content, :key) end)
for key <- keys do
delete_object(key)
end
end
def delete_object(key) when is_binary(key) do
key |> to_char_list |> delete_object
end
def delete_object(key) when is_list(key) do
set_default_config
bucket_name = bucket |> to_char_list
:erlcloud_s3.delete_object(bucket_name, key)
end
def set_default_config do
:erlang.put(:aws_config, erlcloud_config)
end
def endpoint,
do: aws_config |> Dict.get(:endpoint)
def asset_host,
do: aws_config |> Dict.get(:asset_host)
def bucket,
do: aws_config |> Dict.get(:bucket)
def aws_config,
do: Application.get_env(:db24, __MODULE__)
def url(filename) do
"#{asset_host}/#{filename}"
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment