defmodule Iris.Kubernetes do
  @moduledoc """
  Polls the Kubernetes endpoints API and connects to the Erlang nodes it lists.

  The server keeps a `MapSet` of node names it has successfully connected to.
  On every poll it queries the endpoints of the current namespace that match
  the configured label selector, builds a node name of the form
  `:"<node_name>@<ip>"` for every listed address, and calls `Node.connect/1`
  for the names it does not know yet.

  Configuration is read from the `:iris` application environment:

    * `:kubernetes_node_name` - the part of the node name before the `@`
    * `:kubernetes_selector` - the label selector used to filter endpoints

  If either value is missing, a warning is logged, the server hibernates and
  no further polls are scheduled.
  """

  use GenServer

  # Logger.warn/1 is kept (rather than Logger.warning/1) so the module still
  # compiles on the older Elixir releases the rest of this file targets.
  require Logger

  @name __MODULE__
  @service_account_path Path.join(~w(/ var run secrets kubernetes.io serviceaccount))
  @token_path Path.join(@service_account_path, "token")
  @namespace_path Path.join(@service_account_path, "namespace")
  @kubernetes_master "kubernetes.default.svc.cluster.local"

  # Milliseconds between two polls of the Kubernetes API.
  @poll_interval 5_000

  @doc """
  Starts the poller and registers it under the module name.

  The optional argument is ignored; it exists so the module can be started
  both as `start_link()` and through a `child_spec/1`-style supervisor entry.
  """
  @spec start_link() :: GenServer.on_start()
  @spec start_link(term) :: GenServer.on_start()
  def start_link(_opts \\ []) do
    GenServer.start_link(@name, MapSet.new(), name: @name)
  end

  @doc false
  def init(state) do
    # A timeout of 0 makes the first poll run from handle_info/2 right after
    # init/1 has returned, instead of inside init/1 itself.
    {:ok, state, 0}
  end

  @doc false
  def handle_info(:timeout, nodes), do: handle_info(:load, nodes)

  def handle_info(:load, nodes) do
    if configured?() do
      nodes =
        case fetch_nodes() do
          {:ok, fetched} -> sync_nodes(nodes, MapSet.new(fetched))
          # A failed poll says nothing about the cluster; keep what we know.
          :error -> nodes
        end

      poll_kubernetes(@poll_interval)
      {:noreply, nodes}
    else
      {:noreply, nodes, :hibernate}
    end
  end

  # Defining handle_info/2 clauses replaces the default provided by
  # `use GenServer`, so unexpected messages must be ignored explicitly.
  def handle_info(_message, nodes), do: {:noreply, nodes}

  # Reconciles the known nodes with the freshly fetched ones and returns the
  # new state: nodes that are still listed plus nodes that connected just now.
  # Nodes whose connection attempt failed are left out so that the next poll
  # sees them as new and tries again.
  defp sync_nodes(known, fetched) do
    known
    |> MapSet.difference(fetched)
    |> MapSet.to_list()
    |> remove_nodes()

    connected =
      fetched
      |> MapSet.difference(known)
      |> MapSet.to_list()
      |> connect_nodes()

    known
    |> MapSet.intersection(fetched)
    |> MapSet.union(MapSet.new(connected))
  end

  # Tries to connect to every node and returns the ones that connected.
  # Node.connect/1 returns `true`, `false` or `:ignored`.
  defp connect_nodes(nodes) do
    Enum.filter(nodes, fn name ->
      case Node.connect(name) do
        true ->
          Logger.debug("connected to #{inspect(name)}")
          true

        reason ->
          Logger.debug(
            "attempted to connect to node (#{inspect(name)}), but failed with #{reason}."
          )

          false
      end
    end)
  end

  # Logs every node that is no longer listed. Note that this only logs: no
  # Node.disconnect/1 call is issued for the removed nodes.
  defp remove_nodes(nodes) do
    Enum.each(nodes, fn name ->
      Logger.debug("disconnected from #{inspect(name)}")
    end)
  end

  # Returns the trimmed service account token, or "" when it cannot be read.
  defp fetch_token(), do: read_trimmed(@token_path)

  # Returns the trimmed namespace name, or "" when it cannot be read.
  defp fetch_namespace(), do: read_trimmed(@namespace_path)

  # Reads the file in one step (no separate existence check that could race
  # with the read) and falls back to "" on any read error.
  defp read_trimmed(path) do
    case File.read(path) do
      {:ok, contents} -> String.trim(contents)
      {:error, _reason} -> ""
    end
  end

  # Gathers credentials and configuration, then queries Kubernetes.
  # Returns `{:ok, node_names}` or `:error`.
  defp fetch_nodes() do
    token = fetch_token()
    namespace = fetch_namespace()
    node_name = Application.get_env(:iris, :kubernetes_node_name)
    selector = Application.get_env(:iris, :kubernetes_selector)

    fetch_nodes(token, namespace, node_name, selector)
  end

  defp fetch_nodes(token, namespace, node_name, selector) do
    # Selectors may contain characters such as "=", "," or spaces, so the
    # value is form-encoded before it is placed in the query string.
    query = "labelSelector=" <> URI.encode_www_form(to_string(selector))
    url = "https://#{@kubernetes_master}/api/v1/namespaces/#{namespace}/endpoints?#{query}"
    headers = [Authorization: "Bearer #{token}"]

    # NOTE(review): peer verification is disabled, so the bearer token is
    # sent without authenticating the API server. Consider verifying against
    # the CA certificate mounted next to the service account token.
    options = [ssl: [verify: :verify_none]]

    with {:ok, %HTTPoison.Response{status_code: 200, body: body}} <-
           HTTPoison.get(url, headers, options),
         {:ok, data} <- Poison.decode(body),
         {:ok, result} <- map_reduce(data, node_name) do
      {:ok, result}
    else
      {:ok, %HTTPoison.Response{status_code: 403, body: body}} ->
        Logger.warn("cannot query Kubernetes (unauthorized): #{forbidden_message(body)}")
        :error

      {:ok, %HTTPoison.Response{status_code: code, body: body}} ->
        Logger.warn("cannot query Kubernetes (#{code}): #{inspect(body)}")
        :error

      {:error, %HTTPoison.Error{reason: reason}} ->
        Logger.error("request to Kubernetes failed!: #{inspect(reason)}")
        :error

      # Anything else, e.g. a 200 response whose body is not valid JSON.
      # Without this clause the `with` would raise WithClauseError and take
      # the server down.
      other ->
        Logger.error("unexpected response from Kubernetes: #{inspect(other)}")
        :error
    end
  end

  # Extracts the "message" field of a 403 body, falling back to the raw body
  # when it is not JSON or carries no textual message.
  defp forbidden_message(body) do
    case Poison.decode(body) do
      {:ok, %{"message" => message}} when is_binary(message) -> message
      _ -> inspect(body)
    end
  end

  # Turns a decoded endpoints list into `{:ok, node_names}`. Payloads without
  # a list under "items" yield an empty result.
  defp map_reduce(%{"items" => items}, node_name) when is_list(items) do
    {:ok, Enum.flat_map(items, &item_nodes(&1, node_name))}
  end

  defp map_reduce(_data, _node_name), do: {:ok, []}

  # Builds the node names for one endpoints item. Items or subsets that do
  # not carry the expected lists contribute nothing.
  defp item_nodes(%{"subsets" => subsets}, node_name) when is_list(subsets) do
    Enum.flat_map(subsets, fn
      %{"addresses" => addresses} when is_list(addresses) ->
        # Node names have to be atoms; they are created from API data here,
        # so the atom count grows with the number of distinct addresses seen.
        Enum.map(addresses, fn %{"ip" => ip} -> :"#{node_name}@#{ip}" end)

      _ ->
        []
    end)
  end

  defp item_nodes(_item, _node_name), do: []

  # Returns true when both configuration values are present; otherwise logs
  # which one is missing and returns false.
  defp configured?() do
    node_name = Application.get_env(:iris, :kubernetes_node_name)
    selector = Application.get_env(:iris, :kubernetes_selector)

    cond do
      is_nil(node_name) and is_nil(selector) ->
        Logger.warn("Kubernetes is not configured!")
        false

      is_nil(selector) ->
        Logger.warn(":kubernetes_selector is not configured!")
        false

      is_nil(node_name) ->
        Logger.warn(":kubernetes_node_name is not configured!")
        false

      true ->
        true
    end
  end

  # Schedules the next poll by sending `:load` to this process after
  # `interval` milliseconds.
  defp poll_kubernetes(interval) do
    Logger.info("Polling Kubernetes in #{interval / 1000} seconds")
    Process.send_after(self(), :load, interval)
  end
end