Dynamic Elixir Node Discovery on Kubernetes

One way to do it is to connect the nodes together by listing them in sys.config/vm.args, as suggested by chrismccord here.
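
For reference, the static approach looks roughly like this (a minimal sketch; node names, IPs and the cookie are placeholders, and it's the kernel sync_nodes settings that connect the nodes at boot):

    ## vm.args
    -name iris@10.0.0.1
    -setcookie my_secret_cookie

    % sys.config
    [{kernel,
      [{sync_nodes_optional, ['iris@10.0.0.2', 'iris@10.0.0.3']},
       {sync_nodes_timeout, 10000}]}].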

However, when deploying to a platform such as Kubernetes or AWS Elastic Beanstalk, where the cluster is elastic and the IPs attached to the nodes are ephemeral, you probably don't want to update sys.config/vm.args every time you deploy.

To solve this, you would normally reach for a discovery service such as Consul, etcd, or ZooKeeper. I opted to use Kubernetes itself, as it's one of the best platforms for containerized apps available right now and it exposes a RESTful API for querying resources such as node IPs.

The Iris.Kubernetes process polls that API every 5 seconds for the IPs currently available in the cluster and attempts a connection to each new one.

The process also keeps the set of discovered nodes as its internal state.
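
For context, a decoded response from the endpoints API looks roughly like the Elixir map below (the IPs are made up); map_reduce/2 walks "items" → "subsets" → "addresses" and turns every "ip" into a node atom:

    %{
      "items" => [
        %{
          "subsets" => [
            %{"addresses" => [%{"ip" => "10.2.1.7"}, %{"ip" => "10.2.3.12"}]}
          ]
        }
      ]
    }

    # With :kubernetes_node_name set to "iris", the map above reduces to
    # [:"iris@10.2.1.7", :"iris@10.2.3.12"], which is what Node.connect/1 expects.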

defmodule Iris do
  use Application

  # See http://elixir-lang.org/docs/stable/elixir/Application.html
  # for more information on OTP Applications
  def start(_type, _args) do
    import Supervisor.Spec

    # Define workers and child supervisors to be supervised
    children = [
      # Start the Ecto repository
      supervisor(Iris.Repo, []),
      # Start the endpoint when the application starts
      supervisor(Iris.Endpoint, []),
      # Start your own worker by calling: Iris.Worker.start_link(arg1, arg2, arg3)
      # worker(Iris.Worker, [arg1, arg2, arg3]),
      worker(Iris.Kubernetes, [])
    ]

    # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Iris.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Tell Phoenix to update the endpoint configuration
  # whenever the application is updated.
  def config_change(changed, _new, removed) do
    Iris.Endpoint.config_change(changed, removed)
    :ok
  end
end
defmodule Iris.Kubernetes do
  # Brings in the Logger macros (debug/info/warn/error) used below
  import Logger

  use GenServer

  @name __MODULE__
  @service_account_path Path.join(~w(/ var run secrets kubernetes.io serviceaccount))
  @token_path Path.join(@service_account_path, "token")
  @namespace_path Path.join(@service_account_path, "namespace")
  @kubernetes_master "kubernetes.default.svc.cluster.local"

  def start_link() do
    GenServer.start_link(@name, MapSet.new(), name: @name)
  end

  # The state is the set of nodes discovered so far. The `0` timeout fires an
  # immediate :timeout message, which triggers the first poll.
  def init(state) do
    {:ok, state, 0}
  end

  def handle_info(:timeout, nodes), do: handle_info(:load, nodes)

  def handle_info(:load, nodes) do
    if configured?() do
      new_nodes = MapSet.new(fetch_nodes())

      # Nodes we knew about that Kubernetes no longer reports
      nodes
      |> MapSet.difference(new_nodes)
      |> MapSet.to_list()
      |> remove_nodes()

      # Nodes Kubernetes reports that we haven't connected to yet
      new_nodes
      |> MapSet.difference(nodes)
      |> MapSet.to_list()
      |> connect_nodes()

      poll_kubernetes(5000)
      {:noreply, new_nodes}
    else
      {:noreply, nodes, :hibernate}
    end
  end

  defp connect_nodes([]), do: :ok

  defp connect_nodes([head | tail]) do
    case Node.connect(head) do
      true ->
        debug "connected to #{inspect head}"
        :ok

      reason ->
        debug "attempted to connect to node (#{inspect head}), but failed with #{reason}."
    end

    connect_nodes(tail)
  end

  defp remove_nodes([]), do: :ok

  # Stale nodes are simply dropped from the tracked set; the dead distribution
  # connection is cleaned up by the runtime on its own.
  defp remove_nodes([head | tail]) do
    debug "disconnected from #{inspect head}"
    remove_nodes(tail)
  end

  defp fetch_token() do
    case File.exists?(@token_path) do
      true ->
        @token_path
        |> File.read!()
        |> String.trim()

      false ->
        ""
    end
  end

  defp fetch_namespace() do
    case File.exists?(@namespace_path) do
      true ->
        @namespace_path
        |> File.read!()
        |> String.trim()

      false ->
        ""
    end
  end

  defp fetch_nodes() do
    token = fetch_token()
    namespace = fetch_namespace()
    node_name = Application.get_env(:iris, :kubernetes_node_name)
    selector = Application.get_env(:iris, :kubernetes_selector)

    fetch_nodes(token, namespace, node_name, selector)
  end

  # Query the endpoints API for pods matching the label selector and turn each
  # pod IP into a node name of the form :"<node_name>@<ip>".
  defp fetch_nodes(token, namespace, node_name, selector) do
    endpoint = "api/v1/namespaces/#{namespace}/endpoints?labelSelector=#{selector}"
    url = "https://#{@kubernetes_master}/#{endpoint}"
    headers = ["Authorization": "Bearer #{token}"]
    options = [ssl: [verify: :verify_none]]

    with {:ok, %HTTPoison.Response{status_code: 200, body: body}} <- HTTPoison.get(url, headers, options),
         {:ok, data} <- Poison.decode(body),
         {:ok, result} <- map_reduce(data, node_name) do
      result
    else
      {:ok, %HTTPoison.Response{status_code: 403, body: body}} ->
        %{"message" => message} = Poison.decode!(body)
        warn "cannot query Kubernetes (unauthorized): #{message}"
        []

      {:ok, %HTTPoison.Response{status_code: code, body: body}} ->
        warn "cannot query Kubernetes (#{code}): #{inspect body}"
        []

      {:error, %HTTPoison.Error{reason: reason}} ->
        error "request to Kubernetes failed!: #{inspect reason}"
        []
    end
  end

  defp map_reduce(%{"items" => []}, _node_name), do: {:ok, []}

  defp map_reduce(%{"items" => items}, node_name) do
    result =
      Enum.reduce(items, [], fn
        %{"subsets" => []}, acc ->
          acc

        %{"subsets" => subsets}, acc ->
          subsets
          |> Enum.flat_map(fn
            %{"addresses" => addresses} ->
              Enum.map(addresses, fn %{"ip" => ip} -> :"#{node_name}@#{ip}" end)

            _ ->
              []
          end)
          |> Enum.concat(acc)

        _, acc ->
          acc
      end)

    {:ok, result}
  end

  defp map_reduce(_data, _node_name), do: {:ok, []}

  defp configured?() do
    cond do
      is_nil(Application.get_env(:iris, :kubernetes_node_name)) && is_nil(Application.get_env(:iris, :kubernetes_selector)) ->
        warn "Kubernetes is not configured!"
        false

      is_nil(Application.get_env(:iris, :kubernetes_selector)) ->
        warn ":kubernetes_selector is not configured!"
        false

      is_nil(Application.get_env(:iris, :kubernetes_node_name)) ->
        warn ":kubernetes_node_name is not configured!"
        false

      true ->
        true
    end
  end

  defp poll_kubernetes(interval) do
    info "Polling Kubernetes in #{interval / 1000} seconds"
    Process.send_after(@name, :load, interval)
  end
end
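
To wire this up, two keys are read via Application.get_env/2. A minimal sketch of what the config could look like (the values below are only examples):

    # config/prod.exs -- example values
    use Mix.Config

    config :iris,
      kubernetes_node_name: "iris",    # part before the "@" in each node name
      kubernetes_selector: "app=iris"  # label selector used when listing endpoints

Each pod also has to boot with a node name whose host part is its own pod IP (for example -name iris@${MY_POD_IP}, with the IP injected through the downward API and substituted by your release tooling) and a shared cookie, otherwise the :"iris@<ip>" atoms built above won't resolve to reachable nodes.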