Mix.install([
{:finch, "~> 0.17.0"},
{:req, "~> 0.4.8"},
{:bandit, "~> 1.1"}
])
Inspired by https://twitter.com/wojtekmach/status/1745916280635338967
This sample spins up two Bandit-based HTTP servers. The 'inner' server acts as a streaming data source; the 'outer' HTTP server pulls from the 'inner' one and transforms the data on the fly.
The original sample from the tweet screenshot shows GenServers crashing when the client terminates the TCP session. This sample adds code to handle that failure.
For example, you can run:
#!/bin/bash
curl --include --url http://localhost:4001
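For whatever reason, a try/rescue is required here, even though the callback
'only' returns the exception and never raises it itself (presumably Req.get! raises any exception it finds in the response position of the accumulator), as in this snippet: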
{:error, :closed} -> {:halt, {req, %MyOwnConnectionClosedException{}}}
defmodule MyOwnConnectionClosedException do
  @moduledoc """
  Marker exception for a client connection that was closed while a response
  was still being streamed.
  """

  defexception [message: "an example error has occurred"]
end
defmodule BanditTester do
  @moduledoc """
  Starts two chained Bandit HTTP servers for experimenting with streaming.

  The *inner* server streams the digits `0..9` in an endless cycle as chunked
  response data. The *outer* server requests that stream with Req, upcases
  every received chunk, appends a newline and forwards it to its own client.

  Both servers stop streaming quietly once their client has closed the
  connection.
  """

  require Logger

  # Pause between two chunks emitted by the inner server, in milliseconds.
  @chunk_interval_ms 500

  @doc """
  Starts the inner server on `port_inner` and the outer server on `port_outer`.

  Returns `{:ok, [pid_inner, pid_outer]}`; pass that list to `stop/1` to shut
  both servers down again. Raises a `MatchError` if either server fails to
  start.
  """
  @spec server(:inet.port_number(), :inet.port_number()) :: {:ok, [pid()]}
  def server(port_inner \\ 4000, port_outer \\ 4001) do
    {:ok, pid_inner} =
      Bandit.start_link(
        scheme: :http,
        port: port_inner,
        plug: fn conn, _opts -> stream_digits(conn) end
      )

    {:ok, pid_outer} =
      Bandit.start_link(
        scheme: :http,
        port: port_outer,
        plug: fn conn, _opts -> relay_upcased(conn, port_inner) end
      )

    {:ok, [pid_inner, pid_outer]}
  end

  @doc """
  Stops the servers in `pids`, in reverse order of the list.

  With the list returned by `server/2` this stops the outer server before the
  inner one it depends on.
  """
  @spec stop([pid()]) :: :ok
  def stop(pids) do
    pids
    |> Enum.reverse()
    |> Enum.each(&GenServer.stop(&1, :normal))
  end

  # Inner plug: streams "0", "1", ..., "9", "0", ... until the client goes away.
  defp stream_digits(conn) do
    conn = Plug.Conn.send_chunked(conn, 200)

    0..9
    |> Stream.cycle()
    |> Stream.map(&to_string/1)
    |> Enum.reduce_while(conn, fn digit, conn ->
      case Plug.Conn.chunk(conn, digit) do
        {:ok, conn} ->
          Process.sleep(@chunk_interval_ms)
          {:cont, conn}

        {:error, :closed} ->
          {:halt, conn}
      end
    end)
  end

  # Outer plug: pulls the inner stream and relays each transformed chunk.
  #
  # The client connection travels in the request's private storage so the
  # streaming callback can always write through the most recent conn. The
  # request pipeline is run directly (instead of via `Req.get!/2`) because it
  # hands back the final request, from which the conn is read so that the plug
  # returns a `Plug.Conn` as required.
  defp relay_upcased(conn, port_inner) do
    conn = Plug.Conn.send_chunked(conn, 200)

    request =
      [url: "http://localhost:#{port_inner}", into: &forward_chunk/2]
      |> Req.new()
      |> Req.Request.put_private(:conn, conn)

    case Req.Request.run_request(request) do
      {request, %Req.Response{}} ->
        request.private.conn

      # Failures talking to the inner server still crash the request process,
      # exactly as they did with the bang variant.
      {_request, exception} ->
        raise exception
    end
  end

  # Req `into:` callback. Writes one upcased, newline-terminated chunk to the
  # client. The Req response in the accumulator is passed through untouched;
  # only the conn stored in the request is replaced. A closed client connection
  # halts the upstream request instead of raising.
  defp forward_chunk({:data, data}, {req, resp}) do
    case Plug.Conn.chunk(req.private.conn, "#{String.upcase(data)}\n") do
      {:ok, conn} ->
        {:cont, {Req.Request.put_private(req, :conn, conn), resp}}

      {:error, :closed} ->
        {:halt, {req, resp}}
    end
  end
end
Start the inner and outer Bandit HTTP servers.
# Ports for the inner (data source) and the outer (transforming) server.
{inner_port, outer_port} = {4000, 4001}

# `servers` holds the pids that are later handed to BanditTester.stop/1.
{:ok, servers} = BanditTester.server(inner_port, outer_port)
Then call the outer server from a separate task and print every chunk it streams back.
# Streaming callback for the client request: prints each received chunk and
# keeps the stream going with the accumulator unchanged.
print_chunk = fn {:data, chunk}, acc ->
  IO.puts(chunk)
  {:cont, acc}
end

# The request runs in its own unlinked process so this script is not blocked
# by the endless stream.
{:ok, task} =
  Task.start(fn ->
    Req.get!("http://localhost:#{outer_port}", into: print_chunk)
  end)

# NOTE(review): the servers are stopped right after the task is spawned, with
# no wait in between — confirm this is intended when the file is run top to
# bottom as a script.
BanditTester.stop(servers)