Last active
September 26, 2025 13:31
-
-
Save vietj/b30b3788254ba5adc50ebc63383de6e9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package io.vertx.ext.web.codec.sse; | |
| import io.vertx.codegen.annotations.Nullable; | |
| import io.vertx.core.Future; | |
| import io.vertx.core.Handler; | |
| import io.vertx.core.Promise; | |
| import io.vertx.core.buffer.Buffer; | |
| import io.vertx.core.streams.ReadStream; | |
| import io.vertx.core.streams.WriteStream; | |
| import io.vertx.ext.web.codec.BodyCodec; | |
| import io.vertx.ext.web.codec.spi.BodyStream; | |
| public class SseBodyCodec2 implements BodyCodec<Void> { | |
| private final Handler<ReadStream<SseEvent>> handler; | |
| public SseBodyCodec2(Handler<ReadStream<SseEvent>> handler) { | |
| this.handler = handler; | |
| } | |
| @Override | |
| public BodyStream<Void> stream() throws Exception { | |
| SseBodyStream stream = new SseBodyStream(); | |
| handler.handle(stream); | |
| return stream; | |
| } | |
| static class SseBodyStream implements BodyStream<Void>, ReadStream<SseEvent> { | |
| private static final int LOW_WATERMARK = 1024; | |
| private static final int HIGH_WATERMARK = 4 * 1024; | |
| private Handler<SseEvent> handler; | |
| private Handler<Void> endHandler; | |
| private Promise<Void> done = Promise.promise(); | |
| private long demand = Long.MAX_VALUE; | |
| private Buffer content = Buffer.buffer(); | |
| private boolean ended; | |
| private Handler<Void> drainHandler; | |
| private boolean writeQueueFull; | |
| @Override | |
| public ReadStream<SseEvent> handler(@Nullable Handler<SseEvent> handler) { | |
| this.handler = handler; | |
| return this; | |
| } | |
| @Override | |
| public ReadStream<SseEvent> pause() { | |
| demand = 0L; | |
| return this; | |
| } | |
| @Override | |
| public ReadStream<SseEvent> resume() { | |
| demand = Long.MAX_VALUE; | |
| check(); | |
| return this; | |
| } | |
| @Override | |
| public ReadStream<SseEvent> fetch(long l) { | |
| demand = l; | |
| check(); | |
| return this; | |
| } | |
| @Override | |
| public ReadStream<SseEvent> endHandler(@Nullable Handler<Void> handler) { | |
| this.endHandler = handler; | |
| return this; | |
| } | |
| synchronized SseEvent nextSseEvent() { | |
| // Extract an event from the content | |
| throw new UnsupportedOperationException(); | |
| } | |
| void check() { | |
| while (true) { | |
| long d = demand; | |
| if (d == 0L) { | |
| break; | |
| } | |
| SseEvent event = nextSseEvent(); | |
| if (event == null) { | |
| if (ended) { | |
| Handler<Void> h = endHandler; | |
| if (h != null) { | |
| h.handle(null); | |
| } | |
| } | |
| break; | |
| } | |
| // Race possible | |
| if (d != Long.MAX_VALUE) { | |
| demand = d - 1; | |
| } | |
| Handler<SseEvent> h = handler; | |
| if (h != null) { | |
| h.handle(event); | |
| } | |
| } | |
| if (content.length() < LOW_WATERMARK && writeQueueFull) { | |
| writeQueueFull = false; | |
| Handler<Void> h = drainHandler; | |
| if (h != null) { | |
| h.handle(null); | |
| } | |
| } | |
| } | |
| @Override | |
| public Future<Void> write(Buffer buffer) { | |
| content.appendBuffer(buffer); | |
| check(); | |
| writeQueueFull |= writeQueueFull(); | |
| return Future.succeededFuture(); | |
| } | |
| @Override | |
| public boolean writeQueueFull() { | |
| return content.length() >= HIGH_WATERMARK; | |
| } | |
| @Override | |
| public WriteStream<Buffer> drainHandler(@Nullable Handler<Void> handler) { | |
| drainHandler = handler; | |
| return this; | |
| } | |
| @Override | |
| public Future<Void> end() { | |
| ended = true; | |
| check(); | |
| return Future.succeededFuture(); | |
| } | |
| @Override | |
| public Future<Void> result() { | |
| return done.future(); | |
| } | |
| @Override | |
| public SseBodyStream exceptionHandler(@Nullable Handler<Throwable> handler) { | |
| return null; | |
| } | |
| @Override | |
| public void handle(Throwable throwable) { | |
| } | |
| @Override | |
| public WriteStream<Buffer> setWriteQueueMaxSize(int i) { | |
| return this; | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment