Created
May 28, 2024 13:08
-
-
Save pool2win/71628bbe8318ecd5f5bb88fda32a196d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use tokio::sync::{mpsc, oneshot}; | |
use tokio::time; | |
struct MyActor { | |
receiver: mpsc::Receiver<ActorMessage>, | |
next_id: u32, | |
subscribers: Vec<ClientActorHandle>, | |
reader: mpsc::Receiver<u32>, | |
} | |
enum ActorMessage { | |
GetUniqueId { respond_to: oneshot::Sender<u32> }, | |
Subscribe { handle: ClientActorHandle }, | |
} | |
impl MyActor { | |
fn new(receiver: mpsc::Receiver<ActorMessage>, reader: mpsc::Receiver<u32>) -> Self { | |
MyActor { | |
receiver, | |
reader, | |
next_id: 0, | |
subscribers: vec![], | |
} | |
} | |
fn handle_message(&mut self, msg: ActorMessage) { | |
match msg { | |
ActorMessage::GetUniqueId { respond_to } => { | |
self.next_id += 1; | |
let _ = respond_to.send(self.next_id); | |
} | |
ActorMessage::Subscribe { handle } => self.subscribers.push(handle), | |
} | |
} | |
async fn update_subscribers(&self, data: u32) { | |
for subscriber in &self.subscribers { | |
subscriber.recv_update(data).await; | |
} | |
} | |
} | |
async fn run_my_actor(mut actor: MyActor) { | |
loop { | |
tokio::select! { | |
Some(message) = actor.receiver.recv() => { | |
actor.handle_message(message); | |
}, | |
Some(message) = actor.reader.recv() => { | |
actor.update_subscribers(message).await; | |
} | |
} | |
} | |
} | |
#[derive(Clone)] | |
pub struct MyActorHandle { | |
sender: mpsc::Sender<ActorMessage>, | |
} | |
impl MyActorHandle { | |
pub fn new(events_receiver: mpsc::Receiver<u32>) -> Self { | |
let (sender, receiver) = mpsc::channel(8); | |
let actor = MyActor::new(receiver, events_receiver); | |
tokio::spawn(run_my_actor(actor)); | |
Self { sender } | |
} | |
pub async fn get_unique_id(&self) -> u32 { | |
let (send, recv) = oneshot::channel(); | |
let msg = ActorMessage::GetUniqueId { respond_to: send }; | |
// Ignore send errors. If this send fails, so does the | |
// recv.await below. There's no reason to check the | |
// failure twice. | |
let _ = self.sender.send(msg).await; | |
recv.await.expect("Actor task has been killed") | |
} | |
pub async fn add_client_subscriber(&self, handle: ClientActorHandle) { | |
let msg = ActorMessage::Subscribe { handle }; | |
let _ = self.sender.send(msg).await; | |
} | |
} | |
struct ClientActor { | |
receiver: mpsc::Receiver<ClientActorMessage>, | |
id: u32, | |
} | |
enum ClientActorMessage { | |
UpdateValue { | |
value: u32, | |
respond_to: oneshot::Sender<u32>, | |
}, | |
} | |
impl ClientActor { | |
fn new(receiver: mpsc::Receiver<ClientActorMessage>, id: u32) -> Self { | |
ClientActor { receiver, id } | |
} | |
fn handle_message(&mut self, msg: ClientActorMessage) { | |
match msg { | |
ClientActorMessage::UpdateValue { value, respond_to } => { | |
println!("Client {} actor got {}", self.id, value); | |
let _ = respond_to.send(self.id); | |
} | |
} | |
} | |
} | |
async fn run_client_actor(mut actor: ClientActor) { | |
while let Some(msg) = actor.receiver.recv().await { | |
actor.handle_message(msg); | |
} | |
} | |
pub struct ClientActorHandle { | |
sender: mpsc::Sender<ClientActorMessage>, | |
} | |
impl ClientActorHandle { | |
pub fn new(id: u32) -> Self { | |
let (sender, receiver) = mpsc::channel(8); | |
let actor = ClientActor::new(receiver, id); | |
tokio::spawn(run_client_actor(actor)); | |
Self { sender } | |
} | |
pub async fn recv_update(&self, value: u32) -> u32 { | |
let (send, recv) = oneshot::channel(); | |
let msg = ClientActorMessage::UpdateValue { | |
value, | |
respond_to: send, | |
}; | |
let _ = self.sender.send(msg).await; | |
recv.await.expect("Client Actor task has been killed") | |
} | |
} | |
async fn start_producing_events(sender: mpsc::Sender<u32>) { | |
let mut interval = time::interval(time::Duration::from_millis(1000)); | |
loop { | |
interval.tick().await; | |
let _ = sender.send(100).await; | |
} | |
} | |
#[tokio::main] | |
async fn main() { | |
let (events_sender, events_receiver) = mpsc::channel::<u32>(8); | |
let handle = MyActorHandle::new(events_receiver); | |
tokio::spawn(start_producing_events(events_sender)); | |
let id = handle.get_unique_id().await; | |
println!("ID = {}", id); | |
let id = handle.clone().get_unique_id().await; | |
println!("ID = {}", id); | |
let id = handle.clone().get_unique_id().await; | |
println!("ID = {}", id); | |
println!("Now subscribe"); | |
let client_handle = ClientActorHandle::new(1); | |
handle.clone().add_client_subscriber(client_handle).await; | |
let client_handle_2 = ClientActorHandle::new(2); | |
handle.clone().add_client_subscriber(client_handle_2).await; | |
loop {} // infinite loop to let MyActor produce receive events and send them on to the subscriber | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment