Skip to content

Instantly share code, notes, and snippets.

@pool2win
Created May 28, 2024 13:08
Show Gist options
  • Save pool2win/71628bbe8318ecd5f5bb88fda32a196d to your computer and use it in GitHub Desktop.
Save pool2win/71628bbe8318ecd5f5bb88fda32a196d to your computer and use it in GitHub Desktop.
use tokio::sync::{mpsc, oneshot};
use tokio::time;
struct MyActor {
receiver: mpsc::Receiver<ActorMessage>,
next_id: u32,
subscribers: Vec<ClientActorHandle>,
reader: mpsc::Receiver<u32>,
}
enum ActorMessage {
GetUniqueId { respond_to: oneshot::Sender<u32> },
Subscribe { handle: ClientActorHandle },
}
impl MyActor {
fn new(receiver: mpsc::Receiver<ActorMessage>, reader: mpsc::Receiver<u32>) -> Self {
MyActor {
receiver,
reader,
next_id: 0,
subscribers: vec![],
}
}
fn handle_message(&mut self, msg: ActorMessage) {
match msg {
ActorMessage::GetUniqueId { respond_to } => {
self.next_id += 1;
let _ = respond_to.send(self.next_id);
}
ActorMessage::Subscribe { handle } => self.subscribers.push(handle),
}
}
async fn update_subscribers(&self, data: u32) {
for subscriber in &self.subscribers {
subscriber.recv_update(data).await;
}
}
}
async fn run_my_actor(mut actor: MyActor) {
loop {
tokio::select! {
Some(message) = actor.receiver.recv() => {
actor.handle_message(message);
},
Some(message) = actor.reader.recv() => {
actor.update_subscribers(message).await;
}
}
}
}
#[derive(Clone)]
pub struct MyActorHandle {
sender: mpsc::Sender<ActorMessage>,
}
impl MyActorHandle {
pub fn new(events_receiver: mpsc::Receiver<u32>) -> Self {
let (sender, receiver) = mpsc::channel(8);
let actor = MyActor::new(receiver, events_receiver);
tokio::spawn(run_my_actor(actor));
Self { sender }
}
pub async fn get_unique_id(&self) -> u32 {
let (send, recv) = oneshot::channel();
let msg = ActorMessage::GetUniqueId { respond_to: send };
// Ignore send errors. If this send fails, so does the
// recv.await below. There's no reason to check the
// failure twice.
let _ = self.sender.send(msg).await;
recv.await.expect("Actor task has been killed")
}
pub async fn add_client_subscriber(&self, handle: ClientActorHandle) {
let msg = ActorMessage::Subscribe { handle };
let _ = self.sender.send(msg).await;
}
}
struct ClientActor {
receiver: mpsc::Receiver<ClientActorMessage>,
id: u32,
}
enum ClientActorMessage {
UpdateValue {
value: u32,
respond_to: oneshot::Sender<u32>,
},
}
impl ClientActor {
fn new(receiver: mpsc::Receiver<ClientActorMessage>, id: u32) -> Self {
ClientActor { receiver, id }
}
fn handle_message(&mut self, msg: ClientActorMessage) {
match msg {
ClientActorMessage::UpdateValue { value, respond_to } => {
println!("Client {} actor got {}", self.id, value);
let _ = respond_to.send(self.id);
}
}
}
}
async fn run_client_actor(mut actor: ClientActor) {
while let Some(msg) = actor.receiver.recv().await {
actor.handle_message(msg);
}
}
pub struct ClientActorHandle {
sender: mpsc::Sender<ClientActorMessage>,
}
impl ClientActorHandle {
pub fn new(id: u32) -> Self {
let (sender, receiver) = mpsc::channel(8);
let actor = ClientActor::new(receiver, id);
tokio::spawn(run_client_actor(actor));
Self { sender }
}
pub async fn recv_update(&self, value: u32) -> u32 {
let (send, recv) = oneshot::channel();
let msg = ClientActorMessage::UpdateValue {
value,
respond_to: send,
};
let _ = self.sender.send(msg).await;
recv.await.expect("Client Actor task has been killed")
}
}
async fn start_producing_events(sender: mpsc::Sender<u32>) {
let mut interval = time::interval(time::Duration::from_millis(1000));
loop {
interval.tick().await;
let _ = sender.send(100).await;
}
}
#[tokio::main]
async fn main() {
let (events_sender, events_receiver) = mpsc::channel::<u32>(8);
let handle = MyActorHandle::new(events_receiver);
tokio::spawn(start_producing_events(events_sender));
let id = handle.get_unique_id().await;
println!("ID = {}", id);
let id = handle.clone().get_unique_id().await;
println!("ID = {}", id);
let id = handle.clone().get_unique_id().await;
println!("ID = {}", id);
println!("Now subscribe");
let client_handle = ClientActorHandle::new(1);
handle.clone().add_client_subscriber(client_handle).await;
let client_handle_2 = ClientActorHandle::new(2);
handle.clone().add_client_subscriber(client_handle_2).await;
loop {} // infinite loop to let MyActor produce receive events and send them on to the subscriber
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment