-
-
Save Nicolab/fa142a96805a244c80467575aa066647 to your computer and use it in GitHub Desktop.
Stream HTTP body with Elixir Mint
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Downloader do
  @moduledoc """
  Download streams of bytes from URLs.

  Useful to transfer large files with low RAM usage.

  ## Example with `ExAws.S3.upload/4`

  ```elixir
  url
  |> Downloader.stream_body!()
  |> Downloader.chunk_bytes(5_000_000)
  |> ExAws.S3.upload(s3_bucket, filename, opts)
  |> ExAws.request!()
  ```
  """

  # Needed for the `Mint.HTTP.is_connection_message/2` guard used in `receive`.
  require Mint.HTTP

  # Per-request state threaded through the `Stream.resource/3` callbacks.
  # `url` and `received_length` are internal bookkeeping; the remaining
  # non-option fields are filled in from the response as it arrives.
  defstruct completed: false,
            content_length: 0,
            max_body_length: 200 * 1024 * 1024,
            max_redirect: 3,
            receive_timeout: 10_000,
            received_length: 0,
            redirect_count: 0,
            redirect_location: nil,
            status: 0,
            transport_opts: [],
            url: nil

  # Options accepted from callers and carried over when following a redirect.
  @opts [:max_body_length, :max_redirect, :receive_timeout, :redirect_count, :transport_opts]

  @doc """
  Creates a stream to download files from a URL in chunks.

  The request is only started when the stream is consumed, so all errors are
  raised lazily in the consuming process.

  Raises on 4xx and 5xx responses and when there are too many redirects.
  Also raises connection errors.

  ## Options

    * `:max_body_length` - maximum accepted body size in bytes, checked against
      both the `content-length` header and the bytes actually received
      (default: 200 MiB).
    * `:max_redirect` - maximum number of redirects to follow (default: `3`).
    * `:receive_timeout` - milliseconds to wait for the next socket message
      (default: `10_000`).
    * `:transport_opts` - transport options passed to `Mint.HTTP.connect/4` for
      HTTPS connections (default: `[]`). Certificates are verified by default;
      pass `[verify: :verify_none]` only when you explicitly accept the risk.

  ## Example

      url = "http://www.example.com/video.mp4"
      stream = Downloader.stream_body!(url)
      body = Enum.join(stream)
  """
  @spec stream_body!(String.t(), keyword()) :: Enumerable.t()
  def stream_body!(url, opts \\ []) do
    Stream.resource(
      fn -> start_request!(url, opts) end,
      &process_request/1,
      &finish_request!/1
    )
  end

  # Builds the request state and opens the connection, unless the redirect
  # budget is already spent. `redirect_count` is 0 for the initial request, so
  # the request is rejected only once the count exceeds `max_redirect`.
  defp start_request!(url, opts) do
    state = %{struct(__MODULE__, Keyword.take(opts, @opts)) | url: url}

    if state.redirect_count > state.max_redirect do
      raise http_error(:too_many_redirects)
    else
      do_start!(url, state)
    end
  end

  # Connects and sends the GET request. Returns `{request_result, state}` where
  # `request_result` is whatever `Mint.HTTP.request/5` returned; connection
  # failures are raised immediately because there is no socket to clean up.
  defp do_start!(url, state) do
    uri = URI.parse(url)
    scheme = scheme_atom(uri.scheme)

    case Mint.HTTP.connect(scheme, uri.host, uri.port, connect_opts(scheme, state)) do
      {:ok, conn} ->
        # `nil` body: a GET carries no payload.
        {Mint.HTTP.request(conn, "GET", path(uri), [], nil), state}

      {:error, error} ->
        raise error
    end
  end

  # Transport options only apply to TLS connections. Certificate verification
  # is left to Mint's defaults instead of being disabled.
  defp connect_opts(:https, state), do: [transport_opts: state.transport_opts]
  defp connect_opts(:http, _state), do: []

  # Stream cleanup: always closes the connection and re-raises a stored error.
  defp finish_request!({{:error, conn, error}, _state}) do
    Mint.HTTP.close(conn)
    raise error
  end

  defp finish_request!({{:ok, conn, _ref}, _state}) do
    Mint.HTTP.close(conn)
  end

  # Stream step: decides, from what has been received so far, whether to
  # follow a redirect, halt with an error, halt normally or wait for more data.
  defp process_request({{:error, _conn, _error}, _state} = acc), do: {:halt, acc}

  defp process_request({{:ok, conn, ref}, %__MODULE__{status: status} = state}) do
    cond do
      status in 300..399 and is_binary(state.redirect_location) ->
        Mint.HTTP.close(conn)

        state
        |> redirect_url()
        |> start_request!(redirect_opts(state))
        |> process_request()

      status in 400..499 ->
        {:halt, {{:error, conn, http_error(:status_4xx)}, state}}

      status >= 500 ->
        {:halt, {{:error, conn, http_error(:status_5xx)}, state}}

      # Servers may omit or understate `content-length` (e.g. chunked
      # transfer), so the bytes actually received are checked as well.
      max(state.content_length, state.received_length) > state.max_body_length ->
        {:halt, {{:error, conn, http_error(:max_body_length_exceeded)}, state}}

      state.completed ->
        {:halt, {{:ok, conn, ref}, state}}

      true ->
        receive_chunk(conn, ref, state)
    end
  end

  # Waits for the next message belonging to this connection. The guard keeps
  # unrelated messages in the caller's mailbox instead of consuming them.
  defp receive_chunk(conn, ref, state) do
    receive do
      message when Mint.HTTP.is_connection_message(conn, message) ->
        case Mint.HTTP.stream(conn, message) do
          {:ok, conn, responses} ->
            {chunk, state} = parse_chunk(responses, state, ref)
            {chunk, {{:ok, conn, ref}, state}}

          {:error, conn, error, _responses} ->
            {:halt, {{:error, conn, error}, state}}

          :unknown ->
            {[], {{:ok, conn, ref}, state}}
        end
    after
      state.receive_timeout ->
        {:halt, {{:error, conn, http_error(:receive_timeout)}, state}}
    end
  end

  # Folds a list of Mint responses into the state and collects body data.
  # Returns `{binaries_in_arrival_order, state}`; the body of a non-2xx
  # response is discarded.
  defp parse_chunk(responses, state, ref, chunk \\ [])

  defp parse_chunk([{:status, ref, status} | responses], state, ref, chunk) do
    parse_chunk(responses, %{state | status: status}, ref, chunk)
  end

  defp parse_chunk([{:headers, ref, headers} | responses], state, ref, chunk) do
    state = %{
      state
      | redirect_location: get_header(headers, "location"),
        content_length: parse_content_length(get_header(headers, "content-length"))
    }

    parse_chunk(responses, state, ref, chunk)
  end

  defp parse_chunk([{:data, ref, data} | responses], state, ref, chunk) do
    state = %{state | received_length: state.received_length + byte_size(data)}
    parse_chunk(responses, state, ref, [data | chunk])
  end

  defp parse_chunk([{:done, ref} | responses], state, ref, chunk) do
    parse_chunk(responses, %{state | completed: true}, ref, chunk)
  end

  # A request-level error; raising lets `Stream.resource/3` run the cleanup
  # function, which closes the connection.
  defp parse_chunk([{:error, ref, reason} | _responses], _state, ref, _chunk) do
    raise reason
  end

  # Anything else (other request refs, response kinds not handled here) is skipped.
  defp parse_chunk([_other | responses], state, ref, chunk) do
    parse_chunk(responses, state, ref, chunk)
  end

  defp parse_chunk([], state, _ref, chunk) do
    if state.status in 200..299 do
      {Enum.reverse(chunk), state}
    else
      {[], state}
    end
  end

  # Returns the first value of header `key` (given in lowercase), or `nil`.
  defp get_header(headers, key) do
    Enum.find_value(headers, fn {name, value} ->
      if String.downcase(name) == key, do: value
    end)
  end

  # A missing or malformed `content-length` counts as 0 (unknown).
  defp parse_content_length(nil), do: 0

  defp parse_content_length(value) do
    case Integer.parse(String.trim(value)) do
      {length, ""} when length >= 0 -> length
      _ -> 0
    end
  end

  # Resolves the `location` header against the current URL so that relative
  # redirects (e.g. "/new/path") work.
  defp redirect_url(%__MODULE__{url: url, redirect_location: location}) do
    url |> URI.merge(location) |> URI.to_string()
  end

  # Options for the follow-up request: same limits, one more redirect used.
  defp redirect_opts(%__MODULE__{} = state) do
    state
    |> Map.take(@opts)
    |> Map.to_list()
    |> Keyword.put(:redirect_count, state.redirect_count + 1)
  end

  # Builds the request target. The fragment is client-side only and is never
  # sent to the server; an absent or empty path becomes "/".
  defp path(%URI{path: path, query: query}) do
    IO.iodata_to_binary([
      if(path in [nil, ""], do: "/", else: path),
      if(query, do: ["?", query], else: [])
    ])
  end

  @doc """
  Streams an enumerable of binaries in chunks of the same byte size.

  Every emitted chunk is exactly `bytes` bytes long, except the last one, which
  holds the remaining `1..bytes` bytes. An empty enumerable yields a single
  empty chunk.

  Useful to stream to servers that limit chunk sizes, like AWS S3.

  ## Example

      iex> ["abcdefghij"] |> Downloader.chunk_bytes(3) |> Enum.to_list()
      ["abc", "def", "ghi", "j"]
  """
  @spec chunk_bytes(Enumerable.t(), pos_integer()) :: Enumerable.t()
  def chunk_bytes(enum, bytes) when is_integer(bytes) and bytes > 0 do
    chunk_fun = fn element, buffer ->
      # One element may hold several full chunks, so emit them as a list
      # and flatten afterwards.
      case split_chunks(buffer <> element, bytes, []) do
        {[], rest} -> {:cont, rest}
        {chunks, rest} -> {:cont, chunks, rest}
      end
    end

    after_fun = fn buffer -> {:cont, [buffer], ""} end

    enum
    |> Stream.chunk_while("", chunk_fun, after_fun)
    |> Stream.flat_map(& &1)
  end

  # Cuts full chunks off the front of `buffer` while more than `bytes` remain,
  # so the remainder is never empty once any data has been seen.
  defp split_chunks(buffer, bytes, acc) when byte_size(buffer) > bytes do
    <<head::binary-size(bytes), rest::binary>> = buffer
    split_chunks(rest, bytes, [head | acc])
  end

  defp split_chunks(buffer, _bytes, acc), do: {Enum.reverse(acc), buffer}

  @doc """
  Builds a `Mint.HTTPError` for `reason` whose message is rendered by
  `format_error/1` of this module.
  """
  def http_error(reason), do: %Mint.HTTPError{reason: reason, module: __MODULE__}

  @doc """
  Returns the human-readable message for an error reason produced by this module.
  """
  def format_error(:status_5xx), do: "response status 5xx"
  def format_error(:status_4xx), do: "response status 4xx"
  def format_error(:too_many_redirects), do: "request was redirected too many times"
  def format_error(:receive_timeout), do: "receive timeout"
  def format_error(:max_body_length_exceeded), do: "response body length exceeded the max limit"

  # Only HTTP and HTTPS URLs can be downloaded.
  defp scheme_atom("https"), do: :https
  defp scheme_atom("http"), do: :http

  defp scheme_atom(other) do
    raise ArgumentError, "unsupported URL scheme: #{inspect(other)}"
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment