@mattrasband
Last active August 18, 2016 02:00
rxjava observable
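import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import rx.Observable;
import rx.Subscriber;

public class HotObservable implements Observable.OnSubscribe<String> {
    // Copy-on-write list: the reader thread can iterate safely while other threads subscribe.
    private final List<Subscriber<? super String>> subscribers = new CopyOnWriteArrayList<>();
    // A read timeout of 0 means no timeout; an SSE stream can stay idle for a long time.
    private final OkHttpClient client = new OkHttpClient.Builder()
            .readTimeout(0, TimeUnit.DAYS).build();
    private final String eventUrl;

    // The constructor name has to match the class name.
    public HotObservable(String eventUrl) {
        this.eventUrl = eventUrl;
    }

    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscribers.add(subscriber);
        // The first subscriber starts the single reader thread; later subscribers share it.
        if (subscribers.size() == 1) {
            new Thread(() -> {
                Request request = new Request.Builder()
                        .url(eventUrl)
                        .addHeader("Accept", "text/event-stream")
                        .build();
                try {
                    Response response = client.newCall(request).execute();
                    if (!response.isSuccessful()) {
                        // Release the connection before reporting the failure.
                        response.body().close();
                        for (Subscriber<? super String> sub : subscribers) {
                            sub.onError(new IOException("Unable to connect to stream"));
                        }
                        return;
                    }
                    // Closing the reader also closes the response body.
                    try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(
                            response.body().byteStream(), StandardCharsets.UTF_8))) {
                        String line;
                        // Forward every raw line of the stream to all current subscribers.
                        while ((line = bufferedReader.readLine()) != null) {
                            for (Subscriber<? super String> sub : subscribers) {
                                sub.onNext(line);
                            }
                        }
                    }
                } catch (IOException e) {
                    for (Subscriber<? super String> sub : subscribers) {
                        sub.onError(e);
                    }
                }
            }).start();
        }
    }
}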
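
The class above hands each raw line of the stream to subscribers, so `data:` prefixes, comment lines, and the blank lines that separate events all arrive as-is. A minimal sketch of grouping those lines into event payloads follows. `SseDataParser` is a hypothetical helper that is not part of the gist, it uses only the JDK, and it ignores the `event`, `id`, and `retry` fields.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the gist: groups raw SSE lines into event payloads.
final class SseDataParser {
    private SseDataParser() {}

    // Reads lines until end of stream and returns the data payload of each dispatched event.
    static List<String> parse(BufferedReader reader) throws IOException {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        String line;
        while ((line = reader.readLine()) != null) {
            if (line.isEmpty()) {
                // A blank line dispatches the event collected so far.
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith(":")) {
                // Comment line, often used as a keep-alive; nothing to do.
            } else if (line.equals("data") || line.startsWith("data:")) {
                String value = line.length() > 5 ? line.substring(5) : "";
                // A single leading space after the colon is not part of the value.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                // Several data lines in one event are joined with newlines.
                if (hasData) {
                    data.append('\n');
                }
                data.append(value);
                hasData = true;
            }
            // Other fields (event, id, retry) are ignored in this sketch.
        }
        // An event that was not terminated by a blank line is discarded.
        return events;
    }

    public static void main(String[] args) throws IOException {
        String stream = ": keep-alive\n"
                + "data: first\n"
                + "\n"
                + "event: update\n"
                + "data: line one\n"
                + "data: line two\n"
                + "\n"
                + "data: incomplete";
        for (String event : parse(new BufferedReader(new StringReader(stream)))) {
            System.out.println("event payload: " + event);
        }
    }
}
```

In a streaming setting the same state machine would run per line inside the read loop and call `onNext` once per dispatched event rather than collecting a list.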
@mattrasband
Author

I know this implementation has a few issues (no protection against the reader thread dying, and one huge method). I am just trying to learn how to make a hot observable with RxJava from an SSE stream.
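
To illustrate what "hot" means here and one way to guard the emitting thread, below is a rough JDK-only sketch. `LineBroadcaster` is a hypothetical stand-in for the subscriber list and is not an RxJava type. Late subscribers miss earlier items, and a listener that throws is dropped instead of taking the emitting thread down with it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for the gist's subscriber list; it is not an RxJava type.
final class LineBroadcaster {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    // Registers a listener and returns a handle that removes it again.
    Runnable subscribe(Consumer<String> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    // Delivers one line to every current listener and returns how many accepted it.
    int emit(String line) {
        int delivered = 0;
        for (Consumer<String> listener : listeners) {
            try {
                listener.accept(line);
                delivered++;
            } catch (RuntimeException e) {
                // Drop the failing listener so it cannot kill the emitting thread.
                listeners.remove(listener);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        LineBroadcaster broadcaster = new LineBroadcaster();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        Runnable cancelEarly = broadcaster.subscribe(early::add);
        broadcaster.emit("one");
        broadcaster.subscribe(late::add); // joins after "one" was emitted
        broadcaster.emit("two");
        cancelEarly.run();
        broadcaster.emit("three");

        // prints: early=[one, two] late=[two, three]
        System.out.println("early=" + early + " late=" + late);
    }
}
```

RxJava 1.x ships `PublishSubject` and the `publish()`/`refCount()` operators for this kind of sharing, so a hand-rolled list like this is mainly useful for seeing the mechanics.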
