Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save dmgcodevil/0bf73fda4b814f9d7252a990fde1baaa to your computer and use it in GitHub Desktop.

Select an option

Save dmgcodevil/0bf73fda4b814f9d7252a990fde1baaa to your computer and use it in GitHub Desktop.
package org.example;
import org.redisson.Redisson;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.redisson.api.stream.StreamAddArgs;
import org.redisson.api.stream.StreamReadArgs;
import reactor.core.publisher.Flux;
import org.redisson.config.Config;
import reactor.core.scheduler.Schedulers;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
enum ElementType {
DATA,
END
}
static class ReadStreamRedis {
final RedissonClient client;
ReadStreamRedis(RedissonClient client) {
this.client = client;
}
Flux<byte[]> read(String name) {
RStream<String, Object> stream = client.getStream(name);
final StreamMessageId[] fromId = {StreamMessageId.ALL};
return Flux.create(sink -> {
sink.onRequest(n -> {
System.out.println("requested: " + n);
while (n > 0) {
Map<StreamMessageId, Map<String, Object>> map = stream.read(StreamReadArgs.greaterThan(fromId[0])
.count((int) Math.min(n, Integer.MAX_VALUE)));
n -= map.size();
System.out.println("read messages count: " + map.size());
for (Map.Entry<StreamMessageId, Map<String, Object>> entry : map.entrySet()) {
System.out.println("flux: " + entry.getKey());
if (entry.getKey().getId0() > fromId[0].getId0()) {
fromId[0] = entry.getKey();
}
Map<String, Object> element = entry.getValue();
if (ElementType.valueOf((String) element.get("type")) == ElementType.END) {
System.out.println("stream complete");
sink.complete();
return;
}
byte[] data = (byte[]) element.get("data");
sink.next(data);
}
}
});
});
}
}
static class Publisher implements Runnable {
final RedissonClient client;
final RStream<String, Object> stream;
final int total;
int count = 0;
Publisher(RedissonClient client, String name, int total) {
this.client = client;
this.stream = client.getStream(name);
this.total = total;
}
@Override
public void run() {
while (count < total) {
Map<String, Object> entries = new HashMap<>();
entries.put("type", ElementType.DATA.name());
entries.put("data", String.valueOf(count).getBytes());
StreamMessageId messageId = stream.add(StreamAddArgs.entries(entries));
System.out.println("published: " + messageId);
count++;
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
}
Map<String, Object> entries = new HashMap<>();
entries.put("type", ElementType.END.name());
entries.put("data", new byte[0]);
stream.add(StreamAddArgs.entries(entries));
}
}
public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379");
final RedissonClient client = Redisson.create(config);
// System.out.println(client.getStream("test").delete());
final int totalMessages = 1000;
Publisher publisher = new Publisher(client, "test", totalMessages);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
// threadPool.submit(publisher);
ReadStreamRedis readStreamRedis = new ReadStreamRedis(client);
readStreamRedis.read("test").publishOn(Schedulers.parallel()).subscribe(data -> {
System.out.println("client received message: " + new String(data));
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment