@alexwen
Created April 25, 2015 15:59
OkHttp Observable Callback
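// Targets OkHttp 2.x, Okio, and RxJava 1.x. Thing, FilesWrapper, and the OkHttpClient
// field `client` are assumed to be defined elsewhere in the enclosing class.
import com.squareup.okhttp.Callback;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.Response;
import com.squareup.okhttp.ResponseBody;
import java.io.File;
import java.io.IOException;
import java.util.List;
import okio.BufferedSink;
import okio.Okio;
import rx.Observable;
import rx.exceptions.OnErrorThrowable;
import rx.subjects.AsyncSubject;

public Observable<FilesWrapper> download(List<Thing> things) {
    return Observable.from(things)
            .flatMap(thing -> {
                File file = new File(getExternalCacheDir(), thing.getName());
                if (file.exists()) {
                    // Already cached: skip the network call.
                    return Observable.just(file);
                }
                Request request = new Request.Builder().url(thing.getUrl()).build();
                final ObservableCallback callback = new ObservableCallback();
                client.newCall(request).enqueue(callback);
                return callback.getObservable().map(response -> {
                    // try-with-resources closes (and flushes) the sink and releases the body.
                    try (ResponseBody body = response.body();
                         BufferedSink sink = Okio.buffer(Okio.sink(file))) {
                        if (!response.isSuccessful()) {
                            // Do not cache an HTTP error page as if it were the file.
                            throw new IOException("Unexpected response: " + response);
                        }
                        sink.writeAll(body.source());
                    } catch (IOException io) {
                        throw OnErrorThrowable.from(OnErrorThrowable.addValueAsLastCause(io, thing));
                    }
                    return file;
                });
            })
            .toList()
            .map(files -> new FilesWrapper(files));
}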
// Bridges OkHttp's asynchronous Callback to an Observable. AsyncSubject caches the final
// value and replays it, so a subscriber that arrives after the response still receives it.
private static class ObservableCallback implements Callback {
    private final AsyncSubject<Response> subject = AsyncSubject.create();

    @Override
    public void onFailure(Request request, IOException e) {
        subject.onError(OnErrorThrowable.from(OnErrorThrowable.addValueAsLastCause(e, request)));
    }

    @Override
    public void onResponse(Response response) throws IOException {
        subject.onNext(response);
        subject.onCompleted();
    }

    public Observable<Response> getObservable() {
        return subject.asObservable();
    }
}
@code-flavius-mester

After sink.writeAll, make sure you close the stream with sink.close().
Otherwise you will have problems when opening the file.
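To illustrate why closing matters: a buffered writer holds bytes in memory until it is flushed or closed, so a file written through an unclosed buffer can end up empty or truncated. In the gist the sink is opened in a try-with-resources statement, which closes it when the block exits, so an explicit sink.close() becomes necessary mainly if the code is adapted to drop that construct. The sketch below uses only the JDK, with BufferedOutputStream as a stand-in for Okio's BufferedSink; the class name CloseSinkDemo and both method names are hypothetical and chosen for illustration.

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class CloseSinkDemo {

    // Writes through a buffer but closes only the raw file stream,
    // so the buffered bytes are never flushed to disk.
    public static long writeWithoutFlush(File file, byte[] data) throws IOException {
        FileOutputStream raw = new FileOutputStream(file);
        OutputStream buffered = new BufferedOutputStream(raw);
        buffered.write(data); // small payloads stay in the in-memory buffer
        raw.close();          // the buffer's contents are lost
        return file.length();
    }

    // Same write, but try-with-resources closes the buffered stream,
    // which flushes it before closing the underlying file.
    public static long writeWithTryWithResources(File file, byte[] data) throws IOException {
        try (OutputStream buffered = new BufferedOutputStream(new FileOutputStream(file))) {
            buffered.write(data);
        }
        return file.length();
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("close-sink-demo", ".bin");
        file.deleteOnExit();
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);

        System.out.println(writeWithoutFlush(file, data));         // prints 0
        System.out.println(writeWithTryWithResources(file, data)); // prints 5
    }
}
```

The same reasoning applies to a BufferedSink: closing it, either explicitly or via try-with-resources, is what pushes the remaining buffered bytes into the file.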
