Last active
June 28, 2023 10:01
-
-
Save vishnevskiy/312d6f3180ff0122b9fb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Snowflake do
  @moduledoc """
  Application entry point and public API of the Snowflake ID service.

  `start/2` boots the vnode master under a supervisor and registers the
  `:snowflake` service with riak_core. `next_id/0` and `ping/0` route a
  command to a single primary vnode and return that vnode's reply.
  """
  use Application

  @doc """
  Application callback: starts the supervisor and registers with riak_core.

  Returns `{:ok, pid}` of the supervisor, or `{:error, reason}` when the
  supervisor cannot be started. Any riak_core registration step that does
  not return `:ok` crashes the start (assertive match).
  """
  def start(_type, _args) do
    # Plain OTP child spec; equivalent to
    # `Supervisor.Spec.worker(:riak_core_vnode_master, [Snowflake.VNode])`
    # without depending on the deprecated Supervisor.Spec helpers.
    children = [
      {:riak_core_vnode_master,
       {:riak_core_vnode_master, :start_link, [Snowflake.VNode]},
       :permanent, 5000, :worker, [:riak_core_vnode_master]}
    ]

    case Supervisor.start_link(children, strategy: :one_for_one) do
      {:ok, pid} ->
        # HACK: riak_core assume APP_sup for the name of the supervisor.
        Process.register(pid, :snowflake_sup)
        :ok = :riak_core.register(:snowflake, [vnode_module: Snowflake.VNode])
        :ok = :riak_core_ring_events.add_guarded_handler(Snowflake.RingEventHandler, [])
        :ok = :riak_core_node_watcher_events.add_guarded_handler(Snowflake.NodeEventHandler, [])
        :ok = :riak_core_node_watcher.service_up(:snowflake, pid)
        {:ok, pid}

      {:error, reason} ->
        {:error, reason}
    end
  end

  @doc """
  Asks a vnode for the next ID.

  Returns whatever the vnode replies to `:next_id`, or
  `{:fail, :no_primary_available}` when no primary vnode can be found.
  """
  def next_id, do: command(:next_id)

  @doc """
  Pings a vnode.

  Returns whatever the vnode replies to `:ping`, or
  `{:fail, :no_primary_available}` when no primary vnode can be found.
  """
  def ping, do: command(:ping)

  # Hashes the command name together with the current timestamp so that
  # successive commands land on different partitions, then synchronously
  # sends the command to the single primary vnode for that hash.
  defp command(cmd) do
    # `:os.timestamp/0` is only a salt here; it replaces the deprecated
    # `:erlang.now/0`.
    salt = :erlang.term_to_binary(:os.timestamp())
    doc_idx = :riak_core_util.chash_key({Atom.to_string(cmd), salt})

    case :riak_core_apl.get_primary_apl(doc_idx, 1, :snowflake) do
      [{index_node, _type}] ->
        # HACK: riak_core automatically adds _master for the VNode master name.
        :riak_core_vnode_master.sync_spawn_command(index_node, cmd, Snowflake.VNode_master)

      [] ->
        # No primary is up (e.g. the service has not been announced yet).
        {:fail, :no_primary_available}
    end
  end
end
defmodule Snowflake.Console do
  @moduledoc """
  Operator-facing cluster commands: thin wrappers around `:riak_core`
  that print a human-readable result.
  """

  @doc """
  Requests that this node join the cluster of `node_str`.

  `node_str` may be a charlist, a binary or an atom. Prints the outcome and
  returns `:ok` when the join request was sent, `:error` otherwise. Never
  raises: any throw, error or exit from the join is reported and turned
  into `:error`.
  """
  def join(node_str) do
    try do
      case :riak_core.join(join_target(node_str)) do
        :ok ->
          IO.puts "Sent join request to #{node_str}"
          :ok
        {:error, :not_reachable} ->
          IO.puts "Node #{node_str} is not reachable!"
          :error
        {:error, :different_ring_sizes} ->
          IO.puts "Failed! #{node_str} has a different ring_creation_size"
          :error
        {:error, reason} ->
          # Any other refusal from riak_core; previously a CaseClauseError.
          IO.puts "Join failed #{inspect(reason)}"
          :error
      end
    catch
      # Catch every class (throw, error, exit), not just throws, and use
      # inspect/1 because the reason is an arbitrary term.
      kind, reason ->
        IO.puts "Join failed #{inspect(kind)}: #{inspect(reason)}"
        :error
    end
  end

  @doc """
  Makes this node leave the cluster. Returns the result of `:riak_core.leave/0`.
  """
  def leave, do: :riak_core.leave()

  @doc """
  Removes `node` from the cluster.

  `node` may be a charlist, a binary or an atom naming the node. Node names
  are atoms by definition, so the conversion creates an atom; only call this
  with operator-supplied input.
  """
  def remove(node) when is_atom(node), do: remove_node(node)
  def remove(node) when is_binary(node), do: remove_node(:erlang.binary_to_atom(node, :utf8))
  def remove(node) when is_list(node), do: remove_node(:erlang.list_to_atom(node))

  @doc """
  Removes the node named by the atom `node` from the cluster.
  Returns the result of `:riak_core.remove_from_cluster/1`.
  """
  def remove_node(node) when is_atom(node) do
    # TODO: copy properly
    :riak_core.remove_from_cluster(node)
  end

  @doc """
  Returns the result of `:riak_core_status.ringready/0`.
  """
  def ringready do
    :riak_core_status.ringready()
  end

  # Elixir strings are binaries; convert them to a charlist before handing
  # them to riak_core. Everything else is passed through untouched.
  defp join_target(node) when is_binary(node), do: :erlang.binary_to_list(node)
  defp join_target(node), do: node
end
defmodule Snowflake.RingEventHandler do
  @moduledoc """
  `:gen_event` handler for ring events.

  It keeps its state unchanged for every event; ring updates are accepted
  and ignored. Implemented directly against the OTP `:gen_event` behaviour
  instead of the deprecated `GenEvent` module.
  """
  @behaviour :gen_event

  @doc "Uses the handler arguments as the initial state."
  def init(args), do: {:ok, args}

  @doc "Accepts `{:ring_update, ring}` (and any other event) without changing state."
  def handle_event({:ring_update, _ring}, state), do: {:ok, state}
  # Catch-all so an unexpected event does not crash the handler.
  def handle_event(_event, state), do: {:ok, state}

  @doc "No calls are supported; replies `{:error, :unknown_call}`."
  def handle_call(_request, state), do: {:ok, {:error, :unknown_call}, state}

  @doc "Ignores stray messages."
  def handle_info(_message, state), do: {:ok, state}

  @doc "Nothing to clean up."
  def terminate(_reason, _state), do: :ok

  @doc "State needs no migration on code change."
  def code_change(_old_vsn, state, _extra), do: {:ok, state}
end
defmodule Snowflake.NodeEventHandler do
  @moduledoc """
  `:gen_event` handler for node-watcher events.

  It keeps its state unchanged for every event; service updates are accepted
  and ignored. Implemented directly against the OTP `:gen_event` behaviour
  instead of the deprecated `GenEvent` module.
  """
  @behaviour :gen_event

  @doc "Uses the handler arguments as the initial state."
  def init(args), do: {:ok, args}

  @doc "Accepts `{:service_update, services}` (and any other event) without changing state."
  def handle_event({:service_update, _services}, state), do: {:ok, state}
  # Catch-all so an unexpected event does not crash the handler.
  def handle_event(_event, state), do: {:ok, state}

  @doc "No calls are supported; replies `{:error, :unknown_call}`."
  def handle_call(_request, state), do: {:ok, {:error, :unknown_call}, state}

  @doc "Ignores stray messages."
  def handle_info(_message, state), do: {:ok, state}

  @doc "Nothing to clean up."
  def terminate(_reason, _state), do: :ok

  @doc "State needs no migration on code change."
  def code_change(_old_vsn, state, _extra), do: {:ok, state}
end
defmodule Snowflake.VNode do
  @moduledoc """
  riak_core vnode that hands out 64-bit IDs.

  Each ID is packed, most significant bits first, as:

    * 42 bits - milliseconds since `@twitter_epoch`
    * 10 bits - machine id (the top 10 bits of the partition index)
    * 12 bits - per-millisecond sequence number

  The vnode state is the tuple `{index, machine_id, seq, ts}` where `ts` is
  the `{megasecs, secs, microsecs}` timestamp of the last ID issued.
  """
  @behaviour :riak_core_vnode

  # Offset, in milliseconds, subtracted from the wall clock before packing.
  @twitter_epoch 1142974214000

  # Sequence numbers occupy 12 bits, so at most 4096 values per millisecond.
  @seq_limit 4096

  @doc "Returns the pid of the vnode for partition `i`."
  def start_vnode(i) do
    :riak_core_vnode_master.get_vnode_pid(i, __MODULE__)
  end

  @doc """
  Initialises the state for partition `index` with sequence 0 and the
  current wall-clock timestamp.
  """
  def init([index]) do
    ts = :os.timestamp()
    # This could get ugly if you expect them to be unique across data
    # centers, or if you have more than 1024 partitions
    <<machine_id :: size(10), _rest :: bits>> = <<index :: size(160)>>
    {:ok, {index, machine_id, 0, ts}}
  end

  @doc """
  Handles vnode commands.

    * `:next_id` - replies with a new integer ID, or with
      `{:fail, :backwards_clock}` when the wall clock is behind the timestamp
      of the last issued ID. When the sequence for the current millisecond is
      used up, waits one millisecond and retries.
    * `:ping` - replies `:pong`.
    * anything else - ignored, no reply.
  """
  def handle_command(:next_id, sender, {index, machine_id, seq, ts} = state) do
    case get_next_seq(ts, seq) do
      :backwards_clock ->
        {:reply, {:fail, :backwards_clock}, state}

      :exhausted ->
        # Retry after a millisecond. (`:erlang.sleep/1` does not exist;
        # `:timer.sleep/1` is the OTP function for this.)
        :timer.sleep(1)
        handle_command(:next_id, sender, state)

      {:ok, new_ts, new_seq} ->
        id = construct_id(new_ts, machine_id, new_seq)
        {:reply, id, {index, machine_id, new_seq, new_ts}}
    end
  end

  def handle_command(:ping, _sender, state) do
    {:reply, :pong, state}
  end

  def handle_command(_message, _sender, state) do
    {:noreply, state}
  end

  @doc "Stops the vnode with `reason` when a linked process exits."
  def handle_exit(_pid, reason, state) do
    {:stop, reason, state}
  end

  @doc """
  During handoff, a fold request is answered with its initial accumulator
  (the fold function is never applied); every other command is forwarded.
  """
  def handle_handoff_command({:riak_core_fold_req_v2, _foldfun, acc0, _forwardable, _opts}, _sender, state) do
    {:reply, acc0, state}
  end

  def handle_handoff_command(_message, _sender, state) do
    {:forward, state}
  end

  @doc "Always agrees to start a handoff."
  def handoff_starting(_target_node, state) do
    {true, state}
  end

  @doc "No-op."
  def handoff_cancelled(state) do
    {:ok, state}
  end

  @doc "No-op."
  def handoff_finished(_target_node, state) do
    {:ok, state}
  end

  @doc "Acknowledges incoming handoff data without storing it."
  def handle_handoff_data(_data, state) do
    {:reply, :ok, state}
  end

  @doc "Encodes every handoff item as an empty binary."
  def encode_handoff_item(_object_name, _object_value) do
    ""
  end

  @doc "Always reports the vnode as non-empty."
  def is_empty(state) do
    {false, state}
  end

  @doc "No-op; the state is returned unchanged."
  def delete(state) do
    {:ok, state}
  end

  @doc "Coverage queries are not implemented; stops with `:not_implemented`."
  def handle_coverage(_req, _key_spaces, _sender, state) do
    {:stop, :not_implemented, state}
  end

  @doc "Nothing to clean up."
  def terminate(_reason, _state) do
    :ok
  end

  # Decides the next `{timestamp, sequence}` pair given the last issued one.
  #
  # Returns `{:ok, now, seq}` on success, `:exhausted` when all sequence
  # numbers of the current millisecond are used, and `:backwards_clock` when
  # the wall clock is behind the last issued timestamp.
  #
  # Uses `:os.timestamp/0` (wall clock): the deprecated `:erlang.now/0` is
  # strictly increasing, which would make the backwards-clock check dead code.
  defp get_next_seq({megas, secs, micros} = ts, seq) do
    now = :os.timestamp()
    {now_megas, now_secs, now_micros} = now

    cond do
      # Time is essentially equal at the millisecond
      megas == now_megas and secs == now_secs and div(micros, 1000) == div(now_micros, 1000) ->
        case rem(seq + 1, @seq_limit) do
          0 -> :exhausted
          new_seq -> {:ok, now, new_seq}
        end

      # Woops, clock was moved backwards by NTP
      now < ts ->
        :backwards_clock

      # New millisecond
      true ->
        {:ok, now, 0}
    end
  end

  # Packs the timestamp (as milliseconds past @twitter_epoch), the machine id
  # and the sequence number into one unsigned 64-bit integer. Values wider
  # than their field are truncated by the binary construction.
  defp construct_id({megas, secs, micros}, machine_id, seq) do
    combined = div((megas * 1000000 + secs) * 1000000 + micros, 1000) - @twitter_epoch

    <<integer :: unsigned-integer-size(64)>> = <<
      combined :: unsigned-integer-size(42),
      machine_id :: size(10),
      seq :: unsigned-integer-size(12)>>

    integer
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment