Created
January 12, 2018 13:12
-
-
Save crdrost/2edb930143630866eedae871f330099e to your computer and use it in GitHub Desktop.
Bidirectional streams inspired by @andrestaltz
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//json interpolation for string subs. Just used in shape(). | |
const in_json = (strs, ...vals) => | |
strs[0] + vals.map((v, i) => JSON.stringify(v) + strs[i + 1]).join(""); | |
// validate shape of an object having appropriate methods. | |
// PLEASE do this, using if() makes this sort of "that fn doesn't handle this particular event" | |
// unassertable, and that means that you have whole undebuggable pipelines where "somewhere in | |
// this pipeline that information gets lost, I don't know where!" | |
const shape = (obj, ...method_names) => { | |
const missing_methods = method_names.some(m => typeof obj[m] !== "function"); | |
if (missing_methods) { | |
throw new Error( | |
in_json`shape mismatch, expected methods ${method_names}, got methods ${Object.keys( | |
obj | |
).filter(k => typeof obj[k] === "function")}` | |
); | |
} | |
}; | |
const mapNext = (source, transform) => { | |
return { | |
subscribe(sink) { | |
const sinkM = {}; | |
Object.keys(sink).forEach(k => (sinkM[k] = val => sink[k](val))); | |
sinkM.next = data => { | |
sink.next(transform(data)); | |
}; | |
source.subscribe(sinkM); | |
} | |
}; | |
}; | |
const interval = { | |
subscribe(other) { | |
shape(other, "subscribe", "next"); | |
const handle = setInterval(() => other.next(i++), 1000); | |
let i = 0; | |
other.subscribe({ | |
next() { | |
i = 0; | |
}, | |
dispose() { | |
clearInterval(handle); | |
} | |
}); | |
// you were also returning dispose() above, but that seems nonidiomatic for this | |
// sort of code where subscribe() is apparently only important for its side-effects. | |
} | |
}; | |
const logger = { | |
subscribe(other) { | |
shape(other, "next"); | |
setTimeout(() => other.next(), 4500); | |
}, | |
next(value) { | |
console.log(value); | |
} | |
}; | |
interval.subscribe(logger); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment