Created
August 18, 2014 20:39
-
-
Save NiteshKant/da9ed536b88339e9b454 to your computer and use it in GitHub Desktop.
If the client unsubscribes without getting the content, the next request reuses the connection and gets the older response.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.reactivex.netty.protocol.http; | |
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import org.junit.Assert;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/** | |
* @author Nitesh Kant | |
*/ | |
public class TTT { | |
public static void main(String[] args) { | |
final TestScheduler serverTestScheduler = Schedulers.test(); | |
final AtomicBoolean delayedResponse = new AtomicBoolean(true); | |
RxNetty.newHttpServerBuilder(9999, new RequestHandler<ByteBuf, ByteBuf>() { | |
@Override | |
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, | |
final HttpServerResponse<ByteBuf> response) { | |
if (!delayedResponse.get()) { | |
response.setStatus(HttpResponseStatus.NOT_FOUND); | |
return response.close(true); | |
} | |
return Observable.interval(1, TimeUnit.DAYS, serverTestScheduler) | |
.flatMap(new Func1<Long, Observable<Void>>() { | |
@Override | |
public Observable<Void> call(Long aLong) { | |
response.setStatus(HttpResponseStatus.NO_CONTENT); | |
return response.close(true); | |
} | |
}); | |
} | |
}).enableWireLogging(LogLevel.ERROR).build().start(); | |
HttpClientResponse<ByteBuf> response = RxNetty.createHttpClient("localhost", 9999) | |
.submit(HttpClientRequest.createGet("")) | |
.timeout(1, TimeUnit.SECONDS) | |
.doOnError(new Action1<Throwable>() { | |
@Override | |
public void call(Throwable throwable) { | |
System.out.println(">>>>>>>>>>>>>>"); | |
throwable.printStackTrace(); | |
serverTestScheduler.advanceTimeBy(1, TimeUnit.DAYS); | |
delayedResponse.set(false); | |
} | |
}) | |
.retry(1).toBlocking().single(); | |
Assert.assertEquals(HttpResponseStatus.NOT_FOUND, response.getStatus()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment