Created
July 11, 2022 08:13
-
-
Save fairjm/3289934e247d8521a2d0cdfaf20c0b0f to your computer and use it in GitHub Desktop.
classical_paxos.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{ | |
sync::mpsc::{channel, Receiver, Sender}, | |
thread::{self, JoinHandle}, time::Duration, | |
}; | |
#[derive(Debug)] | |
enum Request { | |
PrepareRequest { | |
rnd: i32, | |
reply_to: Sender<Response>, | |
}, | |
AcceptRequest { | |
v: String, | |
rnd: i32, | |
reply_to: Sender<Response>, | |
}, | |
} | |
#[derive(Debug)] | |
enum Response { | |
PrepareResponse { | |
id: i32, | |
last_rnd: i32, | |
v: Option<String>, | |
vrnd: Option<i32>, | |
}, | |
AcceptResponse { | |
id: i32, | |
ok: bool, | |
}, | |
} | |
#[derive(Debug)] | |
struct Acceptor { | |
id: i32, | |
last_rnd: i32, | |
v: Option<String>, | |
vrnd: Option<i32>, | |
recv: Receiver<Request>, | |
} | |
impl Acceptor { | |
fn new(id: i32, recv: Receiver<Request>) -> Acceptor { | |
Acceptor { | |
id, | |
last_rnd: 0, | |
v: None, | |
vrnd: None, | |
recv, | |
} | |
} | |
fn run(self: &mut Self) { | |
match self.recv.recv().unwrap() { | |
Request::PrepareRequest { rnd, reply_to } => { | |
let last_rnd = self.last_rnd; | |
if rnd > self.last_rnd { | |
self.last_rnd = rnd | |
} | |
reply_to | |
.send(Response::PrepareResponse { | |
id: self.id, | |
last_rnd, | |
v: self.v.clone(), | |
vrnd: self.vrnd.clone(), | |
}) | |
.unwrap() | |
} | |
Request::AcceptRequest { v, rnd, reply_to } => { | |
if rnd != self.last_rnd { | |
println!( | |
"acceptor-{} - rnd({rnd}) not equals to last rnd({})", | |
self.id, self.last_rnd | |
); | |
reply_to | |
.send(Response::AcceptResponse { | |
id: self.id, | |
ok: false, | |
}) | |
.unwrap() | |
} else { | |
println!("acceptor-{} - accept value.v:{v}, vrnd:{rnd}", self.id); | |
self.v = Option::Some(v); | |
self.vrnd = Option::Some(rnd); | |
reply_to | |
.send(Response::AcceptResponse { | |
id: self.id, | |
ok: true, | |
}) | |
.unwrap(); | |
} | |
} | |
} | |
} | |
} | |
#[derive(Debug)] | |
struct Proposer { | |
id: i32, | |
rnd: i32, | |
acceptors: Vec<Sender<Request>>, | |
id_sender: Sender<Sender<i32>>, | |
} | |
impl Proposer { | |
fn new(id: i32, id_sender: Sender<Sender<i32>>, acceptors: Vec<Sender<Request>>) -> Proposer { | |
Proposer { | |
id, | |
rnd: Proposer::get_id(id_sender.clone()), | |
acceptors, | |
id_sender, | |
} | |
} | |
fn get_id(id_sender: Sender<Sender<i32>>) -> i32 { | |
let (sender, recv) = channel(); | |
id_sender.send(sender).unwrap(); | |
recv.recv().unwrap() | |
} | |
fn get_quorum(&self) -> &[Sender<Request>] { | |
let n = self.acceptors.len(); | |
let idx = n / 2 + 1; | |
&self.acceptors[0..idx] | |
} | |
fn propose(&mut self, v: String) { | |
let mut update_rnd = false; | |
loop { | |
if update_rnd { | |
update_rnd = false; | |
self.rnd = Proposer::get_id(self.id_sender.clone()); | |
} | |
let quorum = self.get_quorum(); | |
// phase 1 | |
let mut vrnd_tmp = -1; | |
let mut v_tmp = None; | |
for i in quorum { | |
let (sender, recv) = channel(); | |
let request = Request::PrepareRequest { | |
rnd: self.rnd, | |
reply_to: sender, | |
}; | |
i.send(request).unwrap(); | |
if let Response::PrepareResponse { | |
id, | |
last_rnd, | |
v, | |
vrnd, | |
} = recv.recv().unwrap() | |
{ | |
println!( | |
"proposer-{} : rnd({}) send to acceptor({})", | |
self.id, self.rnd, id | |
); | |
if last_rnd > self.rnd { | |
println!( | |
"proposer-{} : rnd({}) is less than last acceptor({}) rnd({}).retry", | |
self.id, self.rnd, id, last_rnd | |
); | |
// retry | |
update_rnd = true; | |
break; | |
} | |
if let Some(resp_vrnd) = vrnd { | |
if resp_vrnd > vrnd_tmp { | |
vrnd_tmp = resp_vrnd; | |
v_tmp = v; | |
} | |
} | |
} | |
} | |
if update_rnd { | |
continue; | |
} | |
// phase 2 | |
let final_v = v_tmp.unwrap_or(v.clone()); | |
for i in quorum { | |
let (sender, recv) = channel(); | |
let request = Request::AcceptRequest { | |
v: final_v.clone(), | |
rnd: self.rnd, | |
reply_to: sender, | |
}; | |
i.send(request).unwrap(); | |
if let Response::AcceptResponse { id, ok } = recv.recv().unwrap() { | |
if !ok { | |
println!("proposer-{} - acceptor({id}) is not ok. retry", self.id); | |
update_rnd = true; | |
} else { | |
println!("proposer-{} - acceptor({id}) is ok.", self.id); | |
} | |
} | |
} | |
if update_rnd { | |
continue; | |
} | |
// done | |
println!("proposer-{} - finish", self.id); | |
break; | |
} | |
} | |
} | |
fn main() { | |
let (id_send, id_recv) = channel(); | |
let _handler = thread::spawn(move || { | |
let mut i = 1; | |
loop { | |
let s: Sender<i32> = id_recv.recv().unwrap(); | |
if let Err(msg) = s.send(i) { | |
eprintln!("{}", msg); | |
} | |
i = i + 1; | |
} | |
}); | |
let mut acceptor_senders = Vec::new(); | |
let mut acceptor_join_handler = Vec::new(); | |
for i in 0..3 { | |
let (acceptor_sender, acceptor_recv) = channel(); | |
let join_handler = run_acceptor(i, acceptor_recv); | |
acceptor_senders.push(acceptor_sender); | |
acceptor_join_handler.push(join_handler); | |
} | |
let id_send_clone = id_send.clone(); | |
let acceptor_senders_clone = acceptor_senders.clone(); | |
thread::spawn(move || { | |
let mut proposer1 = Proposer::new(0, id_send_clone, acceptor_senders_clone); | |
proposer1.propose("v".to_owned()) | |
}); | |
let id_send_clone = id_send.clone(); | |
let acceptor_senders_clone = acceptor_senders.clone(); | |
thread::spawn(move || { | |
let mut proposer2 = Proposer::new(1, id_send_clone, acceptor_senders_clone); | |
proposer2.propose("v2".to_owned()); | |
}); | |
thread::sleep(Duration::from_secs(10)); | |
} | |
fn run_acceptor(id: i32, s: Receiver<Request>) -> JoinHandle<()> { | |
let mut acceptor = Acceptor::new(id, s); | |
thread::spawn(move || loop { | |
acceptor.run(); | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment