Skip to content

Instantly share code, notes, and snippets.

@vschiavoni
Created August 6, 2015 15:10
Show Gist options
  • Save vschiavoni/a30dff25cbc150456d8a to your computer and use it in GitHub Desktop.
Save vschiavoni/a30dff25cbc150456d8a to your computer and use it in GitHub Desktop.
-------------------------------------------------------------------------------
-- modules
-------------------------------------------------------------------------------
require"splay.base"
rpc = require"splay.rpc"
rpc.l_o.level=1
misc = require "splay.misc"
crypto = require "crypto"
-- addition to allow local run
PARAMS={}
local cmd_line_args=nil
if not job then --outside the sandbox
if #arg < 2 then
print("lua ", arg[0], " my_position nb_nodes")
os.exit()
else
local pos, total = tonumber(arg[1]), tonumber(arg[2])
local utils = require("splay.utils")
job = utils.generate_job(pos, total, 20001)
cmd_line_args=arg[3]
end
end
if arg~=nil then
if cmd_line_args==nil then cmd_line_args=arg[1] end
if cmd_line_args~=nil and cmd_line_args~="" then
print("ARGS: ",cmd_line_args)
for _,v in pairs(misc.split(cmd_line_args,":")) do
local t=misc.split(v,"=")
PARAMS[t[1]]=t[2]
end
end
end
rpc.server(job.me.port)
-------------------------------------------------------------------------------
-- current node
-------------------------------------------------------------------------------
--current nodes's id
me = {}
me.peer = job.me
M = 32
function compute_hash(o)
return tonumber(string.sub(crypto.evp.new("sha1"):digest(o), 1, M/4), 16)
end
--current node's age
me.age = 0
me.id = compute_hash(table.concat({tostring(job.me.ip),":",tostring(job.me.port)}))
-------------------------------------------------------------------------------
-- parameters
-------------------------------------------------------------------------------
--T-CHORD params
STATS_PERIOD = tonumber(PARAMS["STATS_PERIOD"]) or 5
GOSSIP_TIME = tonumber(PARAMS["GOSSIP_TIME"]) or 5
TCHORD_EXPONENT = 8
--size of leaf set
TCHORD_LEAF = tonumber(PARAMS["TCHORD_LEAF"]) or 3
TCHORD_MESSAGE_SIZE = tonumber(PARAMS["TCHORD_MESSAGE_SIZE"]) or 10
TCHORD_HB_TIMEOUT = tonumber(PARAMS["TCHORD_HB_TIMEOUT"]) or 1000
TCHORD_CONVERGE = PARAMS["TCHORD_CONVERGE"] or true
TCHORD = {
leaves = {},
fingers = {},
t = TCHORD_EXPONENT,
l = TCHORD_LEAF,
m = TCHORD_MESSAGE_SIZE,
timeout = TCHORD_HB_TIMEOUT,
-- current cycle of the active TCHORD thread
ideal_fingers = {},
ideal_leaves = {},
-------------------------------------------------------------------------------
-- utilities
-------------------------------------------------------------------------------
get_pos = function(node)
local id
if type(node) == "table" then id = node.id else id = node end
return id%2^TCHORD.t
end,
-- remove duplicates from view
remove_dup = function(set)
--local rd = misc.time()
for i,v in ipairs(set) do
local j = i+1
while(j <= #set and #set > 0) do
if v.id == set[j].id then
table.remove(set,j)
else j = j + 1
end
end
end
--log:print("TCHORD.remove_dup", misc.time()-rd)
end,
--keep n first elelements from t
keep_n = function(t,n)
for i = #t, n+1, -1 do
table.remove(t,i)
end
end,
remove_self = function(t, node)
local j = 1
for i = 1, #t do
if TCHORD.same_node(t[j],node) then table.remove(t, j)
else j = j+1 end
end
end,
--ranks the set of nodes from the point of view of the node with the specified id, based on their clockwise distance on the ring
rank = function(n, set)
local distances = {}
local ranked = {}
for i,v in ipairs(set) do
local dist = TCHORD.get_pos(v) - TCHORD.get_pos(n)
local d = 0
if dist >= 0 then d = dist
else d = dist + 2^TCHORD.t end
distances[#distances+1] = {distance= d, node=v}
end
table.sort(distances, function(a,b) return a.distance < b.distance end)
for i,v in ipairs(distances) do
ranked[#ranked+1] = v.node
end
return ranked
end,
same_node = function(n1,n2)
local peer_first
if n1.peer then peer_first = n1.peer else peer_first = n1 end
local peer_second
if n2.peer then peer_second = n2.peer else peer_second = n2 end
return peer_first.port == peer_second.port and peer_first.ip == peer_second.ip
end,
-------------------------------------------------------------------------------
-- Convergence
-------------------------------------------------------------------------------
precompute_views = function()
if TCHORD_CONVERGE then
local ranked = TCHORD.rank_all_nodes ()
for i, v in ipairs(ranked) do
for j,w in ipairs(v) do
end
end
for i = 1, TCHORD.l do
TCHORD.ideal_leaves[i] = ranked[i]
for i,v in ipairs(ranked[i]) do
end
end
for i = 1, TCHORD.t do
TCHORD.ideal_fingers[i] = TCHORD.precompute_fingers(ranked, i)
end
--TCHORD.display_correct_fingers()
end
end,
hash_2_position={},
--group ordered nodes by ranks
rank_all_nodes = function()
local result = {}
local ids = {}
for i,v in ipairs(job.nodes) do
if not TCHORD.same_node(v, me) then
local hashed_index = compute_hash(tostring(v.ip) ..":"..tostring(v.port))
ids[#ids+1] = hashed_index
TCHORD.hash_2_position[hashed_index]=i
end
end
ids = TCHORD.rank(me, ids)
for i,v in ipairs(ids) do
if #result ~= 0 then
if TCHORD.get_pos(v) ~= TCHORD.get_pos(result[#result][1]) then
result[#result+1] = {}
table.insert(result[#result], v)
else
table.insert(result[#result], v)
end
else
result[#result+1] = {}
table.insert(result[#result], v)
end
end
return result
end,
precompute_fingers = function(nodes,j)
local f_index = (TCHORD.get_pos(me) + 2^(j-1))%2^TCHORD.t
local first = 0
for i,v in ipairs(nodes) do
if #v ~= 0 then first = i break end
end
local min = TCHORD.get_pos(nodes[first][1])
for i = first, #nodes do
local value = TCHORD.get_pos(nodes[i][1])
if value == f_index then return nodes[i] end
if value < min then min = value end
if min <= f_index and value > f_index then return nodes[i] end
end
return nodes[first]
end,
}
max_time = 10
function terminator()
events.sleep(max_time)
os.exit()
end
function main()
-- this thread will be in charge of killing the node after max_time seconds
events.thread(terminator)
log:print("COMPLETE VIEW STATE "..me.id.." mandatory_entries:".. TCHORD.l.." optional_entries:"..TCHORD.t)
TCHORD.precompute_views()
log:print("Node position:", job.position)
log:print("Ideal leaves:")
for k,v in pairs(TCHORD.ideal_leaves) do
local hash=v[1]
log:print(k,hash, TCHORD.hash_2_position[hash])
end
log:print("Ideal fingers:")
for k,v in pairs(TCHORD.ideal_fingers) do
local hash=v[1]
log:print(k,hash, TCHORD.hash_2_position[hash])
end
end
events.thread(main)
events.loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment