Skip to content

Instantly share code, notes, and snippets.

@wentout
Last active March 8, 2025 00:02
Show Gist options
  • Save wentout/c0b1735cd9c1070529998d7d8b42c59a to your computer and use it in GitHub Desktop.
Save wentout/c0b1735cd9c1070529998d7d8b42c59a to your computer and use it in GitHub Desktop.
Publisher Subscriber pattern made via Wrapper
'use strict';
// for testing this code please use the following command
//
// node PubSubOnWrappers.js | grep stack
//
// to see whether the call stack size is exceeded
//
// for a huge number of async subs in Node.js we receive
// "Exception in PromiseRejectCallback"
// along with
// "RangeError: Maximum call stack size exceeded"
//
// this does not take away the ability to still receive some subscriptions,
// though it means the whole idea works correctly only for a small number of subs
//
// also ran some tests in browsers
// Firefox works correctly, reporting those stack errors and producing messages
// for Chrome it seems its Dev Tools are very slow
// so I recommend running it from an HTML file containing the following code
// <script src="PubSubOnWrappers.js"></script>
// after the page has loaded you may open the console and see how it worked
(function () {
// Prefixes for console output: `n` tags lines written by the publisher's
// default handler, `s` tags lines written by subscription handlers.
// Frozen because the table is shared by every logger in this file.
const names = Object.freeze({
    n: 'notification : ',
    s: 'subscription : ',
});

// Defaults merged into every Subscriber's config. Consumers copy them with
// spread, so freezing only guards the shared object against accidental writes.
const defaultSubscriberConfig = Object.freeze({
    // If the subscription count is bigger than the max call stack size
    // the handler chain overflows; async subscribers are intended for that case.
    async: false,
    // 'fifo': previously registered handlers run before this subscriber's handler;
    // any other value: this subscriber's handler runs first.
    stand: 'fifo',
});

// Key of the accessor property on a publisher that returns the current
// (outermost) notification handler.
const SYMBOL_NOTIFY = Symbol('notify');
/**
 * Publisher side of the wrapper-based pub/sub.
 *
 * Every initialized publisher carries an accessor property keyed by
 * SYMBOL_NOTIFY whose getter returns the current outermost handler.
 * The class also keeps a private registry of the subscribers attached to
 * each publisher so that the handler chain can be rebuilt on unsubscribe.
 */
class Publisher {
    /**
     * Default handler installed by `init`: logs the arguments with the
     * notification prefix.
     * @param {...*} args - values passed to the notification.
     */
    static self_notification(...args) {
        console.log(names.n, ...args);
    }

    // publisher -> { subscribers, last }
    //   subscribers: Map of subscriber -> subscriber registered right after it
    //                (null for the most recently registered one)
    //   last:        most recently registered subscriber, or null
    // A WeakMap is used so the registry itself never keeps a publisher alive.
    static #subscribers = new WeakMap();

    /**
     * Looks up the registry entry of a publisher.
     * @param {object} publisher
     * @param {string} action - name of the calling operation, used in the error message.
     * @returns {{subscribers: Map, last: (object|null)}}
     * @throws {TypeError} when the publisher was never passed to `init`.
     */
    static #payloadOf(publisher, action) {
        const payload = Publisher.#subscribers.get(publisher);
        if (payload === undefined) {
            throw new TypeError(`${action}: publisher is not initialized`);
        }
        return payload;
    }

    /**
     * (Re)initializes a publisher: installs the default handler and starts
     * an empty subscriber registry for it.
     * @param {object} publisher
     * @returns {boolean} result of `Reflect.defineProperty`.
     * @throws {Error} when `publisher` is falsy.
     */
    static init(publisher) {
        if (!publisher) {
            throw new Error('missing init publisher');
        }
        // Drops any own string-keyed `notify` property from the publisher.
        delete publisher.notify;
        const done = Reflect.defineProperty(publisher, SYMBOL_NOTIFY, {
            get() {
                return Publisher.self_notification;
            },
            configurable: true,
            enumerable: true,
        });
        Publisher.#subscribers.set(publisher, {
            subscribers: new Map(),
            last: null,
        });
        return done;
    }

    constructor() {
        Publisher.init(this);
    }

    /**
     * Installs `handler` as the publisher's current handler and records
     * `subscriber` as the most recently registered subscriber.
     * @param {object} publisher - a publisher previously passed to `init`.
     * @param {Function} handler - new outermost notification handler.
     * @param {object} subscriber - subscriber that owns the handler.
     * @returns {boolean} result of `Reflect.defineProperty`.
     * @throws {Error} when any argument is falsy.
     * @throws {TypeError} when the publisher is not initialized.
     */
    static upsert(publisher, handler, subscriber) {
        if (!publisher) {
            throw new Error('missing upsert publisher');
        }
        delete publisher.notify;
        if (!handler) {
            throw new Error('missing upsert handler');
        }
        if (!subscriber) {
            throw new Error('missing upsert subscriber');
        }
        // Resolve the registry entry first so an unknown publisher is
        // rejected before its accessor property gets touched.
        const payload = Publisher.#payloadOf(publisher, 'upsert');
        const done = Reflect.defineProperty(publisher, SYMBOL_NOTIFY, {
            get() {
                return handler;
            },
            configurable: true,
            enumerable: true,
        });
        const { subscribers, last } = payload;
        if (last !== null) {
            // link the previous tail to the newcomer
            subscribers.set(last, subscriber);
        }
        subscribers.set(subscriber, null);
        payload.last = subscriber;
        return done;
    }

    /**
     * Rebuilds the handler chain of a publisher: resets it with `init` and
     * re-runs `init()` on every previously registered subscriber whose
     * `subscribed` flag is still truthy, in registration order.
     * @param {object} publisher
     * @throws {TypeError} when the publisher is not initialized.
     */
    static reinit(publisher) {
        // Grab the old registry before `init` replaces it with a fresh one.
        const { subscribers } = Publisher.#payloadOf(publisher, 'reinit');
        Publisher.init(publisher);
        for (const subscriber of subscribers.keys()) {
            if (subscriber.subscribed) {
                subscriber.init();
            }
        }
    }

    /**
     * Calls `notify(...args)` on every registered subscriber, in
     * registration order, without going through the nested handler chain.
     * @param {object} publisher
     * @param {...*} args - values forwarded to each subscriber.
     * @throws {TypeError} when the publisher is not initialized.
     */
    static notify(publisher, ...args) {
        const { subscribers } = Publisher.#payloadOf(publisher, 'notify');
        for (const subscriber of subscribers.keys()) {
            subscriber.notify(...args);
        }
    }
}
/**
 * A subscription of one handler to one publisher.
 *
 * On `init()` the subscriber reads the publisher's current handler (the
 * SYMBOL_NOTIFY accessor), wraps it together with its own handler into a new
 * function and installs that wrapper via `Publisher.upsert`. Each subscriber
 * therefore adds one more level to the publisher's handler chain.
 */
class Subscriber {
    #handler = undefined;
    #publisher = undefined;
    #subscribed = false;
    #config = { ...defaultSubscriberConfig };

    /** @returns {boolean} whether this subscriber is currently subscribed. */
    get subscribed() {
        return this.#subscribed;
    }

    /**
     * @param {object} publisher - an initialized publisher.
     * @param {Function} handler - called with the notification arguments.
     * @param {{async?: boolean, stand?: string}} [config] - overrides for the default config.
     * @throws {TypeError} when `handler` is not a function or the publisher is not initialized.
     */
    constructor(publisher, handler, config = { ...defaultSubscriberConfig }) {
        // Reject a bad handler here: Publisher.upsert only ever sees the
        // wrapper, so otherwise the failure would surface at publish time
        // and break the chain for every other subscriber.
        if (typeof handler !== 'function') {
            throw new TypeError('subscriber handler must be a function');
        }
        this.#publisher = publisher;
        this.#handler = handler;
        this.#config = { ...this.#config, ...config };
        this.init();
    }

    /**
     * Wraps the publisher's current handler with this subscriber's handler
     * and installs the wrapper as the publisher's new current handler.
     * @throws {TypeError} when the publisher has no SYMBOL_NOTIFY accessor.
     */
    init() {
        const self = this;
        const publisher = self.#publisher;
        const descriptor = Reflect.getOwnPropertyDescriptor(publisher, SYMBOL_NOTIFY);
        if (typeof descriptor?.get !== 'function') {
            throw new TypeError('publisher is not initialized');
        }
        // Handler chain built so far (default handler plus earlier subscribers).
        const stack = descriptor.get();
        const { async: isAsync, stand } = self.#config;
        // NOTE(review): the async variants contain no `await`, so their bodies
        // still run synchronously and nest on the call stack; only thrown
        // errors turn into rejected promises. Confirm whether real deferral
        // was intended before relying on `async` for deep chains.
        let notify;
        if (stand === 'fifo') {
            // earlier handlers first, then this subscriber
            notify = isAsync
                ? async (...args) => {
                    stack(...args);
                    self.#handler(...args);
                }
                : (...args) => {
                    stack(...args);
                    self.#handler(...args);
                };
        } else {
            // this subscriber first, then earlier handlers
            notify = isAsync
                ? async (...args) => {
                    self.#handler(...args);
                    stack(...args);
                }
                : (...args) => {
                    self.#handler(...args);
                    stack(...args);
                };
        }
        Publisher.upsert(publisher, notify, self);
        self.#subscribed = true;
    }

    /**
     * Marks the subscriber as unsubscribed and asks the publisher to rebuild
     * its handler chain. Does nothing when already unsubscribed, which avoids
     * a redundant rebuild over all subscribers.
     */
    unsubscribe() {
        if (!this.#subscribed) {
            return;
        }
        this.#subscribed = false;
        Publisher.reinit(this.#publisher);
    }

    /**
     * Invokes only this subscriber's handler with the given arguments.
     * @param {...*} args
     */
    notify(...args) {
        this.#handler(...args);
    }
}
console.time('test duration');

// Proxy handler shared by all demo publishers: an intercepted assignment is
// not stored, it is forwarded as (property, value) to the receiver's current
// notification handler instead.
const notifyOnAssignment = {
    set(_target, key, value, receiver) {
        receiver[SYMBOL_NOTIFY](key, value);
        return true;
    },
};

// Six publishers, keyed publisher1 .. publisher6. Each one gets a proxy of
// itself as its prototype, so assigning a property the publisher does not own
// falls through to the proxy's `set` trap with the publisher as receiver.
const publishers = {};
Array.from({ length: 6 }, (_, idx) => `publisher${idx + 1}`).forEach((label) => {
    const instance = new Publisher();
    const linked = Reflect.setPrototypeOf(instance, new Proxy(instance, notifyOnAssignment));
    publishers[label] = instance;
    console.log(`publisher [${label}] was made: `, linked);
});

const {
    publisher1,
    publisher2,
    publisher3,
    publisher4,
    publisher5,
    publisher6,
} = publishers;
// Factory for demo handlers: the returned function logs whatever it receives,
// prefixed with the subscription marker and the given tag.
const logAs = (tag) => (...payload) => {
    console.log(names.s, tag, ...payload);
};

// --- publisher1: subscribe, publish and unsubscribe step by step ---
const subscriber01 = new Subscriber(publisher1, logAs('p1 > 01 : '));
// const s1h = Reflect.getOwnPropertyDescriptor(publisher1, SYMBOL_NOTIFY).get();
// const s2h = Reflect.getOwnPropertyDescriptor(publisher2, SYMBOL_NOTIFY).get();
// console.log('s1h === s2h : ', s1h === s2h);
publisher1.topic = 111;

const subscriber02 = new Subscriber(publisher1, logAs('p1 > 02 : '));
console.log('--- fully subscribed ---');
publisher1.topic = 222;

console.log('--- unsubscribing first ---');
subscriber01.unsubscribe();
publisher1.topic = 333;

subscriber02.unsubscribe();
console.log('--- both unsubscribed ---');
publisher1.topic = 444;

console.log('--- subscribing twice ---');
const subscriber03 = new Subscriber(publisher1, logAs('p1 > 03 : '));
const subscriber04 = new Subscriber(publisher1, logAs('p1 > 04 : '));
console.log('--- subscribed twice ---');
publisher1.topic = 555;

console.log('--- subscribed third ---');
const subscriber05 = new Subscriber(publisher1, logAs('p1 > 05 : '));
publisher1.topic = 123;

subscriber04.unsubscribe();
console.log('--- unsubscribed p1 > 04 ---');
publisher1.topic = 321;

subscriber03.unsubscribe();
console.log('--- unsubscribed p1 > 03 ---');
publisher1.topic = 777;

const subscriber07 = new Subscriber(publisher1, logAs('p1 > 07 : '));
console.log('--- subscribed 07 first ---');
publisher1.topic = 888;

// --- publisher2: two plain subscribers ---
const subscriber21 = new Subscriber(publisher2, logAs('p2 > 21 : '));
const subscriber22 = new Subscriber(publisher2, logAs('p2 > 22 : '));
console.log('--- subscribed P2 s21 s22 first ---');
publisher2.topic = 999;
console.log('--- test async ---');
// Demo subscribers created in loops, keyed by their display label.
const thousands = {};

// Nine async subscribers on publisher2 (default 'fifo' ordering).
for (let idx = 0; idx < 9; idx += 1) {
    const label = `s: ${idx}`;
    thousands[label] = new Subscriber(
        publisher2,
        (...payload) => {
            console.log(names.s, `p2 > ${label}`, ...payload);
        },
        { async: true },
    );
    console.log(`subscribed p2 ${label}`);
}
publisher2.test = 'test async';

console.log('--- test lifo ---');
// Ten async subscribers on publisher3 using 'lifo' ordering.
for (let idx = 0; idx < 10; idx += 1) {
    const label = `lifo s: ${idx}`;
    thousands[label] = new Subscriber(
        publisher3,
        (...payload) => {
            console.log(names.s, `p3 > ${label}`, ...payload);
        },
        { async: true, stand: 'lifo' },
    );
    console.log(`subscribed p3 ${label}`);
}
// the max call stack size is exceeded if there are many more of them
publisher3.test = 'test lifo';

console.log('--- reinit test ---');
// Unsubscribe every even-numbered publisher2 subscriber (0, 2, 4, 6, 8).
for (let idx = 0; idx < 10; idx += 2) {
    const label = `s: ${idx}`;
    thousands[label].unsubscribe();
    console.log(`reinit for ${label}`);
}
publisher2.test = 'reinit test 1';
publisher2.test = 'reinit test 2';

console.log('--- reinit test lifo ---');
// Unsubscribe every third publisher3 subscriber (0, 3, 6, 9).
for (let idx = 0; idx < 10; idx += 3) {
    const label = `lifo s: ${idx}`;
    thousands[label].unsubscribe();
    console.log(`reinit done for ${label}`);
}
publisher3.testLifoReinit = 'lifo reinit';
console.log('--- stack amount test ---');
// Number of synchronous subscribers chained onto publisher4.
const stack_amt = 1_000;
let syncIndex = 0;
while (syncIndex < stack_amt) {
    const label = `h: ${syncIndex}`;
    thousands[label] = new Subscriber(publisher4, (...payload) => {
        console.log(names.s, `p4 > ${label}`, ...payload);
    });
    // progress line once per thousand subscribers
    if (syncIndex % 1000 === 0) {
        console.log(`current stack test sub: ${label}`);
    }
    syncIndex += 1;
}
console.log(`Preparation of stack test for [ ${stack_amt} ] SYNC subscrptions Finished`);

// One assignment runs through the whole synchronous handler chain.
publisher4['WOW syncStackTest : '] = 'aha';
// Alternative kept for reference: trigger the same notification from a timer.
// try {
// setTimeout(() => {
// publisher4['WOW syncStackTest : '] = 'aha';
// });
// } catch (error) {
// console.error(error);
// }
console.log('--- huge async amount test ---');
// Number of async subscribers attached to publisher5.
const tsubs_amt = 999_999;
// Counts handler invocations; shared with the promise-queue scenario below,
// hence `let` at this scope.
let tsubs_invocation_count = 0;
// Topic that handlers count but do not log, so the timed run measures
// delivery rather than console output.
const skip_message = 'real thing';
for (let i = 0; i < tsubs_amt; i++) {
    const name = `h: ${i}`;
    thousands[name] = new Subscriber(publisher5, (topic, ...args) => {
        tsubs_invocation_count++;
        if (topic === skip_message) {
            return;
        }
        try {
            console.log(names.s, `p5 > ${name}`, topic, ...args);
        } catch (error) {
            console.error(error);
        }
    }, { async: true });
    if (i % 1000 === 0) {
        console.log(`added another thousand of huge subs, current : ${name}`);
    }
}
console.log('--- making two unsubs for that huge, rebalance ---');
console.log('--- please wait ---');
// Each unsubscribe makes the publisher rebuild its whole handler chain.
thousands['h: 8'].unsubscribe();
thousands[`h: ${tsubs_amt - 3}`].unsubscribe();
console.log('--- testing after that unsub huge, rebalance ---');
console.log(`\n\n\nPreparation of [ ${tsubs_amt} ] ASYNC subscrptions DONE`);
console.time(skip_message);
const start = Date.now();
// Publisher.notify calls each registered subscriber's notify() in a flat loop
// instead of going through the nested wrapper chain.
Publisher.notify(publisher5, skip_message);
const diff = Date.now() - start;
// Snapshot of the counter before it is reset for the next scenario.
const real_tsubs_invocation_count = tsubs_invocation_count;
console.timeEnd(skip_message);
console.log(skip_message, diff);
console.log(real_tsubs_invocation_count);
tsubs_invocation_count = 0;
// Number of async subscribers attached to publisher6.
const promise_queue_amt = 999_999;
for (let i = 0; i < promise_queue_amt; i++) {
    const name = `pq: ${i}`;
    thousands[name] = new Subscriber(publisher6, (topic, ...args) => {
        tsubs_invocation_count++;
        try {
            // These subscribers belong to publisher6, so they are labelled p6.
            console.log(names.s, `p6 > ${name}`, topic, ...args);
        } catch (error) {
            console.error(error);
        }
    }, { async: true });
    if (i % 1000 === 0) {
        console.log(`added another thousand of huge subs, current : ${name}`);
    }
}
// Alternative experiments kept for reference (disabled):
// publisher6.huge = 'huge test';
// const UseTimeoutToRuinInternalPromiseQueue = false;
// const UseQueueHeating = false;
// if (UseTimeoutToRuinInternalPromiseQueue) {
// if (UseQueueHeating) {
// publisher6.huge = 'huge Pre HEAT after unsub';
// }
// console.log('and now we are may be failing');
// if (UseQueueHeating) {
// console.log(' or maybe not, cause we made Pre-Heated PromiseQueue callstack');
// }
// console.log('please wait for 10 sec to see failure');
// setTimeout(() => {
// try {
// publisher6.huge = 'huge fail after unsub test';
// } catch (error) {
// console.error(error);
// }
// }, 10000);
// } else {
// publisher6.huge = 'huge direct run after unsub test';
const name = `promise_queue_amt notification for: [ ${promise_queue_amt} ]`;
console.time(name);
// Invoke the outermost handler directly; this walks the nested wrapper chain.
publisher6[SYMBOL_NOTIFY]('test', 'value');
console.timeEnd(name);
// }
console.timeEnd('test duration');
// Summary: invocations seen in this scenario, then those recorded by the
// publisher5 run (two of its subscribers had been unsubscribed).
console.log('promise_queue_amt async stack: ', tsubs_invocation_count, ' from ', promise_queue_amt);
console.log('real_tsubs_invocation_count', real_tsubs_invocation_count, ' from ', tsubs_amt - 2, ` made during ${diff}ms`);
})();
// process.on('uncaughtException', function(error) {
// console.error(error);
// process.exit();
// });
// process.on('unhandledRejection', function(error) {
// console.error(error);
// process.exit();
// });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment