Skip to content

Instantly share code, notes, and snippets.

@MkhytarMkhoian
Last active July 2, 2017 13:39
Show Gist options
  • Select an option

  • Save MkhytarMkhoian/92a493f98c6b8752bd45780feca58fdd to your computer and use it in GitHub Desktop.

Select an option

Save MkhytarMkhoian/92a493f98c6b8752bd45780feca58fdd to your computer and use it in GitHub Desktop.
private QueryObservable createQuery(Func1<Set<String>, Boolean> tableFilter, String sql, String... args) {
if(this.transactions.get() != null) {
throw new IllegalStateException("Cannot create observable query in transaction. Use query() for a query inside a transaction.");
} else {
BriteDatabase.DatabaseQuery query = new BriteDatabase.DatabaseQuery(tableFilter, sql, args);
final Observable queryObservable = this.triggers
.filter(tableFilter) // Only trigger on tables we care about.
.map(query) // DatabaseQuery maps to itself to save an allocation.
.onBackpressureLatest() // Guard against uncontrollable frequency of upstream emissions.
.startWith(query)
.observeOn(scheduler)
.compose(queryTransformer) // Apply the user's query transformer.
.onBackpressureLatest() // Guard against uncontrollable frequency of scheduler executions.
.doOnSubscribe(this.ensureNotInTransaction);
return new QueryObservable(new OnSubscribe() {
public void call(Subscriber<? super Query> subscriber) {
queryObservable.unsafeSubscribe(subscriber);
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment