Skip to content

Instantly share code, notes, and snippets.

@fairjm
Created July 11, 2022 08:13
Show Gist options
  • Save fairjm/3289934e247d8521a2d0cdfaf20c0b0f to your computer and use it in GitHub Desktop.
Save fairjm/3289934e247d8521a2d0cdfaf20c0b0f to your computer and use it in GitHub Desktop.
classical_paxos.rs
use std::{
sync::mpsc::{channel, Receiver, Sender},
thread::{self, JoinHandle}, time::Duration,
};
#[derive(Debug)]
enum Request {
PrepareRequest {
rnd: i32,
reply_to: Sender<Response>,
},
AcceptRequest {
v: String,
rnd: i32,
reply_to: Sender<Response>,
},
}
#[derive(Debug)]
enum Response {
PrepareResponse {
id: i32,
last_rnd: i32,
v: Option<String>,
vrnd: Option<i32>,
},
AcceptResponse {
id: i32,
ok: bool,
},
}
#[derive(Debug)]
struct Acceptor {
id: i32,
last_rnd: i32,
v: Option<String>,
vrnd: Option<i32>,
recv: Receiver<Request>,
}
impl Acceptor {
fn new(id: i32, recv: Receiver<Request>) -> Acceptor {
Acceptor {
id,
last_rnd: 0,
v: None,
vrnd: None,
recv,
}
}
fn run(self: &mut Self) {
match self.recv.recv().unwrap() {
Request::PrepareRequest { rnd, reply_to } => {
let last_rnd = self.last_rnd;
if rnd > self.last_rnd {
self.last_rnd = rnd
}
reply_to
.send(Response::PrepareResponse {
id: self.id,
last_rnd,
v: self.v.clone(),
vrnd: self.vrnd.clone(),
})
.unwrap()
}
Request::AcceptRequest { v, rnd, reply_to } => {
if rnd != self.last_rnd {
println!(
"acceptor-{} - rnd({rnd}) not equals to last rnd({})",
self.id, self.last_rnd
);
reply_to
.send(Response::AcceptResponse {
id: self.id,
ok: false,
})
.unwrap()
} else {
println!("acceptor-{} - accept value.v:{v}, vrnd:{rnd}", self.id);
self.v = Option::Some(v);
self.vrnd = Option::Some(rnd);
reply_to
.send(Response::AcceptResponse {
id: self.id,
ok: true,
})
.unwrap();
}
}
}
}
}
#[derive(Debug)]
struct Proposer {
id: i32,
rnd: i32,
acceptors: Vec<Sender<Request>>,
id_sender: Sender<Sender<i32>>,
}
impl Proposer {
fn new(id: i32, id_sender: Sender<Sender<i32>>, acceptors: Vec<Sender<Request>>) -> Proposer {
Proposer {
id,
rnd: Proposer::get_id(id_sender.clone()),
acceptors,
id_sender,
}
}
fn get_id(id_sender: Sender<Sender<i32>>) -> i32 {
let (sender, recv) = channel();
id_sender.send(sender).unwrap();
recv.recv().unwrap()
}
fn get_quorum(&self) -> &[Sender<Request>] {
let n = self.acceptors.len();
let idx = n / 2 + 1;
&self.acceptors[0..idx]
}
fn propose(&mut self, v: String) {
let mut update_rnd = false;
loop {
if update_rnd {
update_rnd = false;
self.rnd = Proposer::get_id(self.id_sender.clone());
}
let quorum = self.get_quorum();
// phase 1
let mut vrnd_tmp = -1;
let mut v_tmp = None;
for i in quorum {
let (sender, recv) = channel();
let request = Request::PrepareRequest {
rnd: self.rnd,
reply_to: sender,
};
i.send(request).unwrap();
if let Response::PrepareResponse {
id,
last_rnd,
v,
vrnd,
} = recv.recv().unwrap()
{
println!(
"proposer-{} : rnd({}) send to acceptor({})",
self.id, self.rnd, id
);
if last_rnd > self.rnd {
println!(
"proposer-{} : rnd({}) is less than last acceptor({}) rnd({}).retry",
self.id, self.rnd, id, last_rnd
);
// retry
update_rnd = true;
break;
}
if let Some(resp_vrnd) = vrnd {
if resp_vrnd > vrnd_tmp {
vrnd_tmp = resp_vrnd;
v_tmp = v;
}
}
}
}
if update_rnd {
continue;
}
// phase 2
let final_v = v_tmp.unwrap_or(v.clone());
for i in quorum {
let (sender, recv) = channel();
let request = Request::AcceptRequest {
v: final_v.clone(),
rnd: self.rnd,
reply_to: sender,
};
i.send(request).unwrap();
if let Response::AcceptResponse { id, ok } = recv.recv().unwrap() {
if !ok {
println!("proposer-{} - acceptor({id}) is not ok. retry", self.id);
update_rnd = true;
} else {
println!("proposer-{} - acceptor({id}) is ok.", self.id);
}
}
}
if update_rnd {
continue;
}
// done
println!("proposer-{} - finish", self.id);
break;
}
}
}
fn main() {
let (id_send, id_recv) = channel();
let _handler = thread::spawn(move || {
let mut i = 1;
loop {
let s: Sender<i32> = id_recv.recv().unwrap();
if let Err(msg) = s.send(i) {
eprintln!("{}", msg);
}
i = i + 1;
}
});
let mut acceptor_senders = Vec::new();
let mut acceptor_join_handler = Vec::new();
for i in 0..3 {
let (acceptor_sender, acceptor_recv) = channel();
let join_handler = run_acceptor(i, acceptor_recv);
acceptor_senders.push(acceptor_sender);
acceptor_join_handler.push(join_handler);
}
let id_send_clone = id_send.clone();
let acceptor_senders_clone = acceptor_senders.clone();
thread::spawn(move || {
let mut proposer1 = Proposer::new(0, id_send_clone, acceptor_senders_clone);
proposer1.propose("v".to_owned())
});
let id_send_clone = id_send.clone();
let acceptor_senders_clone = acceptor_senders.clone();
thread::spawn(move || {
let mut proposer2 = Proposer::new(1, id_send_clone, acceptor_senders_clone);
proposer2.propose("v2".to_owned());
});
thread::sleep(Duration::from_secs(10));
}
fn run_acceptor(id: i32, s: Receiver<Request>) -> JoinHandle<()> {
let mut acceptor = Acceptor::new(id, s);
thread::spawn(move || loop {
acceptor.run();
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment