# Install Req from a local checkout instead of a published Hex release.
# NOTE(review): the path is specific to the author's machine; presumably this
# checkout provides the `:finch_exec_request` option that `ReqStream` sets below —
# confirm before pointing this at a Hex version.
local_req = Path.expand("~/dev/req")
Mix.install([
{:req, path: local_req}
])
:ok
This module is a simple proof of concept that uses `Finch.stream/5` and `Stream.resource/3` to expose the response body as a stream. It doesn't currently handle request cancelation.
defmodule FinchStream do
  @moduledoc """
  Uses Finch's lower-level `stream` API to expose an HTTP
  response as a Stream.

  The request runs in a `Task` that forwards every chunk it receives to the
  process that called `stream!/3`. The returned stream reads those messages
  from the caller's mailbox, so it must be enumerated by that same process.

  ## Usage

      Finch.start_link(name: MyFinch)

      {status, headers, stream} =
        Finch.build(:get, "https://example.com")
        |> FinchStream.stream!(MyFinch)
  """

  @doc """
  Starts `request` on the `finch` pool and returns `{status, headers, stream}`.

  Blocks until the status and the headers have arrived. `stream` lazily yields
  the body chunks in arrival order. `opts` is passed to `Finch.stream/5`
  unchanged.

  Raises the exception reported by `Finch.stream/5` when the request fails,
  either here (before the status/headers arrive) or later while the stream is
  being enumerated.
  """
  @spec stream!(Finch.Request.t(), Finch.name(), keyword()) ::
          {non_neg_integer(), [{String.t(), String.t()}], Enumerable.t()}
  def stream!(request, finch, opts \\ []) do
    IO.puts("starting stream")
    me = self()
    ref = make_ref()

    task =
      Task.async(fn ->
        on_chunk = fn chunk, _acc -> send(me, {:chunk, chunk, ref}) end

        # Report how the request ended. Sending a bare `:done` on failure would
        # leave the owner waiting forever for a status that never arrives.
        case Finch.stream(request, finch, nil, on_chunk, opts) do
          {:ok, _acc} ->
            send(me, {:done, ref})

          {:error, exception} ->
            send(me, {:error, exception, ref})

          # NOTE(review): newer Finch releases appear to return the accumulator
          # as a third element on failure — confirm against the installed version.
          {:error, exception, _acc} ->
            send(me, {:error, exception, ref})
        end
      end)

    status = await_head!(:status, ref, task)
    headers = await_head!(:headers, ref, task)

    stream = Stream.resource(fn -> {ref, task} end, &next_fun/1, &after_fun/1)

    {status, headers, stream}
  end

  # Waits for the `kind` chunk (`:status` or `:headers`). If the request ends
  # before that chunk shows up, stops the task, discards its leftover messages
  # and raises instead of blocking forever.
  defp await_head!(kind, ref, task) do
    receive do
      {:chunk, {^kind, value}, ^ref} ->
        value

      {:error, exception, ^ref} ->
        stop_task(ref, task)
        raise exception

      {:done, ^ref} ->
        stop_task(ref, task)
        raise "response ended before #{kind} was received"
    end
  end

  # `Stream.resource/3` next function: emits one body chunk per call, halts on
  # `:done`, and re-raises a failure reported by the task.
  defp next_fun({ref, _task} = state) do
    IO.puts("awaiting data")

    receive do
      {:chunk, {:data, data}, ^ref} -> {[data], state}
      {:done, ^ref} -> {:halt, state}
      {:error, exception, ^ref} -> raise exception
    end
  end

  # Shuts the task down and then drops whatever it had already sent. Flushing
  # happens after the shutdown so that no further message can arrive afterwards.
  defp stop_task(ref, task) do
    result = Task.shutdown(task)
    flush(ref)
    result
  end

  # Removes every pending message tagged with `ref` from the caller's mailbox.
  defp flush(ref) do
    receive do
      {:chunk, _chunk, ^ref} -> flush(ref)
      {:done, ^ref} -> flush(ref)
      {:error, _exception, ^ref} -> flush(ref)
    after
      0 -> :ok
    end
  end

  # `Stream.resource/3` after function: runs when enumeration finishes, halts
  # early, or fails.
  defp after_fun({ref, task}) do
    IO.puts("done")
    stop_task(ref, task)
  end
end
{:module, FinchStream, <<70, 79, 82, 49, 0, 0, 14, ...>>, {:after_fun, 1}}
defmodule ReqStream do
  @moduledoc """
  Req plugin that hands the response body back as a lazy stream when the
  `:stream` option is set on the request.
  """

  @doc """
  Registers the `:stream` option on `req` and appends the request step that
  acts on it. Returns the updated request.
  """
  def attach(req) do
    with_option = Req.Request.register_options(req, [:stream])
    Req.Request.append_request_steps(with_option, stream: &stream_request/1)
  end

  # Request step: when `:stream` is truthy, sets the `:finch_exec_request`
  # option to `exec_stream_request/3`; otherwise returns the request untouched.
  defp stream_request(request) do
    case request.options[:stream] do
      off when off in [nil, false] ->
        request

      _on ->
        Req.Request.merge_options(request, finch_exec_request: &exec_stream_request/3)
    end
  end

  # Runs the request through `FinchStream.stream!/3` and wraps the result in a
  # `Req.Response` whose body is `{:stream, stream}`.
  defp exec_stream_request(finch_request, finch_name, finch_opts) do
    {code, response_headers, chunks} =
      FinchStream.stream!(finch_request, finch_name, finch_opts)

    %Req.Response{status: code, headers: response_headers, body: {:stream, chunks}}
  end
end
{:module, ReqStream, <<70, 79, 82, 49, 0, 0, 11, ...>>, {:exec_stream_request, 3}}
# Build the request, attach the streaming plugin, and issue a GET with
# streaming switched on.
response =
  [url: "https://sse.dev/test"]
  |> Req.new()
  |> ReqStream.attach()
  |> Req.get!(stream: true)
starting stream
%Req.Response{
status: 200,
headers: [
{"date", "Sun, 23 Apr 2023 20:47:51 GMT"},
{"server", "Apache/2.4.52 (Unix) OpenSSL/1.1.1n"},
{"access-control-allow-origin", "*"},
{"cache-control", "no-cache"},
{"content-type", "text/event-stream"},
{"transfer-encoding", "chunked"}
],
body: {:stream, #Function<52.57817549/2 in Stream.resource/3>},
private: %{}
}
# Unwrap the lazy body and print each chunk as it arrives.
{:stream, stream} = response.body
Enum.map(stream, &IO.inspect/1)
awaiting data
"data: {\"testing\":true,\"sse_dev\":\"is great\",\"msg\":\"It works!\",\"now\":1682282871288}\n\n"
awaiting data
"data: {\"testing\":true,\"sse_dev\":\"is great\",\"msg\":\"It works!\",\"now\":1682282873288}\n\n"
awaiting data
"data: {\"testing\":true,\"sse_dev\":\"is great\",\"msg\":\"It works!\",\"now\":1682282875288}\n\n"
awaiting data
"data: {\"testing\":true,\"sse_dev\":\"is great\",\"msg\":\"It works!\",\"now\":1682282877288}\n\n"
awaiting data