Skip to content

Instantly share code, notes, and snippets.

@vietj
Last active September 26, 2025 13:31
Show Gist options
  • Select an option

  • Save vietj/b30b3788254ba5adc50ebc63383de6e9 to your computer and use it in GitHub Desktop.

Select an option

Save vietj/b30b3788254ba5adc50ebc63383de6e9 to your computer and use it in GitHub Desktop.
package io.vertx.ext.web.codec.sse;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.web.codec.BodyCodec;
import io.vertx.ext.web.codec.spi.BodyStream;
/**
 * A {@link BodyCodec} that exposes a response body as a {@link ReadStream} of {@link SseEvent}.
 *
 * <p>Each call to {@link #stream()} creates a fresh {@link SseBodyStream}, passes it to the
 * handler supplied at construction time (so the application can install its event/end/exception
 * handlers), and then returns it as the {@link BodyStream} that receives the raw body buffers.
 */
public class SseBodyCodec2 implements BodyCodec<Void> {

  /** Application callback receiving the event stream before any body data is written to it. */
  private final Handler<ReadStream<SseEvent>> handler;

  /**
   * @param handler callback invoked with the event stream each time {@link #stream()} is called
   */
  public SseBodyCodec2(Handler<ReadStream<SseEvent>> handler) {
    this.handler = handler;
  }

  /**
   * Creates a new body stream and hands it to the application handler before returning it.
   *
   * @return the stream that body buffers should be written to
   */
  @Override
  public BodyStream<Void> stream() throws Exception {
    SseBodyStream stream = new SseBodyStream();
    handler.handle(stream);
    return stream;
  }

  /**
   * Bridges a {@link WriteStream} of raw {@link Buffer}s to a {@link ReadStream} of
   * {@link SseEvent}s.
   *
   * <p>Written buffers are appended to {@code content}; {@link #check()} then extracts events
   * with {@link #nextSseEvent()} and delivers them while there is demand. Back-pressure towards
   * the writer is derived from the amount of buffered content and the two watermarks.
   *
   * <p>NOTE(review): apart from {@code nextSseEvent()} no method is synchronized, so this class
   * presumably relies on all calls happening on a single thread/context - confirm with callers.
   */
  static class SseBodyStream implements BodyStream<Void>, ReadStream<SseEvent> {

    /** Buffered size below which a previously full write queue is reported as drained. */
    private static final int LOW_WATERMARK = 1024;
    /** Buffered size at or above which the write queue is reported as full. */
    private static final int HIGH_WATERMARK = 4 * 1024;

    private Handler<SseEvent> handler;
    private Handler<Void> endHandler;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> drainHandler;
    /** Backs {@link #result()}: completed by {@link #end()}, failed by {@link #handle(Throwable)}. */
    private final Promise<Void> done = Promise.promise();
    /** Number of events that may still be delivered; {@code Long.MAX_VALUE} means unbounded. */
    private long demand = Long.MAX_VALUE;
    /** Body bytes received so far that have not yet been turned into events. */
    private Buffer content = Buffer.buffer();
    /** Set once {@link #end()} has been called. */
    private boolean ended;
    /** Set once the end handler has been triggered, so that it is never invoked twice. */
    private boolean endEmitted;
    /** Remembers that the queue was reported full, so the drain handler is fired on recovery. */
    private boolean writeQueueFull;

    @Override
    public ReadStream<SseEvent> handler(@Nullable Handler<SseEvent> handler) {
      this.handler = handler;
      return this;
    }

    /** Stops event delivery by dropping the demand to zero. */
    @Override
    public ReadStream<SseEvent> pause() {
      demand = 0L;
      return this;
    }

    /** Switches to unbounded demand and delivers whatever is currently available. */
    @Override
    public ReadStream<SseEvent> resume() {
      demand = Long.MAX_VALUE;
      check();
      return this;
    }

    /**
     * Adds {@code l} to the current demand and delivers available events.
     *
     * <p>The amount is added rather than assigned, so successive calls accumulate; the sum
     * saturates at {@code Long.MAX_VALUE}, which this class treats as unbounded demand.
     *
     * @param l the number of additional events requested, must not be negative
     * @throws IllegalArgumentException if {@code l} is negative
     */
    @Override
    public ReadStream<SseEvent> fetch(long l) {
      if (l < 0L) {
        throw new IllegalArgumentException("Invalid fetch amount " + l);
      }
      long updated = demand + l;
      // A negative sum means the addition overflowed: clamp to "unbounded".
      demand = updated < 0L ? Long.MAX_VALUE : updated;
      check();
      return this;
    }

    @Override
    public ReadStream<SseEvent> endHandler(@Nullable Handler<Void> handler) {
      this.endHandler = handler;
      return this;
    }

    /**
     * Extracts the next complete event from {@code content}.
     *
     * <p>{@link #check()} treats a {@code null} return as "no complete event buffered yet".
     *
     * @throws UnsupportedOperationException always - parsing is not implemented yet
     */
    synchronized SseEvent nextSseEvent() {
      // TODO: extract an event from the content
      throw new UnsupportedOperationException("SSE event extraction is not implemented");
    }

    /**
     * Delivers buffered events while there is demand, signals the end of the stream once the
     * input has ended and no further event can be extracted, and finally fires the drain
     * handler when the buffered content has dropped below the low watermark after the write
     * queue had been reported full.
     */
    void check() {
      while (true) {
        long d = demand;
        if (d == 0L) {
          break;
        }
        SseEvent event = nextSseEvent();
        if (event == null) {
          if (ended && !endEmitted) {
            // Guard: a later resume()/fetch() re-enters this method and must not end twice.
            endEmitted = true;
            Handler<Void> h = endHandler;
            if (h != null) {
              h.handle(null);
            }
          }
          break;
        }
        // Race possible: demand is read above and written back here without synchronization,
        // so a concurrent pause()/resume()/fetch() in between would be overwritten.
        if (d != Long.MAX_VALUE) {
          demand = d - 1;
        }
        Handler<SseEvent> h = handler;
        if (h != null) {
          h.handle(event);
        }
      }
      if (content.length() < LOW_WATERMARK && writeQueueFull) {
        writeQueueFull = false;
        Handler<Void> h = drainHandler;
        if (h != null) {
          h.handle(null);
        }
      }
    }

    /**
     * Appends {@code buffer} to the pending content, delivers any events that became
     * available, and records whether the queue is now full.
     *
     * @return an already succeeded future
     */
    @Override
    public Future<Void> write(Buffer buffer) {
      content.appendBuffer(buffer);
      check();
      writeQueueFull |= writeQueueFull();
      return Future.succeededFuture();
    }

    /** @return {@code true} when the buffered content has reached the high watermark */
    @Override
    public boolean writeQueueFull() {
      return content.length() >= HIGH_WATERMARK;
    }

    @Override
    public WriteStream<Buffer> drainHandler(@Nullable Handler<Void> handler) {
      drainHandler = handler;
      return this;
    }

    /**
     * Marks the input as ended, delivers what can still be delivered and completes
     * {@link #result()} (unless it has already been failed).
     *
     * @return an already succeeded future
     */
    @Override
    public Future<Void> end() {
      ended = true;
      check();
      done.tryComplete();
      return Future.succeededFuture();
    }

    /** @return the future completed by {@link #end()} or failed by {@link #handle(Throwable)} */
    @Override
    public Future<Void> result() {
      return done.future();
    }

    /**
     * Sets the handler notified when a failure is reported through {@link #handle(Throwable)}.
     *
     * @return this stream, for fluent chaining
     */
    @Override
    public SseBodyStream exceptionHandler(@Nullable Handler<Throwable> handler) {
      this.exceptionHandler = handler;
      return this;
    }

    /**
     * Reports a failure: fails {@link #result()} (unless already completed) and notifies the
     * exception handler, if one is set.
     */
    @Override
    public void handle(Throwable throwable) {
      done.tryFail(throwable);
      Handler<Throwable> h = exceptionHandler;
      if (h != null) {
        h.handle(throwable);
      }
    }

    /** No-op: the queue size is governed by the fixed {@code HIGH_WATERMARK}. */
    @Override
    public WriteStream<Buffer> setWriteQueueMaxSize(int i) {
      return this;
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment