Last active
January 26, 2022 01:25
-
-
Save NiteshKant/62a912129ea47b657f7739776b0c3b1f to your computer and use it in GitHub Desktop.
Demonstrates ServiceTalk multicast operator issue where if only one subscriber requests data, nothing is delivered
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.apple.acs.traffic.servicediscovery.insights.aggregator; | |
import io.servicetalk.concurrent.PublisherSource; | |
import io.servicetalk.concurrent.PublisherSource.Processor; | |
import io.servicetalk.concurrent.api.Publisher; | |
import io.servicetalk.concurrent.internal.TerminalNotification; | |
import javax.annotation.Nullable; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import static io.servicetalk.concurrent.api.Processors.newPublisherProcessor; | |
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; | |
import static io.servicetalk.concurrent.api.SourceAdapters.toSource; | |
public class MulticastBug { | |
public static void main(String[] args) throws InterruptedException { | |
final Processor<Integer, Integer> processor = newPublisherProcessor(128); | |
Publisher<Integer> source = fromSource(processor) | |
.beforeRequest(value -> System.out.println("Requesting upstream: " + value)) | |
.multicast(1); | |
final IntegerSubscriber sub1 = new IntegerSubscriber(); | |
toSource(source).subscribe(sub1); | |
final IntegerSubscriber sub2 = new IntegerSubscriber(); | |
toSource(source).subscribe(sub2); | |
for (int i = 0; i < 5; i++) { | |
processor.onNext(i); | |
} | |
sub1.awaitSubscription.await(); | |
sub1.subscription.request(1); | |
System.out.println("Awaiting signal for subscriber."); | |
sub1.signals.take(); | |
System.out.println("Got signal for subscriber."); | |
} | |
private static class IntegerSubscriber implements PublisherSource.Subscriber<Integer> { | |
private static final AtomicInteger counter = new AtomicInteger(); | |
private final int id = counter.incrementAndGet(); | |
private final CountDownLatch awaitSubscription = new CountDownLatch(1); | |
private PublisherSource.Subscription subscription; | |
private final LinkedBlockingQueue<Object> signals = new LinkedBlockingQueue<>(); | |
@Override | |
public void onSubscribe(PublisherSource.Subscription subscription) { | |
this.subscription = new PublisherSource.Subscription() { | |
@Override | |
public void request(long n) { | |
System.out.println("Subscriber: " + id + " requesting: " + n); | |
subscription.request(n); | |
} | |
@Override | |
public void cancel() { | |
subscription.cancel(); | |
} | |
}; | |
awaitSubscription.countDown(); | |
} | |
@Override | |
public void onNext(@Nullable Integer integer) { | |
assert integer != null; | |
signals.offer(integer); | |
subscription.request(1); | |
} | |
@Override | |
public void onError(Throwable t) { | |
signals.offer(TerminalNotification.error(t)); | |
} | |
@Override | |
public void onComplete() { | |
signals.offer(TerminalNotification.complete()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment