Skip to content

Instantly share code, notes, and snippets.

@SergejIsbrecht
Last active June 10, 2019 20:13
Show Gist options
  • Save SergejIsbrecht/c09b3ebd1175c367daa922887563d65c to your computer and use it in GitHub Desktop.
Save SergejIsbrecht/c09b3ebd1175c367daa922887563d65c to your computer and use it in GitHub Desktop.
ReactiveCache5000
package com.sergejisbrecht.reactive;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import io.vavr.control.Option;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Thin thread-safe reactive wrapper around a key/value map. Writes are funnelled through a
 * serialized command stream, applied to the backing map one at a time, and the resulting state is
 * published to everybody observing a key.
 *
 * <p>Unlike a plain {@link HashMap}, neither {@code null} keys nor {@code null} values are
 * accepted.
 *
 * <p>NOTE: for internal use only -- the API may change, so do not depend on it.
 *
 * @param <K> key -- must be immutable by contract
 * @param <V> value -- must be immutable by contract
 */
@ThreadSafe
@Beta
public final class ReactiveHashMap<K, V> implements Disposable {
  /** Capacity used by the factories that do not take an explicit initial capacity. */
  private static final int DEFAULT_INITIAL_CAPACITY = 16;

  /** Serialized sink for all mutations; package-private so tests can inspect its observers. */
  @VisibleForTesting final Subject<Command<K, V>> command$$;

  /** Replays the latest map state to every new subscriber and then follows live updates. */
  private final Observable<Map<K, V>> cache$;

  private final AtomicBoolean isDisposed;

  private ReactiveHashMap(@Nullable Consumer<? super Disposable> disposableConsumer) {
    this(disposableConsumer, DEFAULT_INITIAL_CAPACITY);
  }

  private ReactiveHashMap(
      @Nullable Consumer<? super Disposable> disposableConsumer, int initialCapacity) {
    this.command$$ = PublishSubject.<Command<K, V>>create().toSerialized();
    this.isDisposed = new AtomicBoolean(false);
    this.cache$ =
        command$$
            // Writes are serialized by the RxJava onNext contract, which #toSerialized() enforces
            // on the subject. Reads are NOT covered by that contract: replay(1) hands the very
            // same map instance to a late subscriber on the subscribing thread, possibly while a
            // writer is mutating it. Hence the backing map must tolerate concurrent get/put.
            .<Map<K, V>>scan(new ConcurrentHashMap<K, V>(initialCapacity), this::updateMap)
            // replay(1) will always be filled, due to the seed value of scan
            .replay(1)
            // a non-positive subscriber count connects instantly
            .autoConnect(
                -1,
                connection -> {
                  Disposable teardown =
                      Disposables.fromAction(
                          () -> {
                            // Complete the command stream first, so that open subscriptions
                            // receive onComplete. Disposing the replay connection first would cut
                            // them off without any terminal event.
                            dispose();
                            connection.dispose();
                          });
                  if (disposableConsumer != null) {
                    disposableConsumer.accept(teardown);
                  }
                })
            // Necessary, because the replay(1) cache is not cleared when the instance is
            // disposed: a late observer would otherwise still receive the cached state although
            // the subject has already completed.
            .takeWhile(ignore -> !isDisposed.get());
  }

  /**
   * Creates an instance whose backing map starts with the given capacity.
   *
   * @param disposableConsumer receives, during construction, a {@link Disposable} that shuts this
   *     instance down when disposed (e.g. {@code compositeDisposable::add})
   * @param initialCapacity initial capacity of the backing map, must not be negative
   * @exception NullPointerException when {@code disposableConsumer} is null
   */
  public static <K, V> ReactiveHashMap<K, V> create(
      Consumer<? super Disposable> disposableConsumer, int initialCapacity) {
    Objects.requireNonNull(disposableConsumer);
    return new ReactiveHashMap<>(disposableConsumer, initialCapacity);
  }

  /**
   * Creates an instance with the default initial capacity.
   *
   * @param disposableConsumer receives, during construction, a {@link Disposable} that shuts this
   *     instance down when disposed (e.g. {@code compositeDisposable::add})
   * @exception NullPointerException when {@code disposableConsumer} is null
   */
  public static <K, V> ReactiveHashMap<K, V> create(
      Consumer<? super Disposable> disposableConsumer) {
    Objects.requireNonNull(disposableConsumer);
    return new ReactiveHashMap<>(disposableConsumer);
  }

  /**
   * Creates an instance with the default initial capacity that is only shut down through its own
   * {@link #dispose()}.
   */
  public static <K, V> ReactiveHashMap<K, V> create() {
    return new ReactiveHashMap<>(null);
  }

  /**
   * Insert given {@code value} for {@code key} in cache. Operation will replace the current value
   * for {@code key}, when {@code key} is already in cache.
   *
   * @param key must be immutable by contract
   * @param value must be immutable by contract for propagation through {@link Observable}
   * @exception NullPointerException when either {@code key} or {@code value} is null.
   * @exception IllegalStateException when instance is already disposed via {@link
   *     Disposable#dispose()}
   */
  public void put(K key, V value) {
    Objects.requireNonNull(key, "key can not be null.");
    Objects.requireNonNull(value, "value can not be null.");
    checkDisposed();
    command$$.onNext(map -> map.put(key, value));
  }

  /**
   * Delete given {@code key} from cache. The backing map is left unchanged when {@code key} is not
   * found in the cache.
   *
   * @exception NullPointerException when {@code key} is null.
   * @exception IllegalStateException when instance is already disposed via {@link
   *     Disposable#dispose()}
   */
  public void delete(K key) {
    Objects.requireNonNull(key, "key can not be null.");
    checkDisposed();
    command$$.onNext(map -> map.remove(key));
  }

  /**
   * Observe {@code value} changes for given {@code key}. When {@code key} is not in cache or
   * removed, an {@link Option#none()} will be emitted. When a {@code value} is set for {@code key}
   * an {@link Option#of(Object)} will be emitted.
   *
   * <p>NOTE: when instance is already disposed an {@link Observable} will be returned, which
   * instantly completes.
   *
   * <p>NOTE: {@link io.reactivex.Observer#onNext(Object)} will be called from changing operation
   * thread. For example Subscriber S1 subscribes on Thread T1 for key X. Now Thread T2 deletes key
   * X. Subscription S1 will receive its onNext invocation from T2 (sync)
   *
   * @exception NullPointerException when {@code key} is null.
   */
  public Observable<Option<V>> observe(K key) {
    Objects.requireNonNull(key, "key can not be null.");
    // Option.of maps a null lookup result (key absent) to Option.none()
    return cache$.map(snapshot -> Option.of(snapshot.get(key)));
  }

  /** Accumulator for scan: applies one command to the backing map and hands the map on. */
  private Map<K, V> updateMap(Map<K, V> map, Command<K, V> command) {
    command.applyTo(map);
    return map;
  }

  /** Completes the command stream exactly once; afterwards no write is accepted anymore. */
  @Override
  public void dispose() {
    if (isDisposed.compareAndSet(false, true)) {
      command$$.onComplete();
    }
  }

  @Override
  public boolean isDisposed() {
    return isDisposed.get();
  }

  private void checkDisposed() {
    if (isDisposed()) {
      throw new IllegalStateException(
          "[ReactiveHashMap] Instance does not process any operations anymore due to resource disposable");
    }
  }

  /** A single mutation of the backing map, executed inside the serialized command stream. */
  @FunctionalInterface
  private interface Command<K, V> {
    void applyTo(Map<K, V> map);
  }
}
-- Tests
package com.sergejisbrecht.reactive;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Predicate;
import io.reactivex.observers.TestObserver;
import io.vavr.control.Option;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Behavioural tests for {@link ReactiveHashMap}: emissions per key and disposal semantics. */
class ReactiveHashMapTest {
  private ReactiveHashMap<String, Integer> classUnderTest;
  private CompositeDisposable disp;

  @BeforeEach
  void setUp() {
    disp = new CompositeDisposable();
    classUnderTest = ReactiveHashMap.create(disp::add);
  }

  /** An early observer sees the empty state, then the inserted value, then the removal. */
  @Test
  void observeBeforePut_emitsNoneThenValueThenNoneAfterDelete() {
    TestObserver<Option<Integer>> test$ = classUnderTest.observe("T1").test();

    classUnderTest.put("T1", 42);
    classUnderTest.delete("T1");

    test$
        .assertNotComplete()
        .assertNoErrors()
        .assertValueCount(3)
        .assertValueAt(0, iterableContains())
        .assertValueAt(1, iterableContains(42))
        .assertValueAt(2, iterableContains());
  }

  /** A late observer only receives the current state, not the history. */
  @Test
  void subscribeAfterValuePutIn() {
    classUnderTest.put("T1", 42);

    TestObserver<Option<Integer>> test$ = classUnderTest.observe("T1").test();

    test$
        .assertNotComplete()
        .assertNoErrors()
        .assertValueCount(1)
        .assertValueAt(0, iterableContains(42));
  }

  @Test
  void whenPassedInDisposableIsDisposed_noOperationsShouldBeProcessed() {
    classUnderTest.put("T1", 666);
    // keep an active observer around while the instance is shut down
    TestObserver<Option<Integer>> test$ = classUnderTest.observe("T1").test();

    disp.dispose();

    assertThatThrownBy(
            () -> {
              classUnderTest.put("T1", 666);
            })
        .isExactlyInstanceOf(IllegalStateException.class);
    // the rejected put must not have reached the observer
    test$.assertNoErrors().assertValueCount(1);
  }

  @Test
  void whenInstanceIsDisposed_noOperationsShouldBeProcessed() {
    classUnderTest.put("T1", 666);
    TestObserver<Option<Integer>> test$ = classUnderTest.observe("T1").test();

    classUnderTest.dispose();

    assertThatThrownBy(
            () -> {
              classUnderTest.put("T1", 666);
            })
        .isExactlyInstanceOf(IllegalStateException.class);
    // disposing completes the open observer; the rejected put adds no further value
    test$.assertNoErrors().assertValueCount(1).assertComplete();
  }

  @Test
  void whenInstanceIsCreatedWithDisposable_disposingProvidedDisposableShouldRemoveSubscriber() {
    assertThat(classUnderTest.command$$.hasObservers()).isTrue();
    Disposable disp1 = classUnderTest.observe("T1").subscribe();

    disp.dispose();

    assertThat(classUnderTest.command$$.hasObservers()).isFalse();
  }

  @Test
  void whenInstanceIsCreatedWithoutDisposable_openSubscriptionsShouldBeClosed() {
    ReactiveHashMap<Integer, Integer> standalone = ReactiveHashMap.create();
    assertThat(standalone.command$$.hasObservers()).isTrue();
    Disposable disp1 = standalone.observe(42).subscribe();
    Disposable disp2 = standalone.observe(42).subscribe();
    assertThat(disp1.isDisposed()).isFalse();
    assertThat(disp2.isDisposed()).isFalse();

    standalone.dispose();

    assertThat(disp1.isDisposed()).isTrue();
    assertThat(disp2.isDisposed()).isTrue();
    assertThat(standalone.command$$.hasObservers()).isFalse();
  }

  /** Observing an already disposed instance completes immediately without any emission. */
  @Test
  void observeDisposedInstance() {
    ReactiveHashMap<Integer, Integer> standalone = ReactiveHashMap.create();
    standalone.dispose();

    standalone.observe(42).test().assertNoErrors().assertNoValues().assertComplete();
  }

  /**
   * Builds a predicate for {@code assertValueAt} that checks the emitted {@link Option} holds
   * exactly the given values (no arguments means an empty option). A mismatch fails through the
   * AssertJ assertion, so the predicate itself always returns {@code true}.
   */
  private Predicate<Option<Integer>> iterableContains(Integer... values) {
    return iterable -> {
      assertThat(iterable).containsExactly(values);
      return true;
    };
  }
}
@Pfoerd
Copy link

Pfoerd commented Apr 29, 2019

https://en.wikipedia.org/wiki/Command_pattern :) . Decouples the update of the hashmap from the possible mutations (prob. there will be more of them in the future)

@SergejIsbrecht
Copy link
Author

Yeah, thanks, already committed :D. This is not the newest version; Yuri did some modifications, that little bugger.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment