Created
November 18, 2017 07:25
-
-
Save ar-android/1b82861f2832386dc1ff18005f0b08b6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.klinikraisha.apiretrofit.relay; | |
import io.reactivex.functions.Predicate; | |
/** | |
* Created by ocittwo on 11/18/17. | |
*/ | |
public class AppendOnlyLinkedArrayList<T> { | |
private final int capacity; | |
private final Object[] head; | |
private Object[] tail; | |
private int offset; | |
/** | |
* Constructs an empty list with a per-link capacity. | |
* @param capacity the capacity of each link | |
*/ | |
AppendOnlyLinkedArrayList(int capacity) { | |
this.capacity = capacity; | |
this.head = new Object[capacity + 1]; | |
this.tail = head; | |
} | |
/** | |
* Append a non-null value to the list. | |
* <p>Don't add null to the list! | |
* @param value the value to append | |
*/ | |
void add(T value) { | |
final int c = capacity; | |
int o = offset; | |
if (o == c) { | |
Object[] next = new Object[c + 1]; | |
tail[c] = next; | |
tail = next; | |
o = 0; | |
} | |
tail[o] = value; | |
offset = o + 1; | |
} | |
/** | |
* Predicate interface suppressing the exception. | |
* | |
* @param <T> the value type | |
*/ | |
public interface NonThrowingPredicate<T> extends Predicate<T> { | |
@Override | |
boolean test(T t); | |
} | |
/** | |
* Loops over all elements of the array until a null element is encountered or | |
* the given predicate returns true. | |
* @param consumer the consumer of values that returns true if the forEach should terminate | |
*/ | |
@SuppressWarnings("unchecked") | |
void forEachWhile(NonThrowingPredicate<? super T> consumer) { | |
Object[] a = head; | |
final int c = capacity; | |
while (a != null) { | |
for (int i = 0; i < c; i++) { | |
Object o = a[i]; | |
if (o == null) { | |
break; | |
} | |
if (consumer.test((T)o)) { | |
break; | |
} | |
} | |
a = (Object[])a[c]; | |
} | |
} | |
@SuppressWarnings("unchecked") | |
boolean accept(Relay<? super T> observer) { | |
Object[] a = head; | |
final int c = capacity; | |
while (a != null) { | |
for (int i = 0; i < c; i++) { | |
Object o = a[i]; | |
if (o == null) { | |
break; | |
} | |
observer.accept((T) o); | |
} | |
a = (Object[])a[c]; | |
} | |
return false; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.klinikraisha.apiretrofit.relay; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicReference; | |
import io.reactivex.Observer; | |
import io.reactivex.disposables.Disposable; | |
/** | |
* Created by ocittwo on 11/18/17. | |
*/ | |
public final class PublishRelay<T> extends Relay<T> { | |
/** An empty subscribers array to avoid allocating it all the time. */ | |
@SuppressWarnings("rawtypes") | |
private static final PublishDisposable[] EMPTY = new PublishDisposable[0]; | |
/** The array of currently subscribed subscribers. */ | |
private final AtomicReference<PublishDisposable<T>[]> subscribers; | |
/** | |
* Constructs a PublishRelay. | |
*/ | |
public static <T> PublishRelay<T> create() { | |
return new PublishRelay<T>(); | |
} | |
/** | |
* Constructs a PublishRelay. | |
*/ | |
@SuppressWarnings("unchecked") private PublishRelay() { | |
subscribers = new AtomicReference<PublishDisposable<T>[]>(EMPTY); | |
} | |
@Override | |
public void subscribeActual(Observer<? super T> t) { | |
PublishDisposable<T> ps = new PublishDisposable<T>(t, this); | |
t.onSubscribe(ps); | |
add(ps); | |
// if cancellation happened while a successful add, the remove() didn't work | |
// so we need to do it again | |
if (ps.isDisposed()) { | |
remove(ps); | |
} | |
} | |
/** | |
* Adds the given subscriber to the subscribers array atomically. | |
* @param ps the subscriber to add | |
*/ | |
private void add(PublishDisposable<T> ps) { | |
for (;;) { | |
PublishDisposable<T>[] a = subscribers.get(); | |
int n = a.length; | |
@SuppressWarnings("unchecked") | |
PublishDisposable<T>[] b = new PublishDisposable[n + 1]; | |
System.arraycopy(a, 0, b, 0, n); | |
b[n] = ps; | |
if (subscribers.compareAndSet(a, b)) { | |
return; | |
} | |
} | |
} | |
/** | |
* Atomically removes the given subscriber if it is subscribed to the subject. | |
* @param ps the subject to remove | |
*/ | |
@SuppressWarnings("unchecked") | |
void remove(PublishDisposable<T> ps) { | |
for (;;) { | |
PublishDisposable<T>[] a = subscribers.get(); | |
if (a == EMPTY) { | |
return; | |
} | |
int n = a.length; | |
int j = -1; | |
for (int i = 0; i < n; i++) { | |
if (a[i] == ps) { | |
j = i; | |
break; | |
} | |
} | |
if (j < 0) { | |
return; | |
} | |
PublishDisposable<T>[] b; | |
if (n == 1) { | |
b = EMPTY; | |
} else { | |
b = new PublishDisposable[n - 1]; | |
System.arraycopy(a, 0, b, 0, j); | |
System.arraycopy(a, j + 1, b, j, n - j - 1); | |
} | |
if (subscribers.compareAndSet(a, b)) { | |
return; | |
} | |
} | |
} | |
@Override | |
public void accept(T value) { | |
if (value == null) throw new NullPointerException("value == null"); | |
for (PublishDisposable<T> s : subscribers.get()) { | |
s.onNext(value); | |
} | |
} | |
@Override | |
public boolean hasObservers() { | |
return subscribers.get().length != 0; | |
} | |
/** | |
* Wraps the actual subscriber, tracks its requests and makes cancellation | |
* to remove itself from the current subscribers array. | |
* | |
* @param <T> the value type | |
*/ | |
static final class PublishDisposable<T> extends AtomicBoolean implements Disposable { | |
private static final long serialVersionUID = 3562861878281475070L; | |
/** The actual subscriber. */ | |
final Observer<? super T> actual; | |
/** The subject state. */ | |
final PublishRelay<T> parent; | |
/** | |
* Constructs a PublishSubscriber, wraps the actual subscriber and the state. | |
* @param actual the actual subscriber | |
* @param parent the parent PublishProcessor | |
*/ | |
PublishDisposable(Observer<? super T> actual, PublishRelay<T> parent) { | |
this.actual = actual; | |
this.parent = parent; | |
} | |
void onNext(T t) { | |
if (!get()) { | |
actual.onNext(t); | |
} | |
} | |
@Override | |
public void dispose() { | |
if (compareAndSet(false, true)) { | |
parent.remove(this); | |
} | |
} | |
@Override | |
public boolean isDisposed() { | |
return get(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.klinikraisha.apiretrofit.relay; | |
import io.reactivex.Observable; | |
import io.reactivex.functions.Consumer; | |
/** | |
* Created by ocittwo on 11/18/17. | |
*/ | |
public abstract class Relay<T> extends Observable<T> implements Consumer<T> { | |
/** {@inheritDoc} */ | |
@Override public abstract void accept(T value); // Redeclare without checked exception. | |
/** | |
* Returns true if the subject has any Observers. | |
* <p>The method is thread-safe. | |
*/ | |
public abstract boolean hasObservers(); | |
/** | |
* Wraps this Relay and serializes the calls to {@link #accept}, making it thread-safe. | |
* <p>The method is thread-safe. | |
*/ | |
public final Relay<T> toSerialized() { | |
if (this instanceof SerializedRelay) { | |
return this; | |
} | |
return new SerializedRelay<T>(this); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.klinikraisha.apiretrofit.bus; | |
import com.klinikraisha.apiretrofit.relay.PublishRelay; | |
import com.klinikraisha.apiretrofit.relay.Relay; | |
import io.reactivex.Observable; | |
import io.reactivex.disposables.Disposable; | |
import io.reactivex.functions.Consumer; | |
/** | |
* Created by ocittwo on 11/18/17. | |
*/ | |
public class RxEventBus { | |
private static volatile RxEventBus ourInstance = null; | |
public static RxEventBus getInstance() { | |
if (ourInstance == null) { | |
synchronized (RxEventBus.class) { | |
if (ourInstance == null) { | |
ourInstance = new RxEventBus(); | |
} | |
} | |
} | |
return ourInstance; | |
} | |
private final Relay<Object> relay; | |
private RxEventBus() { | |
relay = PublishRelay.create().toSerialized(); | |
} | |
public void post(Object event) { | |
relay.accept(event); | |
} | |
public <T> Disposable receive(final Class<T> clazz, Consumer<T> consumer) { | |
return receive(clazz).subscribe(consumer); | |
} | |
public <T> Observable<T> receive(final Class<T> clazz) { | |
return receive().ofType(clazz); | |
} | |
public Observable<Object> receive() { | |
return relay; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.klinikraisha.apiretrofit.relay; | |
import io.reactivex.Observer; | |
/** | |
* Created by ocittwo on 11/18/17. | |
*/ | |
public final class SerializedRelay<T> extends Relay<T> { | |
/** The actual subscriber to serialize Subscriber calls to. */ | |
private final Relay<T> actual; | |
/** Indicates an emission is going on, guarded by this. */ | |
private boolean emitting; | |
/** If not null, it holds the missed NotificationLite events. */ | |
private AppendOnlyLinkedArrayList<T> queue; | |
/** | |
* Constructor that wraps an actual relay. | |
*/ | |
SerializedRelay(final Relay<T> actual) { | |
this.actual = actual; | |
} | |
@Override | |
protected void subscribeActual(Observer<? super T> observer) { | |
actual.subscribe(observer); | |
} | |
@Override | |
public void accept(T value) { | |
synchronized (this) { | |
if (emitting) { | |
AppendOnlyLinkedArrayList<T> q = queue; | |
if (q == null) { | |
q = new AppendOnlyLinkedArrayList<T>(4); | |
queue = q; | |
} | |
q.add(value); | |
return; | |
} | |
emitting = true; | |
} | |
actual.accept(value); | |
emitLoop(); | |
} | |
/** Loops until all notifications in the queue has been processed. */ | |
private void emitLoop() { | |
for (;;) { | |
AppendOnlyLinkedArrayList<T> q; | |
synchronized (this) { | |
q = queue; | |
if (q == null) { | |
emitting = false; | |
return; | |
} | |
queue = null; | |
} | |
q.accept(actual); | |
} | |
} | |
@Override | |
public boolean hasObservers() { | |
return actual.hasObservers(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment