Last active
June 18, 2022 05:59
-
-
Save abh006/6c8cd44803b7f24f7fb58caffb5f7d04 to your computer and use it in GitHub Desktop.
RxJS pipeline with zero backpressure.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { map, Subject, tap } from "rxjs"; | |
function getBalancedPipeline<T>( | |
generator: AsyncGenerator<T, void, void>, | |
pipeline = new Subject<T>() | |
) { | |
pipeline.subscribe({ | |
next: async () => { | |
const nextInput = await generator.next(); | |
if (!nextInput.done) { | |
pipeline.next(nextInput.value); | |
} else { | |
console.log("Generator exhausted"); | |
} | |
}, | |
}); | |
return pipeline; | |
} | |
async function* getData(): AsyncGenerator<string, void, void> { | |
const values = ["one", "two", "three", "four"]; | |
for (const value of values) { | |
yield value; | |
} | |
} | |
(async () => { | |
const generator = getData(); | |
const pipeline = await getBalancedPipeline(generator); | |
pipeline | |
.pipe( | |
map((value: string) => { | |
return value.toUpperCase(); | |
}), | |
tap((value) => console.log(value)) | |
) | |
.subscribe(); | |
const initial = await generator.next(); | |
if (!initial.done) { | |
pipeline.next(initial.value); | |
} | |
})(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment