Skip to content

Instantly share code, notes, and snippets.

@ilopX
Last active January 22, 2025 15:24
Show Gist options
  • Save ilopX/3e1cf3d3ca1daf5f4da2838b03a77c2b to your computer and use it in GitHub Desktop.
Save ilopX/3e1cf3d3ca1daf5f4da2838b03a77c2b to your computer and use it in GitHub Desktop.
Rust observer
#![allow(dead_code)]
use std::any::{Any, TypeId};
use std::cell::RefCell;
use std::collections::{HashMap, VecDeque};
use std::mem;
use std::rc::Rc;
/// Demo entry point: wires four listeners to one observer, switches the "red" one off again,
/// then fires two events and checks what ended up in the shared text buffer.
fn main() {
    let output = RefCell::new(String::new());
    let hub = Observer::new();

    // Only the red listener handle is kept; the other three handles are discarded right away,
    // while the observer keeps its own copies of them.
    let (red, _, _, _) = UserEvents::new(&output, &hub).create_all();
    red.deactivate();

    // The red listener has been deactivated, so this event must leave the buffer untouched.
    hub.send(RedEvent {});

    // FirstEvent -> SecondEvent -> ThirdEvent; only the last hop writes to the buffer.
    hub.send(FirstEvent { val: 555 });

    let rendered = output.borrow();
    assert_eq!("first(555) second third", rendered.as_str());
    println!("{}", rendered);
}
// This code implements the `Observer` structure (called "Hub" in the comments below), designed as
// a centralized system for message processing. Hub performs the following tasks:
//
// 1. Message Reception: The structure accepts messages represented as other structures (objects)
// that may contain data or logic.
//
// 2. Message Processing: During processing, the Hub dispatches signals to subscribers that are
// registered to receive specific types of messages.
//
// 3. Nested Message Support: The structure allows messages to be sent within other messages being
// processed. These nested messages are queued and processed after the current layer of messages is
// completed.
//
// In summary, Hub provides an event-driven subscription and processing system with nested message
// handling capabilities, making it a versatile tool for complex multi-step event workflows
// (dispatch itself is synchronous and single-threaded).
////////////////////////////////////////////////////////////////////////////////////////////////////
// main function implementation
/// Factory for the demo listeners used by `main`.
///
/// Holds the text buffer that the listeners write into and the observer they are registered on.
struct UserEvents<'a> {
    text: &'a RefCell<String>,
    observer: &'a Observer,
}

impl<'a> UserEvents<'a> {
    /// Bundles the shared text buffer and the observer the listeners will be attached to.
    fn new(text: &'a RefCell<String>, observer: &'a Observer) -> Self {
        Self { text, observer }
    }

    /// Registers all four demo listeners and returns their handles in the order
    /// `(red, first, second, third)`.
    fn create_all(&self) -> (Listener, Listener, Listener, Listener) {
        (
            self.create_red(),
            self.create_first(),
            self.create_second(),
            self.create_third(),
        )
    }

    /// Registers a listener that appends `"some text"` to the buffer on every `RedEvent`.
    fn create_red(&self) -> Listener {
        // The observer stores the closure and calls it later, after this method has returned.
        // Copy the `&'a RefCell<String>` out of `self` and move it into the closure, so the
        // closure never holds a reference to the local `self` (which a non-`move` closure does
        // under whole-variable capture and which would dangle as soon as this method returns).
        let text = self.text;
        self.observer.listen::<RedEvent>(move |_, _| {
            text.borrow_mut().push_str("some text");
        })
    }

    /// Registers a listener that turns a `FirstEvent` into a queued `SecondEvent` carrying
    /// `"first(<val>)"`.
    fn create_first(&self) -> Listener {
        self.observer.listen::<FirstEvent>(|first_event, events| {
            events.send(SecondEvent {
                message: format!("first({})", first_event.val),
            });
        })
    }

    /// Registers a listener that turns a `SecondEvent` into a queued `ThirdEvent` carrying
    /// `"<message> second"`.
    fn create_second(&self) -> Listener {
        self.observer.listen::<SecondEvent>(|second_event, events| {
            let new_val = format!("{} second", second_event.message);
            events.send(ThirdEvent {
                string_val: new_val,
            });
        })
    }

    /// Registers a listener that appends `"<string_val> third"` to the buffer on every
    /// `ThirdEvent`.
    fn create_third(&self) -> Listener {
        // Same reasoning as in `create_red`: capture the buffer reference by value, not `self`.
        let text = self.text;
        self.observer.listen::<ThirdEvent>(move |third_event, _| {
            let new_val = format!("{} third", third_event.string_val);
            text.borrow_mut().push_str(&new_val);
        })
    }
}
// Demo event types used by `main` and `UserEvents`.

// First link of the demo chain; its listener forwards `val` inside a `SecondEvent`.
struct FirstEvent {
val: u32,
}
// Second link of the demo chain; its listener forwards `message` inside a `ThirdEvent`.
struct SecondEvent {
message: String,
}
// Last link of the demo chain; its listener appends `string_val` to the shared text buffer.
struct ThirdEvent {
string_val: String,
}
// Payload-free event; `main` uses it to show that a deactivated listener is not called.
struct RedEvent {}
// Opt every demo type into the `Event` trait so it can be sent through the observer.
impl Event for FirstEvent {}
impl Event for SecondEvent {}
impl Event for ThirdEvent {}
impl Event for RedEvent {}
////////////////////////////////////////////////////////////////////////////////////////////////////
// Hub implementation
// Marker trait for everything that can be sent through an `Observer`.
// The `Any` supertrait is what lets the dispatcher ask a boxed event for its concrete `TypeId`.
trait Event: Any {}
// Buffer size handed to `ListenerMap::new` by `Observer::new`; the map uses it as the initial
// capacity of each per-event-type listener list.
const DEFAULT_BUFFER_SIZE: usize = 10;
// Public face of the event hub: a shared, interior-mutable handle to the listener registry.
// The registry sits behind `Rc<RefCell<..>>` so that listeners can keep their own handle to it.
struct Observer {
listener_map: Rc<RefCell<ListenerMap>>,
}
impl Observer {
    /// Creates an observer with an empty listener registry sized by `DEFAULT_BUFFER_SIZE`.
    pub fn new() -> Self {
        let registry = ListenerMap::new(DEFAULT_BUFFER_SIZE);
        Self {
            listener_map: Rc::new(RefCell::new(registry)),
        }
    }

    /// Registers `listener_fn` for events of type `T` and returns the handle of the new,
    /// already activated listener.
    pub fn listen<T: Event>(&self, listener_fn: impl FnMut(&Box<T>, &mut EventPool)) -> Listener {
        let handle = Listener::new(listener_fn);
        handle.activate(self);
        handle
    }

    /// Detaches `listener` from whatever registry it is currently attached to.
    pub fn remove_listener(&self, listener: &Listener) {
        Listener::deactivate(listener);
    }

    /// Dispatches `event` (and every event queued by listeners while handling it).
    ///
    /// The registry stays mutably borrowed for the whole dispatch.
    pub fn send(&self, event: impl Event) {
        let mut registry = self.listener_map.borrow_mut();
        registry.send(event);
    }
}
/// Registry of listeners, grouped by the `TypeId` of the event type they subscribe to.
struct ListenerMap {
    /// Initial capacity given to each newly created per-type listener list.
    buffer_size: usize,
    listeners: HashMap<TypeId, Vec<Listener>>,
}

impl ListenerMap {
    /// Creates an empty registry; `buffer_size` is the initial capacity of every listener list.
    fn new(buffer_size: usize) -> Self {
        Self {
            buffer_size,
            listeners: HashMap::new(),
        }
    }

    /// Stores a clone of `new_listener` under its event type.
    fn add(&mut self, new_listener: &Listener) {
        let owned = new_listener.clone();
        self.add_listener(owned);
    }

    /// Stores `new_listener` under its event type, creating the per-type list on first use.
    fn add_listener(&mut self, new_listener: Listener) {
        let initial_capacity = self.buffer_size;
        self.listeners
            .entry(new_listener.event_type())
            .or_insert_with(|| Vec::with_capacity(initial_capacity))
            .push(new_listener);
    }

    /// Removes the first stored entry equal to `listener`; does nothing if there is none.
    fn remove(&mut self, listener: &Listener) {
        if let Some(same_type) = self.listeners.get_mut(&listener.event_type()) {
            same_type.remove_first(listener);
        }
    }

    /// Dispatches `event`, then keeps draining the pool until no listener queues anything new.
    fn send(&mut self, event: impl Event) {
        let mut pending = EventPool::from(event);
        loop {
            match pending.pop() {
                Some(next) => self.call_listeners_for(next, &mut pending),
                None => break,
            }
        }
    }

    /// Notifies every listener registered for the concrete type of `event`.
    #[inline]
    fn call_listeners_for(&mut self, event: Box<dyn Event>, event_pool: &mut EventPool) {
        // Dereference first: the `TypeId` wanted here is that of the boxed value, not of the box.
        let concrete_type = (*event).type_id();
        let subscribers = match self.listeners.get(&concrete_type) {
            Some(found) => found,
            None => return,
        };
        subscribers.notify_all(&event, event_pool);
    }
}
/// Extension methods for a list of listeners.
trait VecListeners<T: PartialEq> {
    /// Calls every listener in the list with `event`.
    fn notify_all(&self, event: &Box<dyn Event>, pool: &mut EventPool);

    /// Removes the first element equal to `element`, if any.
    fn remove_first(&mut self, element: &T);

    /// Builds a list with room for `buffer_size` listeners that already contains `listener`.
    fn from_listener(listener: Listener, buffer_size: usize) -> Vec<Listener> {
        let mut seeded: Vec<Listener> = Vec::with_capacity(buffer_size);
        seeded.push(listener);
        seeded
    }
}

impl VecListeners<Listener> for Vec<Listener> {
    fn notify_all(&self, event: &Box<dyn Event>, event_pool: &mut EventPool) {
        // Listeners are called in list order.
        self.iter()
            .for_each(|listener| listener.notify(event, event_pool));
    }

    fn remove_first(&mut self, element: &Listener) {
        if let Some(index) = self.iter().position(|candidate| candidate == element) {
            self.remove(index);
        }
    }
}
// Tests for `Observer`: registering listeners, sending events, removing listeners and sending
// follow-up events from inside a listener.
#[cfg(test)]
mod observer_tests {
use crate::{Event, Observer};
use std::cell::{Cell, RefCell};
// A registered listener receives the value that was sent.
#[test]
fn add_listener_and_send() {
let bool_val = Cell::new(false);
let observer = Observer::new();
observer.listen::<bool>(|val, _| bool_val.set(**val));
observer.send(true);
assert_eq!(bool_val.get(), true);
}
// The listener can read the fields of a user-defined event type.
#[test]
fn listen_and_send_and_check_event_args() {
let counter = RefCell::new(0);
let observer = Observer::new();
struct SomeEvent(u32);
impl Event for SomeEvent {}
observer.listen::<SomeEvent>(|first_event, _| {
counter.replace(first_event.0);
});
observer.send(SomeEvent(123456));
assert_eq!(*counter.borrow(), 123456);
}
// A deactivated listener is no longer called.
#[test]
fn listen_and_remove_listener_and_send() {
let bool_val = Cell::new(false);
let observer = Observer::new();
let listener = observer.listen::<u32>(|_, _| {
bool_val.replace(true);
});
listener.deactivate();
observer.send(100);
assert_eq!(bool_val.get(), false);
}
// Events queued from inside a listener are dispatched afterwards, in the order they were queued:
// String -> bool -> u32.
#[test]
fn send_from_other_event() {
let text = RefCell::new(String::from(""));
let observer = Observer::new();
observer.listen::<String>(|event, events| {
text.borrow_mut().push_str(event);
events.send(true);
});
observer.listen::<bool>(|event, events| {
let mut result = text.borrow_mut();
result.push_str(&event.to_string());
result.push_str(", ");
events.send(100);
});
observer.listen::<u32>(|event, _| {
let mut result = text.borrow_mut();
result.push_str(&event.to_string());
});
observer.send("Hello, ".to_string());
assert_eq!(*text.borrow(), "Hello, true, 100");
}
// Two listeners may keep re-triggering each other; the chain ends once the `Bar` listener stops
// queueing `Foo` (here: when six entries have been recorded).
#[test]
fn looping_send() {
let vec = RefCell::new(vec![]);
let observer = Observer::new();
struct Foo;
impl Event for Foo {}
struct Bar;
impl Event for Bar {}
observer.listen::<Foo>(|_, events| {
vec.borrow_mut().push("foo");
events.send(Bar);
});
observer.listen::<Bar>(|_, events| {
let mut result = vec.borrow_mut();
result.push("bar");
if result.len() < 6 {
events.send(Foo);
}
});
observer.send(Foo);
assert_eq!(
*vec.borrow(),
vec!["foo", "bar", "foo", "bar", "foo", "bar"]
);
}
// Several listeners on the same event type are all called; the middle one only counts `false`,
// so sending `true` yields a total of 2.
#[test]
fn several_identical_events() {
let counter = Cell::new(0);
let observer = Observer::new();
observer.listen::<bool>(|_, _| counter.set(counter.get() + 1));
observer.listen::<bool>(|event, _| {
if **event == false {
counter.set(counter.get() + 1)
}
});
observer.listen::<bool>(|_, _| counter.set(counter.get() + 1));
observer.send(true);
assert_eq!(counter.get(), 2);
}
// Test-only: let primitive types act as events.
impl Event for bool {}
impl Event for u32 {}
impl Event for String {}
}
/// Type-erased listener callback: every listener is stored as a closure over `Box<dyn Event>`,
/// whatever concrete event type it was written for.
type DynEventFn = Box<dyn FnMut(&Box<dyn Event>, &mut EventPool)>;

/// Shared state behind a `Listener` handle.
struct ListenerRc {
    /// `TypeId` of the event type this listener subscribed to.
    event_type: TypeId,
    /// Registry the listener is currently attached to, if any.
    ///
    /// Held weakly on purpose: the registry owns a clone of every active listener, so a strong
    /// reference here would form an `Rc` cycle and neither the registry nor its listeners would
    /// ever be freed once the `Observer` goes away.
    parent: Option<std::rc::Weak<RefCell<ListenerMap>>>,
    /// The type-erased callback.
    fun: DynEventFn,
}

/// Handle to a listener; equality and cloning are defined in terms of the shared `rc` state.
struct Listener {
    rc: Rc<RefCell<ListenerRc>>,
}

impl Listener {
    /// Creates a detached listener for events of type `T`.
    ///
    /// The listener is not attached to any observer until `activate` is called.
    pub fn new<T: Event>(listener_fn: impl FnMut(&Box<T>, &mut EventPool)) -> Self {
        let state = ListenerRc {
            event_type: TypeId::of::<T>(),
            parent: None,
            fun: Self::convert_to_dyn_event_fn(listener_fn),
        };
        Self {
            rc: Rc::new(RefCell::new(state)),
        }
    }

    /// Invokes the callback with `arg`.
    ///
    /// The listener state stays mutably borrowed while the callback runs.
    /// NOTE(review): `arg` must hold a value of the event type this listener was created for;
    /// the callback reinterprets it without any check (see `convert_to_dyn_event_fn`).
    #[inline]
    pub fn notify(&self, arg: &Box<dyn Event>, event_pool: &mut EventPool) {
        (self.rc.borrow_mut().fun)(arg, event_pool);
    }

    /// Attaches the listener to `future_parent`, detaching it from its current registry first.
    #[inline]
    fn activate(&self, future_parent: &Observer) {
        self.deactivate();
        let future_parent = &future_parent.listener_map;
        self.rc.borrow_mut().parent = Some(Rc::downgrade(future_parent));
        future_parent.borrow_mut().add(self);
    }

    /// Detaches the listener from its registry; a no-op when it is not attached or when the
    /// registry no longer exists.
    fn deactivate(&self) {
        // Take the parent out in its own statement so the borrow of `self.rc` has ended before
        // the registry calls back into this listener.
        let parent = self.rc.borrow_mut().parent.take();
        if let Some(parent) = parent.and_then(|registry| registry.upgrade()) {
            parent.borrow_mut().remove(self);
        }
    }

    /// `TypeId` of the event type this listener subscribed to.
    fn event_type(&self) -> TypeId {
        self.rc.borrow().event_type
    }

    /// Erases the concrete event type `T` from the callback's signature.
    fn convert_to_dyn_event_fn<T: Event>(
        listener_fn: impl FnMut(&Box<T>, &mut EventPool),
    ) -> DynEventFn {
        let new_listener = Box::new(listener_fn);
        // NOTE(review): this transmute is load-bearing and not provably sound.
        //  * The callback will be handed a `&Box<dyn Event>` but reads it as `&Box<T>`; that
        //    relies on the data pointer being the first word of the boxed trait object, which
        //    the language does not guarantee, and on callers only passing events of type `T`.
        //  * It also drops the closure's borrow lifetime (`DynEventFn` has no lifetime
        //    parameter), so the caller must keep everything the closure borrows alive for as
        //    long as the listener can be invoked.
        unsafe {
            mem::transmute::<Box<dyn FnMut(&Box<T>, &mut EventPool)>, DynEventFn>(new_listener)
        }
    }
}
impl Clone for Listener {
    /// Returns a second handle to the same listener state.
    ///
    /// # Panics
    ///
    /// Panics when exactly two handles to this listener already exist, i.e. when the single
    /// permitted extra handle has already been created.
    fn clone(&self) -> Self {
        let second_handle_exists = Rc::strong_count(&self.rc) == 2;
        assert!(
            !second_handle_exists,
            "Listener::clone: a second handle to this listener already exists; deactivate the listener before cloning it again"
        );
        Self {
            rc: Rc::clone(&self.rc),
        }
    }
}
impl PartialEq for Listener {
    /// Two handles are equal exactly when they point at the same shared listener state.
    fn eq(&self, other: &Self) -> bool {
        let (mine, theirs) = (&self.rc, &other.rc);
        Rc::ptr_eq(mine, theirs)
    }
}
// Tests for `Listener` on its own and together with `Observer`.
#[cfg(test)]
mod listener_tests {
use crate::{Event, EventPool, Listener, Observer, VecListeners};
// A detached listener runs its callback when notified directly.
#[test]
fn new_listener() {
let mut text = String::new();
let listener = Listener::new::<bool>(|event, _| {
if **event {
text.push_str("Its work");
}
});
notify(listener, true);
assert_eq!("Its work", text.as_str());
}
// A listener still works after a round trip through `Vec::from_listener`.
#[test]
fn to_vec() {
let mut is_done = false;
let listener = Listener::new::<bool>(|_, _| {
is_done = true;
});
let mut list = Vec::from_listener(listener, 10);
let listener = list.pop().unwrap();
notify(listener, true);
assert_eq!(true, is_done);
}
// A deactivated listener is not called by the observer.
#[test]
fn deactivate() {
let mut is_ok = true;
let observer = Observer::new();
let listener = observer.listen::<bool>(|_, _| is_ok = false);
listener.deactivate();
observer.send(true);
assert_eq!(is_ok, true);
}
// Helper: notifies `listener` with `val` and a fresh, empty event pool; consumes the listener.
fn notify(listener: Listener, val: impl Event) {
listener.notify(&value_to_event(val), &mut EventPool::new())
}
// Helper: boxes a concrete value as a `Box<dyn Event>`.
fn value_to_event(val: impl Event) -> Box<dyn Event> {
Box::new(val) as Box<dyn Event>
}
}
/// FIFO queue of events that are waiting to be dispatched.
///
/// Listeners receive a `&mut EventPool` and use `send` to queue follow-up events.
struct EventPool {
    events: VecDeque<Box<dyn Event>>,
}

impl EventPool {
    /// Creates an empty pool with room for 10 events.
    fn new() -> Self {
        Self {
            events: VecDeque::with_capacity(10),
        }
    }

    /// Creates a pool that already contains `event`.
    ///
    /// Built on top of `new`, so the pool gets the same pre-allocated capacity and no throwaway
    /// queue is allocated along the way.
    fn from(event: impl Event) -> Self {
        let mut pool = Self::new();
        pool.send(event);
        pool
    }

    /// Appends `event` to the back of the queue.
    fn send(&mut self, event: impl Event) {
        self.events.push_back(Box::new(event));
    }

    /// Removes and returns the oldest queued event, or `None` when the queue is empty.
    fn pop(&mut self) -> Option<Box<dyn Event>> {
        self.events.pop_front()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment