Last active
January 22, 2025 15:24
-
-
Save ilopX/3e1cf3d3ca1daf5f4da2838b03a77c2b to your computer and use it in GitHub Desktop.
Rust observer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![allow(dead_code)] | |
use std::any::{Any, TypeId}; | |
use std::cell::RefCell; | |
use std::collections::{HashMap, VecDeque}; | |
use std::mem; | |
use std::rc::Rc; | |
fn main() { | |
let text = RefCell::new(String::new()); | |
let observer = Observer::new(); | |
let (red_listener, _, _, _) = UserEvents::new(&text, &observer).create_all(); | |
red_listener.deactivate(); | |
observer.send(RedEvent {}); | |
observer.send(FirstEvent { val: 555 }); | |
// FirstEvent sent to -> SecondEvent send to -> ThirdEvent change buffer | |
assert_eq!("first(555) second third", text.borrow().as_str()); | |
println!("{}", text.borrow()); | |
} | |
// This code implements an event hub (the `Observer` struct below), designed as a centralized
// system for message processing. The hub performs the following tasks:
//
// 1. Message reception: the hub accepts messages represented as other structures (objects)
// that may contain data or logic.
//
// 2. Message processing: during processing, the hub dispatches each message to the listeners
// that are registered to receive that specific type of message.
//
// 3. Nested message support: a listener may send further messages while a message is being
// processed. These nested messages are queued and processed after the messages that are already
// waiting in the queue.
//
// In summary, the hub provides an event-driven subscription and processing system with nested
// message handling capabilities, making it a versatile tool for complex workflows. Dispatch itself
// is synchronous and single-threaded (the hub is built on `Rc` and `RefCell`).
//////////////////////////////////////////////////////////////////////////////////////////////////// | |
// main function implementation | |
struct UserEvents<'a> { | |
text: &'a RefCell<String>, | |
observer: &'a Observer, | |
} | |
impl<'a> UserEvents<'a> { | |
fn new(text: &'a RefCell<String>, observer: &'a Observer) -> Self { | |
Self { text, observer } | |
} | |
fn create_all(&self) -> (Listener, Listener, Listener, Listener) { | |
( | |
self.create_red(), | |
self.create_first(), | |
self.create_second(), | |
self.create_third(), | |
) | |
} | |
fn create_red(&self) -> Listener { | |
self.observer.listen::<RedEvent>(|_, _| { | |
self.text.borrow_mut().push_str("some text"); | |
}) | |
} | |
fn create_first(&self) -> Listener { | |
self.observer.listen::<FirstEvent>(|first_event, events| { | |
events.send(SecondEvent { | |
message: format!("first({})", first_event.val), | |
}); | |
}) | |
} | |
fn create_second(&self) -> Listener { | |
self.observer.listen::<SecondEvent>(|second_event, events| { | |
let new_val = format!("{} second", second_event.message); | |
events.send(ThirdEvent { | |
string_val: new_val, | |
}); | |
}) | |
} | |
fn create_third(&self) -> Listener { | |
self.observer.listen::<ThirdEvent>(|third_event, _| { | |
let new_val = format!("{} third", third_event.string_val); | |
self.text.borrow_mut().push_str(&new_val); | |
}) | |
} | |
} | |
struct FirstEvent { | |
val: u32, | |
} | |
struct SecondEvent { | |
message: String, | |
} | |
struct ThirdEvent { | |
string_val: String, | |
} | |
struct RedEvent {} | |
impl Event for FirstEvent {} | |
impl Event for SecondEvent {} | |
impl Event for ThirdEvent {} | |
impl Event for RedEvent {} | |
//////////////////////////////////////////////////////////////////////////////////////////////////// | |
// Hub implementation | |
/// Marker trait for values that can be sent through an `Observer`.
///
/// The `Any` supertrait is what lets the dispatcher ask a boxed event for the `TypeId` of its
/// concrete type when it looks up the matching listeners.
trait Event: Any {}

/// Capacity that `Observer::new` hands to its `ListenerMap`, which uses it to preallocate the
/// listener list created for each event type.
const DEFAULT_BUFFER_SIZE: usize = 10;
struct Observer { | |
listener_map: Rc<RefCell<ListenerMap>>, | |
} | |
impl Observer { | |
pub fn new() -> Self { | |
Self { | |
listener_map: Rc::new(RefCell::new(ListenerMap::new(DEFAULT_BUFFER_SIZE))), | |
} | |
} | |
pub fn listen<T: Event>(&self, listener_fn: impl FnMut(&Box<T>, &mut EventPool)) -> Listener { | |
let new_listener = Listener::new(listener_fn); | |
new_listener.activate(self); | |
new_listener | |
} | |
pub fn remove_listener(&self, listener: &Listener) { | |
listener.deactivate(); | |
} | |
pub fn send(&self, event: impl Event) { | |
self.listener_map.borrow_mut().send(event); | |
} | |
} | |
struct ListenerMap { | |
buffer_size: usize, | |
listeners: HashMap<TypeId, Vec<Listener>>, | |
} | |
impl ListenerMap { | |
fn new(buffer_size: usize) -> Self { | |
Self { | |
buffer_size, | |
listeners: Default::default(), | |
} | |
} | |
fn add(&mut self, new_listener: &Listener) { | |
self.add_listener(new_listener.clone()); | |
} | |
fn add_listener(&mut self, new_listener: Listener) { | |
let key = new_listener.event_type(); | |
match self.listeners.get_mut(&key) { | |
Some(list) => list.push(new_listener), | |
None => { | |
let new_vec_listeners = Vec::from_listener(new_listener, self.buffer_size); | |
self.listeners.insert(key, new_vec_listeners); | |
} | |
} | |
} | |
fn remove(&mut self, listener: &Listener) { | |
self.listeners | |
.entry(listener.event_type()) | |
.and_modify(|list| list.remove_first(listener)); | |
} | |
fn send(&mut self, event: impl Event) { | |
let mut event_pool = EventPool::from(event); | |
while let Some(event) = event_pool.pop() { | |
self.call_listeners_for(event, &mut event_pool); | |
} | |
} | |
#[inline] | |
fn call_listeners_for(&mut self, event: Box<dyn Event>, mut event_pool: &mut EventPool) { | |
let event_type = (*event).type_id(); | |
if let Some(listeners) = self.listeners.get(&event_type) { | |
listeners.notify_all(&event, &mut event_pool); | |
} | |
} | |
} | |
trait VecListeners<T: PartialEq> { | |
fn notify_all(&self, event: &Box<dyn Event>, pool: &mut EventPool); | |
fn remove_first(&mut self, element: &T); | |
fn from_listener(listener: Listener, buffer_size: usize) -> Vec<Listener> { | |
let mut new_list = Vec::with_capacity(buffer_size); | |
new_list.push(listener); | |
new_list | |
} | |
} | |
impl VecListeners<Listener> for Vec<Listener> { | |
fn notify_all(&self, event: &Box<dyn Event>, mut event_pool: &mut EventPool) { | |
for listener in self { | |
listener.notify(&event, &mut event_pool); | |
} | |
} | |
fn remove_first(&mut self, element: &Listener) { | |
let pos = self.iter().position(|a| a == element); | |
if let Some(index) = pos { | |
self.remove(index); | |
} | |
} | |
} | |
#[cfg(test)]
mod observer_tests {
    use crate::{Event, Observer};
    use std::cell::{Cell, RefCell};

    // Primitive types the tests send as events.
    impl Event for bool {}
    impl Event for u32 {}
    impl Event for String {}

    /// A registered listener receives the value carried by the sent event.
    #[test]
    fn add_listener_and_send() {
        let received = Cell::new(false);
        let observer = Observer::new();

        observer.listen::<bool>(|flag, _| received.set(**flag));
        observer.send(true);

        assert!(received.get());
    }

    /// The listener can read the fields of a user-defined event type.
    #[test]
    fn listen_and_send_and_check_event_args() {
        struct SomeEvent(u32);
        impl Event for SomeEvent {}

        let last_value = RefCell::new(0);
        let observer = Observer::new();

        observer.listen::<SomeEvent>(|event, _| {
            *last_value.borrow_mut() = event.0;
        });
        observer.send(SomeEvent(123456));

        assert_eq!(*last_value.borrow(), 123456);
    }

    /// A deactivated listener is no longer called.
    #[test]
    fn listen_and_remove_listener_and_send() {
        let was_called = Cell::new(false);
        let observer = Observer::new();

        let listener = observer.listen::<u32>(|_, _| was_called.set(true));
        listener.deactivate();
        observer.send(100);

        assert!(!was_called.get());
    }

    /// Events queued from inside a listener are delivered, in order, by the same `send` call.
    #[test]
    fn send_from_other_event() {
        let text = RefCell::new(String::new());
        let observer = Observer::new();

        observer.listen::<String>(|event, events| {
            text.borrow_mut().push_str(event);
            events.send(true);
        });
        observer.listen::<bool>(|event, events| {
            let mut out = text.borrow_mut();
            out.push_str(&event.to_string());
            out.push_str(", ");
            events.send(100);
        });
        observer.listen::<u32>(|event, _| {
            text.borrow_mut().push_str(&event.to_string());
        });

        observer.send("Hello, ".to_string());

        assert_eq!(*text.borrow(), "Hello, true, 100");
    }

    /// Two listeners may keep sending each other events until one of them stops.
    #[test]
    fn looping_send() {
        struct Foo;
        impl Event for Foo {}
        struct Bar;
        impl Event for Bar {}

        let trace = RefCell::new(Vec::new());
        let observer = Observer::new();

        observer.listen::<Foo>(|_, events| {
            trace.borrow_mut().push("foo");
            events.send(Bar);
        });
        observer.listen::<Bar>(|_, events| {
            let mut seen = trace.borrow_mut();
            seen.push("bar");
            if seen.len() < 6 {
                events.send(Foo);
            }
        });

        observer.send(Foo);

        assert_eq!(
            *trace.borrow(),
            vec!["foo", "bar", "foo", "bar", "foo", "bar"]
        );
    }

    /// Every listener registered for the same event type is notified.
    #[test]
    fn several_identical_events() {
        let hits = Cell::new(0);
        let observer = Observer::new();

        observer.listen::<bool>(|_, _| hits.set(hits.get() + 1));
        // This listener only counts `false` events, so it stays silent for the `true` below.
        observer.listen::<bool>(|event, _| {
            if !**event {
                hits.set(hits.get() + 1)
            }
        });
        observer.listen::<bool>(|_, _| hits.set(hits.get() + 1));

        observer.send(true);

        assert_eq!(hits.get(), 2);
    }
}
type DynEventFn = Box<dyn FnMut(&Box<dyn Event>, &mut EventPool)>; | |
struct ListenerRc { | |
event_type: TypeId, | |
parent: Option<Rc<RefCell<ListenerMap>>>, | |
fun: DynEventFn, | |
} | |
struct Listener { | |
rc: Rc<RefCell<ListenerRc>>, | |
} | |
impl Listener { | |
pub fn new<T: Event>(listener_fn: impl FnMut(&Box<T>, &mut EventPool)) -> Self { | |
let rc = ListenerRc { | |
parent: None, | |
event_type: TypeId::of::<T>(), | |
fun: Self::convert_to_dyn_event_fn(listener_fn), | |
}; | |
Self { | |
rc: Rc::new(RefCell::new(rc)), | |
} | |
} | |
#[inline] | |
pub fn notify(&self, arg: &Box<dyn Event>, event_pool: &mut EventPool) { | |
(self.rc.borrow_mut().fun)(arg, event_pool); | |
} | |
#[inline] | |
fn activate(&self, future_parent: &Observer) { | |
self.deactivate(); | |
let future_parent = &future_parent.listener_map; | |
self.rc.borrow_mut().parent = Some(Rc::clone(future_parent)); | |
future_parent.borrow_mut().add(self); | |
} | |
fn deactivate(&self) { | |
let parent = self.rc.borrow_mut().parent.take(); | |
if let Some(parent) = parent { | |
parent.borrow_mut().remove(self); | |
} | |
} | |
fn event_type(&self) -> TypeId { | |
self.rc.borrow().event_type.clone() | |
} | |
fn convert_to_dyn_event_fn<T: Event>( | |
listener_fn: impl FnMut(&Box<T>, &mut EventPool), | |
) -> DynEventFn { | |
let new_listener = Box::new(listener_fn); | |
unsafe { | |
mem::transmute::<Box<dyn FnMut(&Box<T>, &mut EventPool)>, DynEventFn>(new_listener) | |
} | |
} | |
} | |
impl Clone for Listener { | |
fn clone(&self) -> Self { | |
let is_second_clone_exists = Rc::strong_count(&self.rc) == 2; | |
if is_second_clone_exists { | |
panic!(""); | |
} | |
Self { | |
rc: self.rc.clone(), | |
} | |
} | |
} | |
impl PartialEq for Listener { | |
fn eq(&self, other: &Self) -> bool { | |
Rc::ptr_eq(&self.rc, &other.rc) | |
} | |
} | |
#[cfg(test)]
mod listener_tests {
    use crate::{Event, EventPool, Listener, Observer, VecListeners};

    /// Boxes `val` as a type-erased event.
    fn value_to_event(val: impl Event) -> Box<dyn Event> {
        Box::new(val)
    }

    /// Calls `listener` once with `val`, using a fresh and empty event pool.
    fn notify(listener: Listener, val: impl Event) {
        let event = value_to_event(val);
        let mut pool = EventPool::new();
        listener.notify(&event, &mut pool);
    }

    /// A stand-alone listener runs its callback when notified directly.
    #[test]
    fn new_listener() {
        let mut text = String::new();
        let listener = Listener::new::<bool>(|flag, _| {
            if **flag {
                text.push_str("Its work");
            }
        });

        notify(listener, true);

        assert_eq!("Its work", text.as_str());
    }

    /// A listener survives a round trip through `Vec::from_listener`.
    #[test]
    fn to_vec() {
        let mut is_done = false;
        let listener = Listener::new::<bool>(|_, _| {
            is_done = true;
        });

        let mut list = Vec::from_listener(listener, 10);
        let listener = list.pop().unwrap();
        notify(listener, true);

        assert!(is_done);
    }

    /// A deactivated listener is not called by its observer any more.
    #[test]
    fn deactivate() {
        let mut is_ok = true;
        let observer = Observer::new();

        let listener = observer.listen::<bool>(|_, _| is_ok = false);
        listener.deactivate();
        observer.send(true);

        assert!(is_ok);
    }
}
struct EventPool { | |
events: VecDeque<Box<dyn Event>>, | |
} | |
impl EventPool { | |
fn new() -> Self { | |
Self { | |
events: VecDeque::with_capacity(10), | |
} | |
} | |
fn from(event: impl Event) -> Self { | |
let mut events = VecDeque::<Box<dyn Event>>::new(); | |
events.push_back(Box::new(event)); | |
Self { | |
events, | |
..EventPool::new() | |
} | |
} | |
fn send(&mut self, event: impl Event) { | |
let event = Box::new(event); | |
self.events.push_back(event); | |
} | |
fn pop(&mut self) -> Option<Box<dyn Event>> { | |
self.events.pop_front() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment