Skip to content

Instantly share code, notes, and snippets.

@deep5050
Created November 1, 2019 02:53
Show Gist options
  • Save deep5050/8cb771d2bbf8af3571e08f8f119c456a to your computer and use it in GitHub Desktop.
Save deep5050/8cb771d2bbf8af3571e08f8f119c456a to your computer and use it in GitHub Desktop.
Chandy-Lamport snapshot algorithm implementation
# Dipankar Pal (25th sep 2019)
import queue
class node(object):
    """A process taking part in a simulated Chandy-Lamport snapshot.

    A node holds a balance, exchanges value-carrying messages with other
    nodes over one-directional channels, and records its local state either
    on request (``record_state``) or when the first marker reaches it.
    """

    def __init__(self, name, balance):
        """Create a process called *name* that starts with *balance*."""
        self.name = name
        self.temp_state = []  # every message this node has sent or received
        self.recorded_state = []  # copy of temp_state taken at snapshot time
        self.recorded_balance = None  # balance at snapshot time
        self.is_recorded = False  # True once the local state was recorded
        self.out_channel_list = []
        self.in_channel_list = []
        self.balance = balance
        self.msg_count = 0  # suffix used to build message ids
        self.last_marker_from = None  # sender of the first marker received
        # Names of senders whose incoming channel is no longer being
        # recorded (a marker has already arrived on it).
        self._closed_senders = set()

    @staticmethod
    def _find_channel(channels, endpoint, peer):
        """Return the last channel whose *endpoint* node is named like *peer*.

        *endpoint* is the attribute to inspect (``"sender"`` or ``"rcvr"``).
        Returns ``None`` when no channel matches.
        """
        for chnl in reversed(channels):
            if getattr(chnl, endpoint).name == peer.name:
                return chnl
        return None

    def _take_snapshot(self):
        """Record the local state: current balance and messages seen so far."""
        self.is_recorded = True
        self.recorded_balance = self.balance
        self.recorded_state = list(self.temp_state)

    def add_channel(self, channel):
        """Register *channel* with this node.

        The channel goes into the outgoing list when this node is its sender
        and into the incoming list when this node is its receiver. A channel
        that is already registered is not added a second time.
        """
        for endpoint, registry in ((channel.sender, self.out_channel_list),
                                   (channel.rcvr, self.in_channel_list)):
            if endpoint is self and not any(c is channel for c in registry):
                registry.append(channel)

    def send_marker(self, except_):
        """Put a marker on every outgoing channel except the one to *except_*.

        *except_* is the node that must not receive a marker, or ``None`` to
        send on all outgoing channels. A channel object is accepted too, in
        which case its sender is the excluded node.

        Raises ``queue.Full`` (from ``put_nowait``) when a channel is full.

        NOTE(review): the textbook algorithm sends a marker on every outgoing
        channel; the exclusion is kept from the original design - confirm.
        """
        excluded = None
        if except_ is not None:
            excluded = getattr(except_, "sender", except_)
        for chnl in self.out_channel_list:
            if excluded is not None and chnl.rcvr.name == excluded.name:
                continue
            marker_id = str(self.name) + "_" + str(self.msg_count)
            marker = message(marker_id, self, chnl.rcvr, 1)  # 1 -> marker
            marker.msg(-1)
            marker.send(self, chnl.rcvr, chnl)
            # Actually deliver the marker; without this the receiver never
            # sees it and never records its state.
            chnl.message_queue.put_nowait(marker)
            self.msg_count += 1
            self.temp_state.append(marker)

    def record_state(self):
        """Start a snapshot at this node: record state and send markers.

        Does nothing when this node has already recorded its state.
        """
        if self.is_recorded:
            return
        self._take_snapshot()
        self.send_marker(None)

    def send_msg(self, rcvr_, val):
        """Send *val* to *rcvr_* over the outgoing channel that leads to it.

        The balance is reduced by *val*. When no outgoing channel leads to
        *rcvr_*, nothing is sent and the balance is left untouched.

        Raises ``queue.Full`` (from ``put_nowait``) when the channel is full.
        """
        out_channel = self._find_channel(self.out_channel_list, "rcvr", rcvr_)
        if out_channel is None:
            print("No channel from", self.name, "to", rcvr_.name,
                  ": message not sent")
            return
        msg_id = str(self.name) + "_" + str(self.msg_count)
        temp_msg = message(msg_id, self, rcvr_, 0)
        temp_msg.set_type(0)  # setting type to send
        temp_msg.msg(val)
        temp_msg.send(self, rcvr_, out_channel)
        # Enqueue before touching the balance so a full queue cannot leave
        # the node debited for a message that was never sent.
        out_channel.message_queue.put_nowait(temp_msg)
        self.msg_count += 1
        self.balance -= val
        self.temp_state.append(temp_msg)

    def receive_msg(self, sendr_):
        """Take the next message from the channel coming from *sendr_*.

        Application messages are added to the balance. They are appended to
        the channel's ``local_state`` only while that channel is being
        recorded, i.e. after this node recorded its state and before a marker
        arrived on the channel.

        The first marker makes this node record its state, treat the marker's
        channel as empty, forward markers, and drain the other incoming
        channels. Later markers stop the recording of their channel.

        Nothing happens when the channel does not exist or is empty.
        """
        print("Recived a message")
        target_channel = self._find_channel(
            self.in_channel_list, "sender", sendr_)
        if target_channel is None:
            print("No channel from", sendr_.name, "to", self.name,
                  ": nothing to receive")
            return
        if target_channel.message_queue.empty():
            return
        mssg = target_channel.message_queue.get_nowait()
        if mssg is None:
            return
        sender_name = target_channel.sender.name

        if mssg.marker == 0:
            self.balance += mssg.msg
            self.temp_state.append(mssg)
            if self.is_recorded and sender_name not in self._closed_senders:
                # In flight at snapshot time: part of this channel's state.
                target_channel.local_state.append(mssg)
            print("Application message : appended")
            return

        print("Got a marker")
        if self.is_recorded:
            # Not the first marker: this channel's recorded state is final.
            self._closed_senders.add(sender_name)
            return

        # First marker: record own state now.
        print("Recording state:->", self.name)
        self._take_snapshot()
        self.last_marker_from = target_channel.sender
        # record the state of this channel as empty
        target_channel.local_state.clear()
        self._closed_senders.add(sender_name)
        # send the marker to everyone else
        self.send_marker(target_channel.sender)
        # receive what is already queued on every other incoming channel
        for channel_ in self.in_channel_list:
            if channel_ is target_channel:
                continue
            for _ in range(channel_.message_queue.qsize()):
                self.receive_msg(channel_.sender)
###########################################
class channel(object):
    """A one-directional FIFO link from a sender node to a receiver node."""

    def __init__(self, sender, rcvr, maxsize=20):
        """Create the link and register it with both endpoints.

        The new channel is appended to ``sender.out_channel_list`` and to
        ``rcvr.in_channel_list``.

        *maxsize* is the capacity of the underlying ``queue.Queue``
        (default 20, the value that used to be hard-coded).
        """
        self.sender = sender
        self.rcvr = rcvr
        self.message_queue = queue.Queue(maxsize=maxsize)
        self.temp_state = []
        self.local_state = []  # messages recorded as this channel's state
        # adding the channel to the endpoints' lists
        self.sender.out_channel_list.append(self)
        self.rcvr.in_channel_list.append(self)
class message(object):
    """A message travelling over a channel: application data or a marker."""

    # The parameter names ``id`` and ``type`` shadow builtins; they are kept
    # because they are part of the existing call interface.
    def __init__(self, id, sender_, rcvr_, marker=0):
        """Create a message.

        *marker* is 0 for an application message (the default) and non-zero
        for a snapshot marker.
        """
        self.id = id
        self.type = -1  # -1 until set_type() is called
        self.marker = marker
        self.sender = sender_
        self.rcvr = rcvr_

    def set_type(self, type):
        """Set the type to 0 when *type* is 0 (send), otherwise to 1."""
        self.type = 0 if type == 0 else 1

    def msg(self, msg):
        """Store *msg* as the payload, readable afterwards as ``self.msg``.

        NOTE: the stored value replaces this method on the instance, so it
        can be called only once per message object.
        """
        self.msg = msg

    def send(self, sender, rcvr, channel):
        """Placeholder hook; does nothing."""
        pass
# ---- demo scenario: four processes, each starting with a balance of 50 ----
process_1 = node(1, 50)
process_2 = node(2, 50)
process_3 = node(3, 50)
process_4 = node(4, 50)
process_list = [process_1, process_2, process_3, process_4]

# One-directional channels; channel_XY carries messages from X to Y.
channel_12 = channel(process_1, process_2)
channel_13 = channel(process_1, process_3)
channel_14 = channel(process_1, process_4)
channel_21 = channel(process_2, process_1)
channel_24 = channel(process_2, process_4)
channel_32 = channel(process_3, process_2)
channel_43 = channel(process_4, process_3)
channel_42 = channel(process_4, process_2)
channel_list = [
    channel_12,
    channel_13,
    channel_14,
    channel_21,
    channel_24,
    channel_32,
    channel_43,
    channel_42,
]

# Process 1 initiates the snapshot, then normal traffic continues.
process_1.record_state()
process_1.send_msg(process_2, 5)
process_2.receive_msg(process_1)
process_1.send_msg(process_2, 10)
process_1.send_msg(process_3, 10)
process_3.receive_msg(process_1)
process_2.receive_msg(process_1)
process_2.send_msg(process_4, 10)
process_4.receive_msg(process_2)
process_2.record_state()
# NOTE(review): no channel from process 2 to process 3 is created above -
# confirm whether a channel_23 is missing or this send is intentional.
process_2.send_msg(process_3, 5)
process_3.send_msg(process_2, 3)
process_2.receive_msg(process_4)


def _print_message(entry):
    """Print one message in the report format used below."""
    print("{ id:", entry.id, ", marker:", entry.marker, " to/from process:", entry.rcvr.name, ", msg:", entry.msg,
          " },")


#################### GLOBAL SNAPSHOT ###############
print("--------------- process state ---------------- \n")
for process in process_list:
    print("p_name:-> ", process.name)
    print("balance: ", process.balance)
    print("message:")
    for entry in process.temp_state:
        _print_message(entry)
    # ``in_channel`` rather than ``channel`` so the class is not shadowed
    for in_channel in process.in_channel_list:
        print("channel state:")
        for entry in in_channel.local_state:
            _print_message(entry)
    print()
    print(".................")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment