Created
November 1, 2019 02:53
-
-
Save deep5050/8cb771d2bbf8af3571e08f8f119c456a to your computer and use it in GitHub Desktop.
chandy lamport implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Dipankar Pal (25th sep 2019) | |
import queue | |
class node(object): | |
def __init__(self, name, balance): | |
self.name = name | |
self.temp_state = [] | |
self.recorded_state = [] | |
self.is_recorded = False | |
self.out_channel_list = [] | |
self.in_channel_list = [] | |
self.balance = balance | |
self.msg_count = 0 | |
self.last_marker_from = None | |
def add_channel(self, channel): | |
self.channel_list.append(channel) | |
def send_marker(self, except_): | |
# except_ = self.last_marker_from | |
for chnl in self.out_channel_list: | |
if except_ != None: | |
if chnl.rcvr.name == except_.name: # send marker to everyone except to the channel through which the | |
# last marker was received | |
continue | |
marker = message((str(self.name) + "_" + str(self.msg_count)), self, chnl.rcvr, 1) # it's a marker | |
marker.msg(-1) | |
marker.send(self, chnl.rcvr, chnl) | |
self.msg_count += 1 | |
self.temp_state.append(marker) # temporarily make an entry for this message | |
def record_state(self): | |
self.recorded_state = True | |
self.send_marker(None) | |
def send_msg(self, rcvr_, val): | |
temp_msg = message((str(self.name) + "_" + str(self.msg_count)), self, rcvr_, 0) | |
temp_msg.set_type(0) # setting type to send | |
temp_msg.msg(val) | |
self.msg_count += 1 | |
# selecting channel automatically | |
i = 0 | |
for index in range(len(self.out_channel_list)): | |
if self.out_channel_list[index].rcvr.name == rcvr_.name: | |
i = index | |
temp_msg.send(self, rcvr_, self.out_channel_list[i]) | |
self.balance -= val | |
self.out_channel_list[i].message_queue.put(temp_msg) | |
self.temp_state.append(temp_msg) # temporarily make an entry for this message | |
def receive_msg(self, sendr_): | |
global target_channel, mssg | |
global target_channel, mssg | |
print("Recived a message") | |
# check for the input queue for the first message | |
global target_channel, mssg | |
for chnl in self.in_channel_list: | |
if chnl.sender.name == sendr_.name: | |
target_channel = chnl | |
if target_channel.message_queue.qsize() != 0: | |
mssg = target_channel.message_queue.get_nowait() | |
if mssg == None: return | |
# this message is not a marker then fine | |
if mssg.marker == 0: | |
val = mssg.msg | |
self.balance += val | |
self.temp_state.append(mssg) | |
target_channel.local_state.append(mssg) | |
print("Application message : appended") | |
else: | |
print("Got a marker") | |
# if this is a marker then | |
# if thi9s is the first marker | |
if not self.recorded_state: | |
self.recorded_state = True | |
self.last_marker_from = target_channel.sender | |
# record the state of channel as empty | |
target_channel.local_state.clear() | |
# sends the marker ro every one else | |
self.send_marker(target_channel) | |
# receive messages on every incoming channel except the current channel | |
for channel_ in self.in_channel_list: | |
if channel_.sender.name == target_channel.sender: | |
continue | |
# get the message queue | |
for _ in range(channel_.message_queue.qsize()): | |
mssg = channel_.message_queue.get_nowait() | |
self.receive_msg(channel_.sender) | |
else: | |
# if this process itself hast not recorded it's state yet record it now | |
# record this channel's state as set of messages sent after the this process has recorded it's state and | |
# before this process received the marker | |
chnl.temp_state.append(chnl.message_queue) | |
print("Recording state:->", self.name) | |
########################################### | |
class channel(object): | |
def __init__(self, sender, rcvr): | |
self.sender = sender | |
self.rcvr = rcvr | |
self.message_queue = queue.Queue(maxsize=20) | |
self.temp_state = [] | |
# adding the channel to sender list | |
self.sender.out_channel_list.append(self) | |
self.rcvr.in_channel_list.append(self) | |
self.local_state = [] | |
class message(object): | |
def __init__(self): | |
pass | |
def __init__(self, id, sender_, rcvr_, | |
marker=0): # marker represents that it is a snapshot request, by default False | |
self.id = id | |
self.type = -1 | |
self.marker = marker | |
self.sender = sender_ | |
self.rcvr = rcvr_ | |
def set_type(self, type): | |
if type == 0: # send | |
self.type = 0 | |
else: | |
self.type = 1 | |
def msg(self, msg): | |
self.msg = msg | |
def send(self, sender, rcvr, channel): | |
pass | |
process_list = [] | |
process_1 = node(1, 50) | |
process_2 = node(2, 50) | |
process_3 = node(3, 50) | |
process_4 = node(4, 50) | |
process_list.append(process_1) | |
process_list.append(process_2) | |
process_list.append(process_3) | |
process_list.append(process_4) | |
channel_list = [] | |
channel_12 = channel(process_1, process_2) | |
channel_13 = channel(process_1, process_3) | |
channel_14 = channel(process_1, process_4) | |
channel_21 = channel(process_2, process_1) | |
channel_24 = channel(process_2, process_4) | |
channel_32 = channel(process_3, process_2) | |
channel_43 = channel(process_4, process_3) | |
channel_42 = channel(process_4, process_2) | |
channel_list.append(channel_12) | |
channel_list.append(channel_13) | |
channel_list.append(channel_14) | |
channel_list.append(channel_21) | |
channel_list.append(channel_24) | |
channel_list.append(channel_32) | |
channel_list.append(channel_43) | |
channel_list.append(channel_42) | |
process_1.record_state() | |
process_1.send_msg(process_2, 5) | |
process_2.receive_msg(process_1) | |
process_1.send_msg(process_2, 10) | |
process_1.send_msg(process_3, 10) | |
process_3.receive_msg(process_1) | |
process_2.receive_msg(process_1) | |
process_2.send_msg(process_4, 10) | |
process_4.receive_msg(process_2) | |
process_2.record_state() | |
process_2.send_msg(process_3, 5) | |
# process_3.receive_msg(process_2) | |
process_3.send_msg(process_2, 3) | |
process_2.receive_msg(process_4) | |
#################### GLOBAL SNAPSHOT ############### | |
print("--------------- process state ---------------- \n") | |
for process in process_list: | |
# print(process.temp_state) | |
print("p_name:-> ", process.name, ) | |
print("balance: ", process.balance) | |
print("message:") | |
for mssg in process.temp_state: | |
print("{ id:", mssg.id, ", marker:", mssg.marker, " to/from process:", mssg.rcvr.name, ", msg:", mssg.msg, | |
" },") | |
for channel in process.in_channel_list: | |
print("channel state:") | |
for mssg in channel.local_state: | |
print("{ id:", mssg.id, ", marker:", mssg.marker, " to/from process:", mssg.rcvr.name, ", msg:", mssg.msg, | |
" },") | |
print() | |
print(".................") | |
# print("--------------- channels state --------------") | |
# for channel_ in channel_list: | |
# print("channel_",channel_.sender.name,channel_.rcvr.name) | |
# print("message:") | |
# for msg in channel_.local_state: | |
# msssg = msg.message_queue.get_nowait() | |
# print("{ id:", mssg.id, ", marker:", mssg.marker, " to/from process:", mssg.rcvr.name, ", msg:", mssg.msg, | |
# " },") | |
# | |
# print() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment