Created
May 30, 2025 15:36
-
-
Save dmgcodevil/0bf73fda4b814f9d7252a990fde1baaa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.example; | |
| import org.redisson.Redisson; | |
| import org.redisson.api.RStream; | |
| import org.redisson.api.RedissonClient; | |
| import org.redisson.api.StreamMessageId; | |
| import org.redisson.api.stream.StreamAddArgs; | |
| import org.redisson.api.stream.StreamReadArgs; | |
| import reactor.core.publisher.Flux; | |
| import org.redisson.config.Config; | |
| import reactor.core.scheduler.Schedulers; | |
| import java.util.HashMap; | |
| import java.util.Map; | |
| import java.util.concurrent.ExecutorService; | |
| import java.util.concurrent.Executors; | |
| public class Main { | |
| enum ElementType { | |
| DATA, | |
| END | |
| } | |
| static class ReadStreamRedis { | |
| final RedissonClient client; | |
| ReadStreamRedis(RedissonClient client) { | |
| this.client = client; | |
| } | |
| Flux<byte[]> read(String name) { | |
| RStream<String, Object> stream = client.getStream(name); | |
| final StreamMessageId[] fromId = {StreamMessageId.ALL}; | |
| return Flux.create(sink -> { | |
| sink.onRequest(n -> { | |
| System.out.println("requested: " + n); | |
| while (n > 0) { | |
| Map<StreamMessageId, Map<String, Object>> map = stream.read(StreamReadArgs.greaterThan(fromId[0]) | |
| .count((int) Math.min(n, Integer.MAX_VALUE))); | |
| n -= map.size(); | |
| System.out.println("read messages count: " + map.size()); | |
| for (Map.Entry<StreamMessageId, Map<String, Object>> entry : map.entrySet()) { | |
| System.out.println("flux: " + entry.getKey()); | |
| if (entry.getKey().getId0() > fromId[0].getId0()) { | |
| fromId[0] = entry.getKey(); | |
| } | |
| Map<String, Object> element = entry.getValue(); | |
| if (ElementType.valueOf((String) element.get("type")) == ElementType.END) { | |
| System.out.println("stream complete"); | |
| sink.complete(); | |
| return; | |
| } | |
| byte[] data = (byte[]) element.get("data"); | |
| sink.next(data); | |
| } | |
| } | |
| }); | |
| }); | |
| } | |
| } | |
| static class Publisher implements Runnable { | |
| final RedissonClient client; | |
| final RStream<String, Object> stream; | |
| final int total; | |
| int count = 0; | |
| Publisher(RedissonClient client, String name, int total) { | |
| this.client = client; | |
| this.stream = client.getStream(name); | |
| this.total = total; | |
| } | |
| @Override | |
| public void run() { | |
| while (count < total) { | |
| Map<String, Object> entries = new HashMap<>(); | |
| entries.put("type", ElementType.DATA.name()); | |
| entries.put("data", String.valueOf(count).getBytes()); | |
| StreamMessageId messageId = stream.add(StreamAddArgs.entries(entries)); | |
| System.out.println("published: " + messageId); | |
| count++; | |
| // try { | |
| // Thread.sleep(1000); | |
| // } catch (InterruptedException e) { | |
| // throw new RuntimeException(e); | |
| // } | |
| } | |
| Map<String, Object> entries = new HashMap<>(); | |
| entries.put("type", ElementType.END.name()); | |
| entries.put("data", new byte[0]); | |
| stream.add(StreamAddArgs.entries(entries)); | |
| } | |
| } | |
| public static void main(String[] args) throws InterruptedException { | |
| Config config = new Config(); | |
| config.useSingleServer() | |
| .setAddress("redis://127.0.0.1:6379"); | |
| final RedissonClient client = Redisson.create(config); | |
| // System.out.println(client.getStream("test").delete()); | |
| final int totalMessages = 1000; | |
| Publisher publisher = new Publisher(client, "test", totalMessages); | |
| ExecutorService threadPool = Executors.newFixedThreadPool(2); | |
| // threadPool.submit(publisher); | |
| ReadStreamRedis readStreamRedis = new ReadStreamRedis(client); | |
| readStreamRedis.read("test").publishOn(Schedulers.parallel()).subscribe(data -> { | |
| System.out.println("client received message: " + new String(data)); | |
| // try { | |
| // Thread.sleep(1000); | |
| // } catch (InterruptedException e) { | |
| // throw new RuntimeException(e); | |
| // } | |
| }); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment