Skip to content

Instantly share code, notes, and snippets.

@ar-android
Created November 18, 2017 07:25
Show Gist options
  • Save ar-android/1b82861f2832386dc1ff18005f0b08b6 to your computer and use it in GitHub Desktop.
Save ar-android/1b82861f2832386dc1ff18005f0b08b6 to your computer and use it in GitHub Desktop.
package com.klinikraisha.apiretrofit.relay;
import io.reactivex.functions.Predicate;
/**
 * An append-only list stored as a chain of fixed-size {@code Object[]} segments.
 *
 * <p>Each segment has {@code capacity + 1} slots: indices {@code 0..capacity-1} hold values and
 * index {@code capacity} holds the reference to the next segment ({@code null} for the last one).
 * A {@code null} value slot marks the end of the stored data, which is why {@code null} must
 * never be added.
 *
 * <p>The class performs no synchronization of its own.
 *
 * <p>Created by ocittwo on 11/18/17.
 *
 * @param <T> the element type
 */
public class AppendOnlyLinkedArrayList<T> {
    /** Number of value slots in each segment. */
    private final int capacity;
    /** First segment of the chain; traversal always starts here. */
    private final Object[] head;
    /** Segment that is currently being filled. */
    private Object[] tail;
    /** Index of the next free value slot in {@link #tail}. */
    private int offset;

    /**
     * Constructs an empty list with a per-link capacity.
     *
     * @param capacity the capacity of each link
     */
    AppendOnlyLinkedArrayList(int capacity) {
        this.capacity = capacity;
        // One extra slot at the end of every segment stores the link to the next segment.
        this.head = new Object[capacity + 1];
        this.tail = head;
    }

    /**
     * Append a non-null value to the list.
     * <p>Don't add null to the list! The traversal methods treat a null slot as end-of-data.
     *
     * @param value the value to append
     */
    void add(T value) {
        final int c = capacity;
        int o = offset;
        if (o == c) {
            // Current segment is full: allocate a new one and link it through the last slot.
            Object[] next = new Object[c + 1];
            tail[c] = next;
            tail = next;
            o = 0;
        }
        tail[o] = value;
        offset = o + 1;
    }

    /**
     * Predicate interface suppressing the exception.
     *
     * @param <T> the value type
     */
    public interface NonThrowingPredicate<T> extends Predicate<T> {
        @Override
        boolean test(T t);
    }

    /**
     * Loops over all elements of the list until a null element is encountered or
     * the given predicate returns true.
     *
     * <p>When the predicate returns true the whole traversal stops immediately; no element of
     * any later segment is visited.
     *
     * @param consumer the consumer of values that returns true if the forEach should terminate
     */
    @SuppressWarnings("unchecked")
    void forEachWhile(NonThrowingPredicate<? super T> consumer) {
        Object[] a = head;
        final int c = capacity;
        while (a != null) {
            for (int i = 0; i < c; i++) {
                Object o = a[i];
                if (o == null) {
                    // First empty slot of this segment: nothing more stored in it.
                    break;
                }
                if (consumer.test((T) o)) {
                    // Must leave the method, not just the inner loop; a plain break would
                    // continue with the next segment after the predicate asked to stop.
                    return;
                }
            }
            a = (Object[]) a[c];
        }
    }

    /**
     * Passes every stored value, in insertion order, to {@link Relay#accept} of the given relay.
     *
     * @param observer the relay that receives the values
     * @return always {@code false}
     */
    @SuppressWarnings("unchecked")
    boolean accept(Relay<? super T> observer) {
        Object[] a = head;
        final int c = capacity;
        while (a != null) {
            for (int i = 0; i < c; i++) {
                Object o = a[i];
                if (o == null) {
                    break;
                }
                observer.accept((T) o);
            }
            a = (Object[]) a[c];
        }
        return false;
    }
}
package com.klinikraisha.apiretrofit.relay;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
/**
 * A {@link Relay} that hands each accepted value to the observers that are registered in its
 * subscriber array at the time of the call.
 *
 * <p>The subscriber array is never modified in place: adding or removing a subscriber builds a
 * new array and installs it with {@link AtomicReference#compareAndSet}.
 *
 * <p>Created by ocittwo on 11/18/17.
 *
 * @param <T> the value type
 */
public final class PublishRelay<T> extends Relay<T> {
    /** Shared zero-length array standing for "no subscribers", so none has to be allocated. */
    @SuppressWarnings("rawtypes")
    private static final PublishDisposable[] EMPTY = new PublishDisposable[0];

    /** Holds the array of currently subscribed observers. */
    private final AtomicReference<PublishDisposable<T>[]> subscribers;

    /**
     * Creates a new PublishRelay without any subscribers.
     *
     * @param <T> the value type
     * @return the new relay
     */
    public static <T> PublishRelay<T> create() {
        return new PublishRelay<T>();
    }

    /**
     * Initializes the subscriber holder with the shared empty array.
     */
    @SuppressWarnings("unchecked")
    private PublishRelay() {
        subscribers = new AtomicReference<PublishDisposable<T>[]>(EMPTY);
    }

    @Override
    public void subscribeActual(Observer<? super T> t) {
        PublishDisposable<T> disposable = new PublishDisposable<T>(t, this);
        t.onSubscribe(disposable);
        add(disposable);
        // dispose() may have run between onSubscribe() and add(); its remove() then found
        // nothing to remove, so the entry has to be removed again now.
        if (disposable.isDisposed()) {
            remove(disposable);
        }
    }

    /**
     * Atomically appends the given subscriber to the subscriber array.
     *
     * @param ps the subscriber to add
     */
    @SuppressWarnings("unchecked")
    private void add(PublishDisposable<T> ps) {
        PublishDisposable<T>[] current;
        PublishDisposable<T>[] grown;
        do {
            current = subscribers.get();
            final int size = current.length;
            grown = new PublishDisposable[size + 1];
            System.arraycopy(current, 0, grown, 0, size);
            grown[size] = ps;
            // Retry from a fresh snapshot if the array was replaced in the meantime.
        } while (!subscribers.compareAndSet(current, grown));
    }

    /**
     * Atomically removes the given subscriber if it is present in the subscriber array.
     *
     * @param ps the subscriber to remove
     */
    @SuppressWarnings("unchecked")
    void remove(PublishDisposable<T> ps) {
        while (true) {
            final PublishDisposable<T>[] current = subscribers.get();
            if (current == EMPTY) {
                return;
            }

            final int index = indexOf(current, ps);
            if (index < 0) {
                return;
            }

            final int size = current.length;
            final PublishDisposable<T>[] shrunk;
            if (size == 1) {
                // Removing the only subscriber: fall back to the shared empty array.
                shrunk = EMPTY;
            } else {
                shrunk = new PublishDisposable[size - 1];
                System.arraycopy(current, 0, shrunk, 0, index);
                System.arraycopy(current, index + 1, shrunk, index, size - index - 1);
            }

            if (subscribers.compareAndSet(current, shrunk)) {
                return;
            }
        }
    }

    /**
     * Finds the first position at which {@code target} occurs, compared by identity.
     *
     * @param array the array to search
     * @param target the instance to look for
     * @return the index of the first match, or -1 if there is none
     */
    private static int indexOf(Object[] array, Object target) {
        for (int i = 0; i < array.length; i++) {
            if (array[i] == target) {
                return i;
            }
        }
        return -1;
    }

    @Override
    public void accept(T value) {
        if (value == null) {
            throw new NullPointerException("value == null");
        }
        // Iterate over the array that was current when the call started.
        final PublishDisposable<T>[] snapshot = subscribers.get();
        for (int i = 0; i < snapshot.length; i++) {
            snapshot[i].onNext(value);
        }
    }

    @Override
    public boolean hasObservers() {
        return subscribers.get().length > 0;
    }

    /**
     * Pairs an observer with the relay it is subscribed to. The inherited boolean is the
     * disposed flag; disposing removes this entry from the relay's subscriber array.
     *
     * @param <T> the value type
     */
    static final class PublishDisposable<T> extends AtomicBoolean implements Disposable {
        private static final long serialVersionUID = 3562861878281475070L;

        /** The observer that receives the values. */
        final Observer<? super T> actual;
        /** The relay this entry is registered with. */
        final PublishRelay<T> parent;

        /**
         * Stores the observer and its relay.
         *
         * @param actual the observer to deliver values to
         * @param parent the relay that owns this entry
         */
        PublishDisposable(Observer<? super T> actual, PublishRelay<T> parent) {
            this.actual = actual;
            this.parent = parent;
        }

        /**
         * Forwards the value to the observer unless this entry has been disposed.
         *
         * @param t the value to forward
         */
        void onNext(T t) {
            if (get()) {
                return;
            }
            actual.onNext(t);
        }

        @Override
        public void dispose() {
            // Only the call that flips the flag performs the removal.
            if (!compareAndSet(false, true)) {
                return;
            }
            parent.remove(this);
        }

        @Override
        public boolean isDisposed() {
            return get();
        }
    }
}
package com.klinikraisha.apiretrofit.relay;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
/**
 * Common base of the relay classes: an {@link Observable} that is also a {@link Consumer}, so
 * values can be pushed into it through {@link #accept}.
 *
 * <p>Created by ocittwo on 11/18/17.
 *
 * @param <T> the value type
 */
public abstract class Relay<T> extends Observable<T> implements Consumer<T> {

    /**
     * {@inheritDoc}
     *
     * <p>Redeclared here so that the method carries no checked exception.
     */
    @Override
    public abstract void accept(T value);

    /**
     * Tells whether this relay currently has any Observers.
     * <p>The method is thread-safe.
     *
     * @return true if at least one Observer is subscribed
     */
    public abstract boolean hasObservers();

    /**
     * Returns a view of this Relay whose calls to {@link #accept} are serialized, making it
     * thread-safe. A relay that already is a {@link SerializedRelay} is returned unchanged.
     * <p>The method is thread-safe.
     *
     * @return this instance if it is already serialized, otherwise a new wrapper around it
     */
    public final Relay<T> toSerialized() {
        return (this instanceof SerializedRelay) ? this : new SerializedRelay<T>(this);
    }
}
package com.klinikraisha.apiretrofit.bus;
import com.klinikraisha.apiretrofit.relay.PublishRelay;
import com.klinikraisha.apiretrofit.relay.Relay;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
/**
 * Singleton event bus built on a serialized {@link PublishRelay}: objects handed to
 * {@link #post} are delivered to the observers obtained from the {@code receive} methods.
 *
 * <p>Created by ocittwo on 11/18/17.
 */
public class RxEventBus {
    /** Lazily created singleton; volatile so the unsynchronized first check is safe. */
    private static volatile RxEventBus ourInstance = null;

    /** The relay all events travel through. */
    private final Relay<Object> relay;

    /**
     * Returns the shared bus, creating it on first use.
     *
     * @return the single RxEventBus instance
     */
    public static RxEventBus getInstance() {
        if (ourInstance == null) {
            synchronized (RxEventBus.class) {
                // Re-check under the lock: another thread may have created it meanwhile.
                if (ourInstance == null) {
                    ourInstance = new RxEventBus();
                }
            }
        }
        return ourInstance;
    }

    private RxEventBus() {
        relay = PublishRelay.create().toSerialized();
    }

    /**
     * Publishes an event to all current receivers.
     *
     * @param event the event object; must not be null
     * @throws NullPointerException if {@code event} is null
     */
    public void post(Object event) {
        // Reject null before it reaches the relay chain, which cannot carry null values:
        // PublishRelay throws on them and the linked-array queue uses null as its end marker.
        if (event == null) {
            throw new NullPointerException("event == null");
        }
        relay.accept(event);
    }

    /**
     * Subscribes the consumer to all events that are instances of the given class.
     *
     * @param clazz the event type to listen for
     * @param consumer the callback invoked with each matching event; it may be declared for
     *     any supertype of {@code T}
     * @param <T> the event type
     * @return the Disposable that ends the subscription
     */
    public <T> Disposable receive(final Class<T> clazz, Consumer<? super T> consumer) {
        return receive(clazz).subscribe(consumer);
    }

    /**
     * Returns the event stream restricted to instances of the given class.
     *
     * @param clazz the event type to listen for
     * @param <T> the event type
     * @return an Observable of the matching events
     */
    public <T> Observable<T> receive(final Class<T> clazz) {
        return receive().ofType(clazz);
    }

    /**
     * Returns the unfiltered event stream.
     *
     * @return an Observable of every posted event
     */
    public Observable<Object> receive() {
        return relay;
    }
}
package com.klinikraisha.apiretrofit.relay;
import io.reactivex.Observer;
/**
 * Wraps another {@link Relay} and serializes the calls to its {@link #accept} method: while one
 * thread is emitting, values accepted by other threads are queued and emitted by the first
 * thread once it is done.
 *
 * <p>Created by ocittwo on 11/18/17.
 *
 * @param <T> the value type
 */
public final class SerializedRelay<T> extends Relay<T> {
    /** The wrapped relay that all values are forwarded to. */
    private final Relay<T> actual;
    /** Indicates an emission is going on, guarded by this. */
    private boolean emitting;
    /** If not null, it holds the values accepted while an emission was going on; guarded by this. */
    private AppendOnlyLinkedArrayList<T> queue;

    /**
     * Constructor that wraps an actual relay.
     *
     * @param actual the relay to forward to
     */
    SerializedRelay(final Relay<T> actual) {
        this.actual = actual;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        actual.subscribe(observer);
    }

    /**
     * Forwards the value to the wrapped relay, or queues it if another call is currently
     * emitting.
     *
     * @param value the value to emit; must not be null
     * @throws NullPointerException if {@code value} is null
     */
    @Override
    public void accept(T value) {
        // Validate before touching any state. A null that got past this point would either be
        // queued (the queue treats null as its end marker, so values behind it would be lost)
        // or make a null-rejecting delegate throw after 'emitting' was set, skipping emitLoop()
        // and leaving this relay stuck in the emitting state.
        if (value == null) {
            throw new NullPointerException("value == null");
        }
        synchronized (this) {
            if (emitting) {
                AppendOnlyLinkedArrayList<T> q = queue;
                if (q == null) {
                    q = new AppendOnlyLinkedArrayList<T>(4);
                    queue = q;
                }
                q.add(value);
                return;
            }
            emitting = true;
        }
        // Emit outside the lock, then drain whatever other threads queued in the meantime.
        actual.accept(value);
        emitLoop();
    }

    /** Loops until all values in the queue have been processed. */
    private void emitLoop() {
        for (;;) {
            AppendOnlyLinkedArrayList<T> q;
            synchronized (this) {
                q = queue;
                if (q == null) {
                    emitting = false;
                    return;
                }
                // Take ownership of the current queue; later values go into a fresh one.
                queue = null;
            }
            q.accept(actual);
        }
    }

    @Override
    public boolean hasObservers() {
        return actual.hasObservers();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment