@sescobb27 · Last active November 18, 2020 16:23
defmodule CustomUploadS3 do
  @moduledoc """
  Represents an AWS S3 Multipart Upload operation

  ## Examples

  ```
  "path/to/big/file"
  |> S3.Upload.stream_file
  |> S3.upload("my-bucket", "path/on/s3")
  |> CustomUploadS3.to_custom_upload_s3()
  |> ExAws.request!() #=> :done
  ```
  """

  @enforce_keys ~w(bucket path src)a
  defstruct [
    :src,
    :bucket,
    :path,
    :upload_id,
    opts: [],
    service: :s3
  ]

  @type t :: %__MODULE__{
          src: Enumerable.t(),
          bucket: binary,
          path: binary,
          upload_id: binary,
          opts: Keyword.t(),
          service: :s3
        }

  @spec to_custom_upload_s3(ExAws.S3.Upload.t()) :: t()
  def to_custom_upload_s3(upload) do
    %__MODULE__{
      src: upload.src,
      bucket: upload.bucket,
      path: upload.path,
      opts: upload.opts
    }
  end
  defimpl ExAws.Operation, for: CustomUploadS3 do
    alias ExAws.S3.Upload

    def perform(op, config) do
      with {:ok, op} <- Upload.initialize(op, config) do
        stream = Stream.with_index(op.src, 1)

        # `TaskSupervisor` is expected to be a named `Task.Supervisor`
        # started elsewhere (e.g. in the application's supervision tree).
        TaskSupervisor
        |> Task.Supervisor.async_stream_nolink(
          stream,
          Upload,
          :upload_chunk!,
          [Map.delete(op, :src), config],
          max_concurrency: Keyword.get(op.opts, :max_concurrency, 4),
          timeout: Keyword.get(op.opts, :timeout, 30_000)
        )
        |> Enum.map(fn
          {:ok, val} -> val
          # a chunk upload that crashed in its task re-raises in the caller
          {:exit, {error, _}} -> raise error.message
        end)
        |> Upload.complete(op, config)
      end
    end

    def stream!(_, _), do: raise("not implemented")
  end
end
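
The implementation above calls `Task.Supervisor.async_stream_nolink/6` on a supervisor registered as `TaskSupervisor`, which the gist itself never starts. A minimal sketch of starting one, assuming a hypothetical application module named `MyApp.Application`, could look like this:

defmodule MyApp.Application do
  # Hypothetical application module: shows one way to start the named
  # Task.Supervisor that CustomUploadS3's perform/2 relies on.
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # registers a Task.Supervisor under the name `TaskSupervisor`
      {Task.Supervisor, name: TaskSupervisor}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end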
defmodule CustomUploadS3NoRaise do
  @moduledoc """
  Represents an AWS S3 Multipart Upload operation

  ## Examples

  ```
  "path/to/big/file"
  |> S3.Upload.stream_file
  |> S3.upload("my-bucket", "path/on/s3")
  |> ExAws.request! #=> :done
  ```
  """

  @enforce_keys ~w(bucket path src)a
  defstruct [
    :src,
    :bucket,
    :path,
    :upload_id,
    opts: [],
    service: :s3
  ]

  @type t :: %__MODULE__{
          src: Enumerable.t(),
          bucket: binary,
          path: binary,
          upload_id: binary,
          opts: Keyword.t(),
          service: :s3
        }
  @doc """
  Complete the multipart upload by sending AWS the ordered list of
  `{part_number, etag}` pairs collected from the uploaded chunks.
  """
  def complete(parts, op, config) do
    ExAws.S3.complete_multipart_upload(
      op.bucket,
      op.path,
      op.upload_id,
      Enum.sort_by(parts, &elem(&1, 0))
    )
    |> ExAws.request(config)
  end

  @doc """
  Initiate the multipart upload and store the returned `upload_id` on the
  operation struct.
  """
  def initialize(op, config) do
    init_op = ExAws.S3.initiate_multipart_upload(op.bucket, op.path, op.opts)

    with {:ok, %{body: %{upload_id: upload_id}}} <- ExAws.request(init_op, config) do
      {:ok, %{op | upload_id: upload_id}}
    end
  end

  @doc """
  Open a file stream for use in an upload.
  Chunk size must be at least 5 MiB. Defaults to 5 MiB.
  """
  @spec stream_file(path :: binary) :: File.Stream.t()
  @spec stream_file(path :: binary, opts :: [chunk_size: pos_integer]) :: File.Stream.t()
  def stream_file(path, opts \\ []) do
    File.stream!(path, [], opts[:chunk_size] || 5 * 1024 * 1024)
  end

  @doc """
  Upload a chunk for an operation.
  The first argument is a tuple with the binary contents of the chunk, and a
  positive integer index indicating which chunk it is. It will return this index
  along with the `etag` response from AWS necessary to complete the multipart upload.
  """
  @spec upload_chunk!({binary, pos_integer}, t, ExAws.Config.t()) :: {pos_integer, binary}
  def upload_chunk!({chunk, i}, op, config) do
    %{headers: headers} =
      ExAws.S3.upload_part(op.bucket, op.path, op.upload_id, i, chunk, op.opts)
      |> ExAws.request!(config)

    {_, etag} = Enum.find(headers, fn {k, _v} -> String.downcase(k) == "etag" end)

    {i, etag}
  end

  @doc """
  Same as `upload_chunk!/3`, but returns `{:error, reason}` instead of raising
  when the request to S3 fails.
  """
  @spec upload_chunk({binary, pos_integer}, t, ExAws.Config.t()) ::
          {pos_integer, binary} | {:error, term}
  def upload_chunk({chunk, i}, op, config) do
    ExAws.S3.upload_part(op.bucket, op.path, op.upload_id, i, chunk, op.opts)
    |> ExAws.request(config)
    |> case do
      {:ok, %{headers: headers}} ->
        {_, etag} = Enum.find(headers, fn {k, _v} -> String.downcase(k) == "etag" end)
        {i, etag}

      {:error, reason} ->
        {:error, reason}
    end
  end
end
defimpl ExAws.Operation, for: CustomUploadS3NoRaise do
  # This is a re-make of the default implementation at
  # https://github.com/ex-aws/ex_aws_s3/blob/master/lib/ex_aws/s3/upload.ex
  # but using `request` instead of `request!`, so our process doesn't crash if
  # there is an error connecting to S3 (timeout/close).
  alias CustomUploadS3NoRaise, as: Upload

  def perform(op, config) do
    with {:ok, op} <- Upload.initialize(op, config) do
      vals =
        op.src
        |> Stream.with_index(1)
        |> Task.async_stream(
          Upload,
          :upload_chunk,
          [Map.delete(op, :src), config],
          max_concurrency: Keyword.get(op.opts, :max_concurrency, 4),
          timeout: Keyword.get(op.opts, :timeout, 30_000)
        )
        |> Enum.map(fn {:ok, val} -> val end)

      # if any chunk failed, return its `{:error, reason}` instead of completing
      vals
      |> Enum.find(fn
        {:error, _} -> true
        _ -> false
      end)
      |> case do
        nil -> Upload.complete(vals, op, config)
        error -> error
      end
    end
  end

  def stream!(_, _), do: raise("not implemented")
end
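
For reference, here is a minimal sketch of driving the non-raising variant end to end. The bucket and key names are placeholders, and the struct is built by hand because the module defines no converter like `CustomUploadS3.to_custom_upload_s3/1`:

# Usage sketch (assumed bucket/key names; not part of the gist above).
upload = %CustomUploadS3NoRaise{
  src: CustomUploadS3NoRaise.stream_file("path/to/big/file"),
  bucket: "my-bucket",
  path: "path/on/s3"
}

# ExAws.request/1 dispatches to the ExAws.Operation implementation above and
# returns an error tuple instead of raising when a chunk upload fails.
case ExAws.request(upload) do
  {:ok, _result} -> :uploaded
  {:error, reason} -> {:failed, reason}
end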