Last active
March 8, 2025 00:02
-
-
Save wentout/c0b1735cd9c1070529998d7d8b42c59a to your computer and use it in GitHub Desktop.
Publisher Subscriber pattern made via Wrapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
// for testing this code please use the following command | |
// | |
// node PubSubOnWrappers.js | grep stack | |
// | |
// to see if stack size exceed | |
// | |
// for huge amount of async subs in Node.js we are receiving | |
// "Exception in PromiseRejectCallback" | |
// among with | |
// "RangeError: Maximum call stack size exceeded" | |
// | |
// it does nothing with ability receive some subscriptions still, | |
// though it means the whole idea works correctly only for small amount of subs | |
// | |
// also made some tests for browsers | |
// FireFox works correctly giving that stack errors and producing messages | |
// for Chrome seems it's Dev Tools are very slow | |
// so I recommend to run it from HTML file made with following code | |
// <script src="PubSubOnWrappers.js"></script> | |
// after page completed you may open the console and see how it was working | |
(function () { | |
const names = { | |
n: 'notification : ', | |
s: 'subscription : ' | |
}; | |
const defaultSubscriberConfig = { | |
// if subscription count is bigger than max call stack size | |
// then it exceeds, so we may use asyn subscribers for that | |
async: false, | |
stand: 'fifo', | |
}; | |
const SYMBOL_NOTIFY = Symbol('notify'); | |
class Publisher { | |
static self_notification(...args) { | |
console.log(names.n, ...args); | |
} | |
static #subscribers = new Map(); | |
static init(publisher) { | |
if (!publisher) { | |
throw new Error('missing init publisher'); | |
} | |
delete publisher.notify; | |
const done = Reflect.defineProperty(publisher, SYMBOL_NOTIFY, { | |
get() { | |
return Publisher.self_notification; | |
}, | |
configurable: true, | |
enumerable: true | |
}); | |
const payload = { | |
subscribers: new Map, | |
last: null | |
}; | |
Publisher.#subscribers.set(publisher, payload); | |
return done; | |
} | |
constructor() { | |
Publisher.init(this); | |
} | |
static upsert(publisher, handler, subscriber) { | |
if (!publisher) { | |
throw new Error('missing upsert publisher'); | |
} | |
delete publisher.notify; | |
if (!handler) { | |
throw new Error('missing upsert handler'); | |
} | |
if (!subscriber) { | |
throw new Error('missing upsert subscriber'); | |
} | |
const done = Reflect.defineProperty(publisher, SYMBOL_NOTIFY, { | |
get() { | |
return handler; | |
}, | |
configurable: true, | |
enumerable: true | |
}); | |
const { subscribers, last } = Publisher.#subscribers.get(publisher); | |
if (last === null) { | |
subscribers.set(subscriber, null); | |
} else { | |
subscribers.set(last, subscriber); | |
subscribers.set(subscriber, null); | |
} | |
subscribers.set(subscriber, null); | |
const payload = { subscribers, last: subscriber }; | |
Publisher.#subscribers.set(publisher, payload); | |
return done; | |
} | |
static reinit(publisher) { | |
const { subscribers, last } = Publisher.#subscribers.get(publisher); | |
Publisher.init(publisher); | |
subscribers.forEach((next, subscriber) => { | |
if (!subscriber.subscribed) { | |
return; | |
} | |
subscriber.init(); | |
}); | |
// console.log('REINIT DONE', Publisher.subscribers(publisher)); | |
} | |
static notify(publisher, ...args) { | |
const { subscribers } = Publisher.#subscribers.get(publisher); | |
subscribers.forEach((next, subscriber) => { | |
subscriber.notify(...args); | |
}); | |
} | |
} | |
class Subscriber { | |
#handler = undefined | |
#publisher = undefined | |
#subscribed = false | |
#config = { ...defaultSubscriberConfig } | |
get subscribed() { | |
return this.#subscribed; | |
} | |
constructor(publisher, handler, config = { ...defaultSubscriberConfig }) { | |
const self = this; | |
self.#publisher = publisher; | |
self.#handler = handler; | |
self.#config = Object.assign({ ...self.#config }, config); | |
self.init(); | |
} | |
init() { | |
const self = this; | |
const publisher = self.#publisher; | |
const stack = Reflect.getOwnPropertyDescriptor(publisher, SYMBOL_NOTIFY).get(); | |
const kind = (self.#config.stand === 'fifo') ? true : false; | |
let notify = kind ? ( | |
self.#config.async ? | |
async (...args) => { | |
stack(...args); | |
self.#handler(...args); | |
} : | |
(...args) => { | |
stack(...args); | |
self.#handler(...args); | |
} | |
) : ( | |
self.#config.async ? | |
async (...args) => { | |
self.#handler(...args); | |
stack(...args); | |
} : | |
(...args) => { | |
self.#handler(...args); | |
stack(...args); | |
} | |
); | |
const done = Publisher.upsert(publisher, notify, self); | |
self.#subscribed = true; | |
// console.log('subscribed:', done); | |
} | |
unsubscribe() { | |
this.#subscribed = false; | |
Publisher.reinit(this.#publisher); | |
} | |
notify(...args) { | |
this.#handler(...args); | |
} | |
} | |
console.time('test duration'); | |
const publishers = {}; | |
for (let i = 1; i <= 6; i++) { | |
const name = `publisher${i}`; | |
const publisher = new Publisher; | |
const publisherProxy = new Proxy(publisher, { | |
set(target, prop, value, receiver) { | |
receiver[SYMBOL_NOTIFY](prop, value); | |
return true; | |
} | |
}); | |
const made = Reflect.setPrototypeOf(publisher, publisherProxy); | |
publishers[name] = publisher; | |
console.log(`publisher [${name}] was made: `, made); | |
} | |
const { | |
publisher1, | |
publisher2, | |
publisher3, | |
publisher4, | |
publisher5, | |
publisher6, | |
} = publishers; | |
const subscriber01 = new Subscriber(publisher1, (...args) => { | |
console.log(names.s, 'p1 > 01 : ', ...args); | |
}); | |
// const s1h = Reflect.getOwnPropertyDescriptor(publisher1, SYMBOL_NOTIFY).get(); | |
// const s2h = Reflect.getOwnPropertyDescriptor(publisher2, SYMBOL_NOTIFY).get(); | |
// console.log('s1h === s2h : ', s1h === s2h); | |
publisher1.topic = 111; | |
const subscriber02 = new Subscriber(publisher1, (...args) => { | |
console.log(names.s, 'p1 > 02 : ', ...args); | |
}); | |
console.log('--- fully subscribed ---'); | |
publisher1.topic = 222; | |
console.log('--- unsubscribing first ---'); | |
subscriber01.unsubscribe(); | |
publisher1.topic = 333; | |
subscriber02.unsubscribe(); | |
console.log('--- both unsubscribed ---'); | |
publisher1.topic = 444; | |
console.log('--- subscribing twice ---'); | |
const subscriber03 = new Subscriber(publisher1, (...args) => { | |
console.log(names.s, 'p1 > 03 : ', ...args); | |
}); | |
const subscriber04 = new Subscriber(publisher1, (...args) => { | |
console.log(names.s, 'p1 > 04 : ', ...args); | |
}); | |
console.log('--- subscribed twice ---'); | |
publisher1.topic = 555; | |
console.log('--- subscribed third ---'); | |
const subscriber05 = new Subscriber(publisher1, (...args) => { | |
console.log(names.s, 'p1 > 05 : ', ...args); | |
}); | |
publisher1.topic = 123; | |
subscriber04.unsubscribe(); | |
console.log('--- unsubscribed p1 > 04 ---'); | |
publisher1.topic = 321; | |
subscriber03.unsubscribe(); | |
console.log('--- unsubscribed p1 > 03 ---'); | |
publisher1.topic = 777; | |
const subscriber07 = new Subscriber(publisher1, (...args) => { | |
console.log(names.s, 'p1 > 07 : ', ...args); | |
}); | |
console.log('--- subscribed 07 first ---'); | |
publisher1.topic = 888; | |
const subscriber21 = new Subscriber(publisher2, (...args) => { | |
console.log(names.s, 'p2 > 21 : ', ...args); | |
}); | |
const subscriber22 = new Subscriber(publisher2, (...args) => { | |
console.log(names.s, 'p2 > 22 : ', ...args); | |
}); | |
console.log('--- subscribed P2 s21 s22 first ---'); | |
publisher2.topic = 999; | |
console.log('--- test async ---'); | |
const thousands = {}; | |
for (let i = 0; i < 9; i++) { | |
const name = `s: ` + i | |
thousands[name] = new Subscriber(publisher2, (...args) => { | |
console.log(names.s, `p2 > ${name}`, ...args); | |
}, { async: true }); | |
console.log(`subscribed p2 ${name}`); | |
} | |
publisher2.test = `test async`; | |
console.log('--- test lifo ---'); | |
for (let i = 0; i < 10; i++) { | |
const name = `lifo s: ` + i; | |
thousands[name] = new Subscriber(publisher3, (...args) => { | |
console.log(names.s, `p3 > ${name}`, ...args); | |
}, { async: true, stand: 'lifo' }); | |
console.log(`subscribed p3 ${name}`); | |
} | |
// max call stack size exceed if more | |
publisher3.test = `test lifo`; | |
console.log('--- reinit test ---'); | |
for (let i = 0; i < 10; i++) { | |
if (i % 2 !== 0) continue; | |
const name = `s: ` + i | |
thousands[name].unsubscribe(); | |
console.log(`reinit for ${name}`) | |
} | |
publisher2.test = `reinit test 1`; | |
publisher2.test = `reinit test 2`; | |
console.log('--- reinit test lifo ---'); | |
for (let i = 0; i < 10; i++) { | |
if (i % 3 !== 0) continue; | |
const name = `lifo s: ` + i; | |
thousands[name].unsubscribe(); | |
console.log(`reinit done for ${name}`) | |
} | |
publisher3.testLifoReinit = 'lifo reinit'; | |
console.log('--- stack amount test ---'); | |
const stack_amt = 1_000; | |
for (let i = 0; i < stack_amt; i++) { | |
const name = `h: ` + i | |
thousands[name] = new Subscriber(publisher4, (...args) => { | |
console.log(names.s, `p4 > ${name}`, ...args); | |
}); | |
if (i % 1000 === 0) { | |
console.log(`current stack test sub: ${name}`); | |
} | |
if (i === (stack_amt - 1)) { | |
console.log(`Preparation of stack test for [ ${stack_amt} ] SYNC subscrptions Finished`); | |
} | |
} | |
publisher4['WOW syncStackTest : '] = 'aha'; | |
// try { | |
// setTimeout(() => { | |
// publisher4['WOW syncStackTest : '] = 'aha'; | |
// }); | |
// } catch (error) { | |
// console.error(error); | |
// } | |
console.log('--- huge async amount test ---'); | |
const tsubs_amt = 999_999; | |
var tsubs_invocation_count = 0; | |
const skip_message = 'real thing'; | |
for (let i = 0; i < tsubs_amt; i++) { | |
const name = `h: ` + i | |
thousands[name] = new Subscriber(publisher5, (topic, ...args) => { | |
tsubs_invocation_count++; | |
if (topic === skip_message) { | |
return; | |
} | |
try { | |
console.log(names.s, `p5 > ${name}`, topic, ...args); | |
} catch (error) { | |
console.error(error); | |
} | |
}, { async: true }); | |
if (i % 1000 === 0) { | |
console.log(`added another thousand of huge subs, current : ${name}`); | |
} | |
} | |
console.log('--- making two unsubs for that huge, rebalance ---'); | |
console.log('--- please wait ---'); | |
thousands['h: 8'].unsubscribe(); | |
thousands[`h: ${tsubs_amt - 3}`].unsubscribe(); | |
console.log('--- testing after that unsub huge, rebalance ---'); | |
console.log(`\n\n\nPreparation of [ ${tsubs_amt} ] ASYNC subscrptions DONE`); | |
console.time(skip_message); | |
const start = Date.now(); | |
Publisher.notify(publisher5, skip_message); | |
const diff = Date.now() - start; | |
const real_tsubs_invocation_count = tsubs_invocation_count + 0; | |
console.timeEnd(skip_message); | |
console.log(skip_message, diff); | |
console.log(real_tsubs_invocation_count); | |
tsubs_invocation_count = 0; | |
const promise_queue_amt = 999_999; | |
for (let i = 0; i < promise_queue_amt; i++) { | |
const name = `pq: ` + i | |
thousands[name] = new Subscriber(publisher6, (topic, ...args) => { | |
tsubs_invocation_count++; | |
try { | |
console.log(names.s, `p5 > ${name}`, topic, ...args); | |
} catch (error) { | |
console.error(error); | |
} | |
}, { async: true }); | |
if (i % 1000 === 0) { | |
console.log(`added another thousand of huge subs, current : ${name}`); | |
} | |
} | |
// publisher6.huge = 'huge test'; | |
// const UseTimeoutToRuinInternalPromiseQueue = false; | |
// const UseQueueHeating = false; | |
// if (UseTimeoutToRuinInternalPromiseQueue) { | |
// if (UseQueueHeating) { | |
// publisher6.huge = 'huge Pre HEAT after unsub'; | |
// } | |
// console.log('and now we are may be failing'); | |
// if (UseQueueHeating) { | |
// console.log(' or maybe not, cause we made Pre-Heated PromiseQueue callstack'); | |
// } | |
// console.log('please wait for 10 sec to see failure'); | |
// setTimeout(() => { | |
// try { | |
// publisher6.huge = 'huge fail after unsub test'; | |
// } catch (error) { | |
// console.error(error); | |
// } | |
// }, 10000); | |
// } else { | |
// publisher6.huge = 'huge direct run after unsub test'; | |
const name = `promise_queue_amt notification for: [ ${promise_queue_amt} ]`; | |
console.time(name); | |
publisher6[SYMBOL_NOTIFY]('test', 'value'); | |
console.timeEnd(name); | |
// } | |
console.timeEnd('test duration'); | |
console.log('promise_queue_amt async stack: ', tsubs_invocation_count, ' from ', promise_queue_amt); | |
console.log('real_tsubs_invocation_count', real_tsubs_invocation_count, ' from ', tsubs_amt - 2, ` made during ${diff}ms`); | |
})(); | |
// process.on('uncaughtException', function(error) { | |
// console.error(error); | |
// process.exit(); | |
// }); | |
// process.on('unhandledRejection', function(error) { | |
// console.error(error); | |
// process.exit(); | |
// }); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment