Last active
April 4, 2020 01:49
-
-
Save Scottmitch/79db65e9f70992616c7b4e5f991f35da to your computer and use it in GitHub Desktop.
Attempt to provide an isolated reproducer for https://github.com/reactive-streams/reactive-streams-jvm/issues/477
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.servicetalk.concurrent.reactivestreams.tck; | |
import org.jctools.queues.SpscLinkedQueue; | |
import org.reactivestreams.Publisher; | |
import org.reactivestreams.Subscriber; | |
import org.reactivestreams.Subscription; | |
import org.reactivestreams.tck.PublisherVerification; | |
import org.reactivestreams.tck.TestEnvironment; | |
import org.testng.annotations.AfterClass; | |
import org.testng.annotations.BeforeClass; | |
import org.testng.annotations.Test; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import static io.servicetalk.concurrent.reactivestreams.ReactiveStreamsAdapters.toReactiveStreamsPublisher; | |
@Test | |
public class AsyncPublisherTest extends PublisherVerification<Integer> { | |
private static ExecutorService exec; | |
private static io.servicetalk.concurrent.api.Executor stExec; | |
public AsyncPublisherTest() { | |
super(new TestEnvironment(200)); | |
} | |
@BeforeClass | |
public static void before() { | |
exec = Executors.newCachedThreadPool(); | |
stExec = io.servicetalk.concurrent.api.Executors.newCachedThreadExecutor(); | |
} | |
@AfterClass | |
public static void after() throws ExecutionException, InterruptedException { | |
exec.shutdown(); | |
stExec.closeAsync().toFuture().get(); | |
} | |
@Override | |
public Publisher<Integer> createPublisher(long elements) { | |
return toReactiveStreamsPublisher( | |
io.servicetalk.concurrent.api.Publisher.from(items(elements)) | |
// io.servicetalk.concurrent.api.Publisher.range(0, (int) elements) | |
.publishOnOverride(stExec)); | |
// return new OffloadedSubscribePublisher<Integer>(exec, new RangePublisher(1, elements)); | |
} | |
@Override | |
public Publisher<Integer> createFailedPublisher() { | |
// TODO Auto-generated method stub | |
return null; | |
} | |
private static Integer[] items(long count) { | |
int c = (int)count; | |
Integer[] result = new Integer[c]; | |
for (int i = 0; i < c; i++) { | |
result[i] = i; | |
} | |
return result; | |
} | |
@Override | |
public long maxElementsFromPublisher() { | |
return 1024; | |
} | |
@Test(invocationCount = 1000) | |
public void loopTests() throws Throwable { | |
super.optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage(); | |
super.required_createPublisher1MustProduceAStreamOfExactly1Element(); | |
super.required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements(); | |
} | |
/** | |
* This isn't exactly what we do for our default offloading strategy, as we may hop threads between signals, | |
* but provides some motivation. | |
* @param <T> The type of onNext. | |
*/ | |
public static final class OffloadedSubscribePublisher<T> implements Publisher<T> { | |
private static final Object ON_COMPLETE = new Object(); | |
private static final Object NULL = new Object(); | |
private static final Object CANCEL = new Object(); | |
private final Executor executor; | |
private final Publisher<T> publisher; | |
public OffloadedSubscribePublisher(Executor executor, Publisher<T> publisher) { | |
this.executor = executor; | |
this.publisher = publisher; | |
} | |
@Override | |
public void subscribe(final Subscriber<? super T> s) { | |
if (s == null) { throw new NullPointerException(); } | |
final SpscLinkedQueue<Object> signals = new SpscLinkedQueue<Object>(); | |
executor.execute(new Runnable() { | |
@Override | |
public void run() { | |
Object signal = null; | |
for (;;) { | |
try { | |
do { | |
signal = signals.poll(); | |
} while (signal == null); | |
if (signal instanceof OnSubscribeWrapper) { | |
s.onSubscribe(((OnSubscribeWrapper) signal).subscription); | |
} else if (signal == ON_COMPLETE) { | |
s.onComplete(); | |
break; | |
} else if (signal instanceof OnErrorWrapper) { | |
s.onError(((OnErrorWrapper) signal).throwable); | |
break; | |
} else if (signal == CANCEL) { | |
break; | |
} else { | |
@SuppressWarnings("unchecked") | |
T next = signal == NULL ? null : (T) signal; | |
s.onNext(next); | |
} | |
// } catch (InterruptedException e) { | |
// ignore for now | |
} catch (Throwable cause) { | |
if (signal == ON_COMPLETE || signal instanceof OnErrorWrapper) { | |
cause.printStackTrace(); // test code, log this error | |
} else { | |
s.onError(cause); | |
} | |
break; | |
} | |
} | |
} | |
}); | |
publisher.subscribe(new Subscriber<T>() { | |
@Override | |
public void onSubscribe(final Subscription s) { | |
signals.offer(new OnSubscribeWrapper(new Subscription() { | |
@Override | |
public void request(long n) { | |
s.request(n); | |
} | |
@Override | |
public void cancel() { | |
try { | |
signals.offer(CANCEL); | |
} finally { | |
s.cancel(); | |
} | |
} | |
})); | |
} | |
@Override | |
public void onNext(T t) { | |
signals.offer(t == null ? NULL : t); | |
} | |
@Override | |
public void onError(Throwable t) { | |
signals.offer(new OnErrorWrapper(t)); | |
} | |
@Override | |
public void onComplete() { | |
signals.offer(ON_COMPLETE); | |
} | |
}); | |
} | |
private static final class OnErrorWrapper { | |
final Throwable throwable; | |
private OnErrorWrapper(Throwable throwable) { | |
this.throwable = throwable; | |
} | |
} | |
private static final class OnSubscribeWrapper { | |
final Subscription subscription; | |
private OnSubscribeWrapper(Subscription subscription) { | |
this.subscription = subscription; | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment