Skip to content

Instantly share code, notes, and snippets.

@peterkuterna
Forked from anaisbetts/RxOkHttp.java
Created August 25, 2014 10:57
Show Gist options
  • Save peterkuterna/9feae438f3a790b54201 to your computer and use it in GitHub Desktop.
Save peterkuterna/9feae438f3a790b54201 to your computer and use it in GitHub Desktop.
public Observable<Toaster> streamToasters() {
// This service returns Toaster JSON objects, one per line, as
// chunked HTTP responses. This means that as the service finds
// more toasters, we recieve them incrementally, instead of waiting
// for the entire operation to complete
Request rq = new Request.Builder()
.url(ToasterService)
.get()
.addHeader("Authorization", "Bearer " + getToken())
.build();
return RxOkHttp.streamLines(client, rq)
.map(x -> new Gson().fromJson(x, Toaster.class));
}
package org.paulbetts.shroom.core;
import android.os.AsyncTask;
import com.squareup.okhttp.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import rx.Observable;
import rx.Subscriber;
import rx.subscriptions.Subscriptions;
import rx.functions.*;
public class RxOkHttp {
public static Observable<Response> request(OkHttpClient client, Request request) {
return Observable.create((Subscriber<? super Response> subj) -> {
final Call call = client.newCall(request);
subj.add(Subscriptions.create(call::cancel));
call.enqueue(new Callback() {
@Override
public void onFailure(Request request, IOException e) {
subj.onError(e);
}
@Override
public void onResponse(Response response) throws IOException {
Throwable error = getFailureExceptionOnBadStatus(response);
if (error != null) {
subj.onError(error);
return;
}
subj.onNext(response);
subj.onCompleted();
}
});
});
}
public static Observable<byte[]> streamBytes(OkHttpClient client, Request request) {
return request(client, request)
.flatMap(response -> Observable.create((Subscriber<? super byte[]> subj) -> {
AsyncTask t = new AsyncTask() {
@Override
protected Void doInBackground(Object[] objects) {
InputStream stream;
byte[] buffer = new byte[65536];
int bytesRead = 0;
stream = response.body().byteStream();
try {
while (bytesRead > -1 && !subj.isUnsubscribed()) {
bytesRead = stream.read(buffer, 0, 65536);
if (bytesRead < 1) continue;
subj.onNext(Arrays.copyOfRange(buffer, 0, bytesRead));
}
if (!subj.isUnsubscribed()) subj.onCompleted();
stream.close();
} catch (IOException ex) {
subj.onError(ex);
}
return null;
}
};
subj.add(Subscriptions.create(() -> t.cancel(false)));
t.execute();
}));
}
public static Observable<String> streamStrings(OkHttpClient client, Request request) {
return streamBytes(client, request)
.map(bytes -> {
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("UTF8 isn't supported this will never happen");
}
});
}
public static Observable<String> streamLines(OkHttpClient client, Request request) {
return streamStrings(client, request)
.concatWith(Observable.just("\n"))
.flatMap(new Func1<String, Observable<? extends String>>() {
String remainingString = "";
@Override
public Observable<? extends String> call(String s) {
String[] lines = (remainingString + s).split("\n");
if (s.charAt(s.length() - 1) != '\n') {
remainingString = lines[lines.length - 1];
return Observable.from(Arrays.copyOfRange(lines, 0, lines.length - 1));
}
remainingString = "";
return Observable.from(lines);
}
})
.filter(x -> x.length() > 0);
}
private static Throwable getFailureExceptionOnBadStatus(Response resp) {
if (resp.code() < 399) return null;
return new FailedResponseException(resp);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment