Last active
November 18, 2020 16:23
-
-
Save sescobb27/41bf721ac78c3dbbf6858e64db86b701 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule CustomUploadS3 do | |
@moduledoc """ | |
Represents an AWS S3 Multipart Upload operation | |
## Examples | |
``` | |
"path/to/big/file" | |
|> S3.Upload.stream_file | |
|> S3.upload("my-bucket", "path/on/s3") | |
|> CustomUploadS3.to_custom_upload_s3() | |
|> ExAws.request!() #=> :done | |
``` | |
""" | |
@enforce_keys ~w(bucket path src)a | |
defstruct [ | |
:src, | |
:bucket, | |
:path, | |
:upload_id, | |
opts: [], | |
service: :s3 | |
] | |
@type t :: %__MODULE__{ | |
src: Enumerable.t(), | |
bucket: binary, | |
path: binary, | |
upload_id: binary, | |
opts: Keyword.t(), | |
service: :s3 | |
} | |
@spec to_custom_upload_s3(ExAws.S3.Upload.t()) :: t() | |
def to_custom_upload_s3(upload) do | |
%__MODULE__{ | |
src: upload.src, | |
bucket: upload.bucket, | |
path: upload.path, | |
opts: upload.opts | |
} | |
end | |
defimpl ExAws.Operation, for: CustomUploadS3 do | |
alias ExAws.S3.Upload | |
def perform(op, config) do | |
with {:ok, op} <- Upload.initialize(op, config) do | |
stream = Stream.with_index(op.src, 1) | |
TaskSupervisor | |
|> Task.Supervisor.async_stream_nolink( | |
stream, | |
Upload, | |
:upload_chunk!, | |
[Map.delete(op, :src), config], | |
max_concurrency: Keyword.get(op.opts, :max_concurrency, 4), | |
timeout: Keyword.get(op.opts, :timeout, 30_000) | |
) | |
|> Enum.map(fn | |
{:ok, val} -> val | |
{:exit, {error, _}} -> raise error.message | |
end) | |
|> Upload.complete(op, config) | |
end | |
end | |
def stream!(_, _), do: raise("not implemented") | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule CustomUploadS3NoRaise do | |
@moduledoc """ | |
Represents an AWS S3 Multipart Upload operation | |
## Examples | |
``` | |
"path/to/big/file" | |
|> S3.Upload.stream_file | |
|> S3.upload("my-bucket", "path/on/s3") | |
|> ExAws.request! #=> :done | |
``` | |
""" | |
@enforce_keys ~w(bucket path src)a | |
defstruct [ | |
:src, | |
:bucket, | |
:path, | |
:upload_id, | |
opts: [], | |
service: :s3 | |
] | |
@type t :: %__MODULE__{ | |
src: Enumerable.t(), | |
bucket: binary, | |
path: binary, | |
upload_id: binary, | |
opts: Keyword.t(), | |
service: :s3 | |
} | |
def complete(parts, op, config) do | |
ExAws.S3.complete_multipart_upload( | |
op.bucket, | |
op.path, | |
op.upload_id, | |
Enum.sort_by(parts, &elem(&1, 0)) | |
) | |
|> ExAws.request(config) | |
end | |
def initialize(op, config) do | |
init_op = ExAws.S3.initiate_multipart_upload(op.bucket, op.path, op.opts) | |
with {:ok, %{body: %{upload_id: upload_id}}} <- | |
ExAws.request(init_op, config) do | |
{:ok, %{op | upload_id: upload_id}} | |
end | |
end | |
@doc """ | |
Open a file stream for use in an upload. | |
Chunk size must be at least 5 MiB. Defaults to 5 MiB | |
""" | |
@spec stream_file(path :: binary) :: File.Stream.t() | |
@spec stream_file(path :: binary, opts :: [chunk_size: pos_integer]) :: | |
File.Stream.t() | |
def stream_file(path, opts \\ []) do | |
File.stream!(path, [], opts[:chunk_size] || 5 * 1024 * 1024) | |
end | |
@doc """ | |
Upload a chunk for an operation. | |
The first argument is a tuple with the binary contents of the chunk, and a | |
positive integer index indicating which chunk it is. It will return this index | |
along with the `etag` response from AWS necessary to complete the multipart upload. | |
""" | |
@spec upload_chunk!({binary, pos_integer}, t, ExAws.Config.t()) :: | |
{pos_integer, binary} | |
def upload_chunk!({chunk, i}, op, config) do | |
%{headers: headers} = | |
ExAws.S3.upload_part(op.bucket, op.path, op.upload_id, i, chunk, op.opts) | |
|> ExAws.request!(config) | |
{_, etag} = | |
Enum.find(headers, fn {k, _v} -> | |
String.downcase(k) == "etag" | |
end) | |
{i, etag} | |
end | |
@spec upload_chunk({binary, pos_integer}, t, ExAws.Config.t()) :: | |
{pos_integer, binary} | |
def upload_chunk({chunk, i}, op, config) do | |
ExAws.S3.upload_part(op.bucket, op.path, op.upload_id, i, chunk, op.opts) | |
|> ExAws.request(config) | |
|> case do | |
{:ok, %{headers: headers}} -> | |
{_, etag} = | |
Enum.find(headers, fn {k, _v} -> | |
String.downcase(k) == "etag" | |
end) | |
{i, etag} | |
{:error, reason} -> | |
{:error, reason} | |
end | |
end | |
end | |
defimpl ExAws.Operation, for: CustomUploadS3NoRaise do | |
# this is just a re-make of default implementation of https://github.com/ex-aws/ex_aws_s3/blob/master/lib/ex_aws/s3/upload.ex | |
# but using `request` instead of `request!` so our process doesn't crash if | |
# there is an error connecting (timeout/close) to S3 | |
def perform(op, config) do | |
with {:ok, op} <- Upload.initialize(op, config) do | |
vals = | |
op.src | |
|> Stream.with_index(1) | |
|> Task.async_stream( | |
Upload, | |
:upload_chunk, | |
[Map.delete(op, :src), config], | |
max_concurrency: Keyword.get(op.opts, :max_concurrency, 4), | |
timeout: Keyword.get(op.opts, :timeout, 30_000) | |
) | |
|> Enum.map(fn {:ok, val} -> val end) | |
vals | |
|> Enum.find(fn | |
{:error, _} -> true | |
_ -> false | |
end) | |
|> case do | |
nil -> | |
vals | |
|> Upload.complete(op, config) | |
error -> | |
error | |
end | |
end | |
end | |
def stream!(_, _), do: raise("not implemented") | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment