Last active
January 18, 2019 12:43
-
-
Save imranismail/feb1272b98ea00d658f3f5ff34431ddf to your computer and use it in GitHub Desktop.
BEAM cluster formation in EC2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Cluster do
  @moduledoc """
  Periodically reconciles this node's BEAM connections with the set of running
  EC2 instances that share this instance's `spotinst:aws:ec2:group:name` tag.

  The server state is a `MapSet` of node names this process has successfully
  connected to. On every `:load` tick the server:

    1. looks up the private IPs of the running instances in the same group,
    2. disconnects from nodes in the state that are no longer in that group,
    3. tries to connect to group members that are not yet in the state,
    4. schedules the next tick 5 seconds later.

  Nodes whose connection attempt fails are left out of the state, so they are
  retried on the next tick.
  """

  use GenServer

  require Logger

  import SweetXml, only: [sigil_x: 2]

  @doc """
  Returns the child specification used to run this server under a supervisor.

  The argument is ignored; the server is always started via `start_link/0`.
  """
  def child_spec(_) do
    # Map-based equivalent of the deprecated `Supervisor.Spec.worker(__MODULE__, [])`.
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, []},
      restart: :permanent,
      shutdown: 5000,
      type: :worker
    }
  end

  @doc """
  Starts the server registered under the module name, with an empty set of
  connected nodes.
  """
  def start_link do
    GenServer.start_link(__MODULE__, MapSet.new(), name: __MODULE__)
  end

  @impl true
  def init(connected_node_set) do
    # A timeout of 0 delivers `:timeout` to handle_info/2 right after init
    # returns, so the first lookup does not block the supervisor's start-up.
    {:ok, connected_node_set, 0}
  end

  @impl true
  def handle_info(:timeout, connected_node_set) do
    Logger.info(fn -> "Forming cluster with Proxy.Cluster" end)
    handle_info(:load, connected_node_set)
  end

  def handle_info(:load, connected_node_set) do
    attached_node_set = fetch_attached_node_set()

    # Nodes we connected to earlier that have left the instance group.
    connected_node_set
    |> MapSet.difference(attached_node_set)
    |> MapSet.to_list()
    |> disconnect_nodes()

    # Group members we have not (successfully) connected to yet.
    newly_connected =
      attached_node_set
      |> MapSet.difference(connected_node_set)
      |> MapSet.to_list()
      |> connect_nodes()

    form_cluster_after(:timer.seconds(5))

    # Only remember nodes that are both still in the group and actually
    # connected; anything that failed to connect is retried on the next tick.
    still_attached = MapSet.intersection(connected_node_set, attached_node_set)
    {:noreply, MapSet.union(still_attached, MapSet.new(newly_connected))}
  end

  # Attempts to connect to every node in the list and returns the nodes for
  # which `Node.connect/1` returned `true`. Failures are logged and skipped.
  defp connect_nodes(nodes) do
    Enum.filter(nodes, fn node_name ->
      case Node.connect(node_name) do
        true ->
          Logger.debug(fn -> "Connected to #{inspect(node_name)}" end)
          true

        reason ->
          Logger.error(fn ->
            "Attempted to connect to node (#{inspect(node_name)}) but failed with reason: #{inspect(reason)}."
          end)

          false
      end
    end)
  end

  # Forces a disconnect from every node in the list and logs the outcome.
  defp disconnect_nodes(nodes) do
    Enum.each(nodes, fn node_name ->
      case Node.disconnect(node_name) do
        true ->
          Logger.debug(fn -> "Disconnected from #{inspect(node_name)}" end)

        result ->
          Logger.debug(fn ->
            "Node #{inspect(node_name)} was not disconnected (#{inspect(result)})"
          end)
      end
    end)
  end

  # Reads this instance's id from the EC2 instance metadata endpoint.
  defp fetch_instance_id do
    ExAws.InstanceMeta.request(
      ExAws.Config.new(:ec2),
      "http://169.254.169.254/latest/meta-data/instance-id"
    )
  end

  # Reads this instance's availability zone from the EC2 instance metadata
  # endpoint. NOTE(review): callers treat the result as a string — confirm
  # against the ExAws version in use.
  defp fetch_availability_zone do
    ExAws.InstanceMeta.request(
      ExAws.Config.new(:ec2),
      "http://169.254.169.254/latest/meta-data/placement/availability-zone"
    )
  end

  # Derives the region by dropping the last character of the availability
  # zone (e.g. "us-east-1a" -> "us-east-1").
  defp fetch_region do
    fetch_availability_zone()
    |> String.slice(0..-2)
  end

  # Looks up the value of the `spotinst:aws:ec2:group:name` tag attached to
  # this instance. Raises if the request fails or the tag is missing.
  defp fetch_instance_group do
    ExAws.EC2.describe_tags(
      filters: [
        "resource-id": fetch_instance_id(),
        key: "spotinst:aws:ec2:group:name"
      ]
    )
    |> ExAws.request!(region: fetch_region())
    |> Map.fetch!(:body)
    |> SweetXml.xpath(
      ~x"//DescribeTagsResponse/tagSet/item",
      key: ~x"./key/text()"s,
      value: ~x"./value/text()"s
    )
    |> Map.fetch!(:value)
  end

  # Returns the private IP addresses (as strings) of all running instances
  # tagged with the same group name as this instance.
  defp fetch_ips do
    instance_group = fetch_instance_group()

    ExAws.EC2.describe_instances(
      filters: [
        "tag:spotinst:aws:ec2:group:name": instance_group,
        "instance-state-name": "running"
      ]
    )
    |> ExAws.request!(region: fetch_region())
    |> Map.fetch!(:body)
    |> SweetXml.xpath(
      ~x"//DescribeInstancesResponse/reservationSet/item/instancesSet/item/privateIpAddress/text()"ls
    )
  end

  # Builds the set of node names (`:"fave@<private-ip>"`) for the group's
  # running instances. Node names must be atoms; the set is bounded by the
  # number of instances returned by EC2.
  defp fetch_attached_node_set do
    fetch_ips()
    |> Enum.map(fn ip_address -> :"fave@#{ip_address}" end)
    |> MapSet.new()
  end

  # Schedules the next `:load` message to this server after `duration` ms.
  defp form_cluster_after(duration) do
    Logger.debug(fn -> "Forming cluster in #{duration}ms" end)
    Process.send_after(__MODULE__, :load, duration)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment