Created
August 6, 2015 15:10
-
-
Save vschiavoni/a30dff25cbc150456d8a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
------------------------------------------------------------------------------- | |
-- modules | |
------------------------------------------------------------------------------- | |
require"splay.base" | |
rpc = require"splay.rpc" | |
rpc.l_o.level=1 | |
misc = require "splay.misc" | |
crypto = require "crypto" | |
-- Command-line handling, added so the script can also run outside the sandbox.
-- PARAMS collects KEY=VALUE pairs taken from one colon-separated argument,
-- e.g. "GOSSIP_TIME=5:TCHORD_LEAF=3". Values are stored as strings.
PARAMS = {}
local cmd_line_args = nil
-- Set when `job` was missing and had to be generated from arg[1]/arg[2].
local local_run = false
if not job then -- outside the sandbox
    if #arg < 2 then
        print("lua ", arg[0], " my_position nb_nodes")
        os.exit()
    else
        local pos, total = tonumber(arg[1]), tonumber(arg[2])
        local utils = require("splay.utils")
        -- NOTE(review): 20001 is presumably the base port of the generated
        -- job -- confirm against splay.utils.generate_job.
        job = utils.generate_job(pos, total, 20001)
        cmd_line_args = arg[3]
        local_run = true
    end
end
if arg ~= nil then
    -- Fall back to arg[1] only when `job` was supplied from outside: in a
    -- local run arg[1] is the node position, not a parameter string.
    if cmd_line_args == nil and not local_run then
        cmd_line_args = arg[1]
    end
    if cmd_line_args ~= nil and cmd_line_args ~= "" then
        print("ARGS: ", cmd_line_args)
        for _, v in pairs(misc.split(cmd_line_args, ":")) do
            local t = misc.split(v, "=")
            -- Skip entries without a key: assigning to a nil table key raises.
            if t[1] ~= nil then
                PARAMS[t[1]] = t[2]
            end
        end
    end
end
-- Start the RPC server on this node's port.
rpc.server(job.me.port)
------------------------------------------------------------------------------- | |
-- current node | |
------------------------------------------------------------------------------- | |
-- Descriptor of the current node: its peer entry from the job and its age.
me = { peer = job.me, age = 0 }
-- compute_hash keeps the first M/4 characters of the digest.
M = 32
--- Hash a value to a number.
-- Computes the SHA-1 digest of `o` through crypto.evp, keeps its first M/4
-- characters and parses them as a base-16 number.
-- @param o value handed to the digest function
-- @return the parsed number (nil if the prefix is not valid base 16)
function compute_hash(o)
    local digest = crypto.evp.new("sha1"):digest(o)
    local prefix = string.sub(digest, 1, M/4)
    return tonumber(prefix, 16)
end
-- The current node's id is the hash of its "ip:port" string.
me.id = compute_hash(tostring(job.me.ip) .. ":" .. tostring(job.me.port))
------------------------------------------------------------------------------- | |
-- parameters | |
------------------------------------------------------------------------------- | |
-- T-CHORD params. Each value can be overridden through PARAMS; the number
-- after `or` is the default used when the parameter is absent or not numeric.
STATS_PERIOD = tonumber(PARAMS["STATS_PERIOD"]) or 5
GOSSIP_TIME = tonumber(PARAMS["GOSSIP_TIME"]) or 5
TCHORD_EXPONENT = 8
-- size of leaf set
TCHORD_LEAF = tonumber(PARAMS["TCHORD_LEAF"]) or 3
TCHORD_MESSAGE_SIZE = tonumber(PARAMS["TCHORD_MESSAGE_SIZE"]) or 10
TCHORD_HB_TIMEOUT = tonumber(PARAMS["TCHORD_HB_TIMEOUT"]) or 1000
-- PARAMS values are strings, and every string (including "false") is truthy
-- in Lua, so `PARAMS[...] or true` could never switch convergence off.
-- Parse the text explicitly; the default stays true.
do
    local raw = PARAMS["TCHORD_CONVERGE"]
    if raw == nil then
        TCHORD_CONVERGE = true
    else
        local text = string.lower(tostring(raw))
        TCHORD_CONVERGE = not (text == "false" or text == "0")
    end
end
-- T-Chord state and helper functions.
-- Node positions live on a ring of 2^TCHORD.t slots (see get_pos). The
-- "ideal" views are computed from the complete node list in job.nodes.
TCHORD = {
    leaves = {},
    fingers = {},
    t = TCHORD_EXPONENT,
    l = TCHORD_LEAF,
    m = TCHORD_MESSAGE_SIZE,
    timeout = TCHORD_HB_TIMEOUT,
    -- views filled in by precompute_views()
    ideal_fingers = {},
    ideal_leaves = {},
    -- maps a node hash to that node's index in job.nodes
    hash_2_position = {},

    ---------------------------------------------------------------------------
    -- utilities
    ---------------------------------------------------------------------------

    -- Ring position of a node: its id modulo 2^TCHORD.t.
    -- `node` is either a table with an `id` field or the id itself.
    get_pos = function(node)
        local id
        if type(node) == "table" then id = node.id else id = node end
        return id % 2^TCHORD.t
    end,

    -- Remove duplicates from a view, in place: for every id only the first
    -- entry carrying it is kept.
    remove_dup = function(set)
        local i = 1
        while i <= #set do
            local id = set[i].id
            local j = i + 1
            while j <= #set do
                if set[j].id == id then
                    table.remove(set, j)
                else
                    j = j + 1
                end
            end
            i = i + 1
        end
    end,

    -- Keep the n first elements of t, in place.
    keep_n = function(t, n)
        for i = #t, n + 1, -1 do
            table.remove(t, i)
        end
    end,

    -- Remove from t, in place, every entry that is the same node as `node`.
    -- Walking backwards keeps the indices valid while removing.
    remove_self = function(t, node)
        for i = #t, 1, -1 do
            if TCHORD.same_node(t[i], node) then
                table.remove(t, i)
            end
        end
    end,

    -- Rank a set of nodes from the point of view of node n, based on their
    -- clockwise distance from n on the ring. Returns a new list; `set` is
    -- left untouched. Entries at equal distance come out in no specified
    -- order (table.sort is not stable).
    rank = function(n, set)
        local ring = 2^TCHORD.t
        local origin = TCHORD.get_pos(n)
        local distances = {}
        for _, v in ipairs(set) do
            -- Lua's % yields a non-negative result for a positive divisor,
            -- so this is the clockwise distance even when v lies "behind" n.
            local d = (TCHORD.get_pos(v) - origin) % ring
            distances[#distances + 1] = { distance = d, node = v }
        end
        table.sort(distances, function(a, b) return a.distance < b.distance end)
        local ranked = {}
        for i, entry in ipairs(distances) do
            ranked[i] = entry.node
        end
        return ranked
    end,

    -- True when both arguments designate the same ip/port pair. Each
    -- argument is either a peer table (ip, port) or a table whose `peer`
    -- field holds one.
    same_node = function(n1, n2)
        local peer_first = n1.peer or n1
        local peer_second = n2.peer or n2
        return peer_first.port == peer_second.port and peer_first.ip == peer_second.ip
    end,

    ---------------------------------------------------------------------------
    -- Convergence
    ---------------------------------------------------------------------------

    -- Fill TCHORD.ideal_leaves and TCHORD.ideal_fingers from the complete
    -- node list. Does nothing unless TCHORD_CONVERGE is set.
    precompute_views = function()
        if not TCHORD_CONVERGE then return end
        local ranked = TCHORD.rank_all_nodes()
        -- There may be fewer rank groups than leaf slots (small deployments);
        -- never read past the end of `ranked`.
        for i = 1, math.min(TCHORD.l, #ranked) do
            TCHORD.ideal_leaves[i] = ranked[i]
        end
        for i = 1, TCHORD.t do
            TCHORD.ideal_fingers[i] = TCHORD.precompute_fingers(ranked, i)
        end
        --TCHORD.display_correct_fingers()
    end,

    -- Group the hashes of all other nodes by ring position, ordered by
    -- clockwise distance from the current node. Returns a list of groups;
    -- each group is a list of hashes sharing one position. Also records
    -- hash -> index in job.nodes in TCHORD.hash_2_position.
    rank_all_nodes = function()
        local ids = {}
        for i, v in ipairs(job.nodes) do
            if not TCHORD.same_node(v, me) then
                local hashed_index = compute_hash(tostring(v.ip) .. ":" .. tostring(v.port))
                ids[#ids + 1] = hashed_index
                TCHORD.hash_2_position[hashed_index] = i
            end
        end
        ids = TCHORD.rank(me, ids)
        local result = {}
        for _, id in ipairs(ids) do
            local last = result[#result]
            if last ~= nil and TCHORD.get_pos(id) == TCHORD.get_pos(last[1]) then
                last[#last + 1] = id
            else
                result[#result + 1] = { id }
            end
        end
        return result
    end,

    -- Ideal j-th finger: the group of nodes found first when walking
    -- clockwise from position get_pos(me) + 2^(j-1).
    -- `nodes` is a list of groups as returned by rank_all_nodes.
    -- Returns the chosen group. When no group lies at or beyond the target,
    -- the first non-empty group is returned; nil when there is none at all.
    --
    -- The selection compares clockwise distances from the current node
    -- rather than raw positions, so it stays correct when the target or the
    -- candidates wrap around position 0.
    precompute_fingers = function(nodes, j)
        local ring = 2^TCHORD.t
        local origin = TCHORD.get_pos(me)
        local offset = 2^(j - 1) % ring
        local fallback, best, best_distance
        for _, group in ipairs(nodes) do
            if #group ~= 0 then
                if fallback == nil then fallback = group end
                local distance = (TCHORD.get_pos(group[1]) - origin) % ring
                if distance >= offset and (best == nil or distance < best_distance) then
                    best, best_distance = group, distance
                end
            end
        end
        return best or fallback
    end,
}
-- How long the node runs before terminator() stops it (seconds, per the
-- comment in main()).
max_time = 10
--- Sleep for max_time, then exit the process.
-- Meant to run in its own events thread.
function terminator()
    local lifetime = max_time
    events.sleep(lifetime)
    os.exit()
end
--- Entry point: precompute the ideal views and log them.
-- For every ideal leaf and finger, logs its slot, the first hash of the
-- group and that hash's index in job.nodes (from TCHORD.hash_2_position).
function main()
    -- Logs one view: a title line, then one line per entry.
    local function dump(title, view)
        log:print(title)
        for slot, group in pairs(view) do
            local first_hash = group[1]
            log:print(slot, first_hash, TCHORD.hash_2_position[first_hash])
        end
    end

    -- this thread will be in charge of killing the node after max_time seconds
    events.thread(terminator)
    log:print("COMPLETE VIEW STATE "..me.id.." mandatory_entries:".. TCHORD.l.." optional_entries:"..TCHORD.t)
    TCHORD.precompute_views()
    log:print("Node position:", job.position)
    dump("Ideal leaves:", TCHORD.ideal_leaves)
    dump("Ideal fingers:", TCHORD.ideal_fingers)
end

-- Schedule main and start the event loop.
events.thread(main)
events.loop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment