Last active
June 10, 2019 20:13
-
-
Save SergejIsbrecht/c09b3ebd1175c367daa922887563d65c to your computer and use it in GitHub Desktop.
ReactiveCache5000
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sergejisbrecht.reactive; | |
import com.google.common.annotations.Beta; | |
import com.google.common.annotations.VisibleForTesting; | |
import io.reactivex.Observable; | |
import io.reactivex.disposables.Disposable; | |
import io.reactivex.disposables.Disposables; | |
import io.reactivex.functions.Consumer; | |
import io.reactivex.subjects.PublishSubject; | |
import io.reactivex.subjects.Subject; | |
import io.vavr.control.Option; | |
import javax.annotation.Nullable; | |
import javax.annotation.concurrent.ThreadSafe; | |
import java.util.HashMap; | |
import java.util.Objects; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
/** | |
* Thin thread-safe wrapper around an ordinary {@link HashMap}, in order to provide reactive access | |
* to changes in map. | |
* | |
* <p>NOTE: for internal use only -- API may change, therefore do not fucking use it! | |
* | |
* @param <K> key -- must be immutable by contract | |
* @param <V> value -- must be immutable by contract | |
*/ | |
@ThreadSafe | |
@Beta | |
public final class ReactiveHashMap<K, V> implements Disposable { | |
@VisibleForTesting final Subject<Command> command$$; | |
private final Observable<HashMap<K, V>> cache$; | |
private final AtomicBoolean isDisposed; | |
private ReactiveHashMap(@Nullable Consumer<? super Disposable> disposableConsumer) { | |
this(disposableConsumer, 16); | |
} | |
private ReactiveHashMap( | |
@Nullable Consumer<? super Disposable> disposableConsumer, int initialCapacity) { | |
this.command$$ = PublishSubject.<Command>create().toSerialized(); | |
this.isDisposed = new AtomicBoolean(false); | |
this.cache$ = | |
command$$ | |
// access to HashMap is sync. by RxJava onNext contract. The contract is enforced by | |
// #toSerialized() on Publisher | |
.scan(new HashMap<>(initialCapacity), this::updateMap) | |
// replay(1) will always be filled, due to seed value in cache$ | |
.replay(1) | |
// instant connect with -1 | |
.autoConnect( | |
-1, | |
disp -> { | |
Disposable disposable = | |
Disposables.fromAction( | |
() -> { | |
disp.dispose(); | |
dispose(); | |
}); | |
if (disposableConsumer != null) { | |
disposableConsumer.accept(disposable); | |
} | |
}) | |
// necessary, because when instance is disposed the replay(1) cache will not be cleared. | |
// On key-observing the cached value will be emitted although the subject has already | |
// completed. | |
.takeWhile(ignore -> !isDisposed.get()); | |
} | |
public static <K, V> ReactiveHashMap<K, V> create( | |
Consumer<? super Disposable> disposableConsumer, int initialCapacity) { | |
Objects.requireNonNull(disposableConsumer); | |
return new ReactiveHashMap<>(disposableConsumer, initialCapacity); | |
} | |
public static <K, V> ReactiveHashMap<K, V> create( | |
Consumer<? super Disposable> disposableConsumer) { | |
Objects.requireNonNull(disposableConsumer); | |
return new ReactiveHashMap<>(disposableConsumer); | |
} | |
public static <K, V> ReactiveHashMap<K, V> create() { | |
return new ReactiveHashMap<>(null); | |
} | |
/** | |
* Insert given {@code value} for {@code key} in cache. Operation will replace given {@code value} | |
* for {@code key}, when {@code key} is already in cache. | |
* | |
* @param value must be immutable by contract for propagation through {@link Observable} | |
* @exception NullPointerException when either {@code key} or {@code value} is null. | |
* @exception IllegalStateException when instance is already disposed via {@link | |
* Disposable#dispose()} | |
*/ | |
public void put(K key, V value) { | |
Objects.requireNonNull(key, "key can not be null."); | |
Objects.requireNonNull(value, "value can not be null."); | |
checkDisposed(); | |
command$$.onNext( | |
new Command.Insert<K, V>() { | |
@Override | |
public K key() { | |
return key; | |
} | |
@Override | |
public V value() { | |
return value; | |
} | |
}); | |
} | |
/** | |
* Delete given {@code key} from cache. Operation will be NOP, when {@code key} is not found in | |
* the cache. | |
* | |
* @exception IllegalStateException when instance is already disposed via {@link | |
* Disposable#dispose()} | |
*/ | |
public void delete(K key) { | |
Objects.requireNonNull(key, "key can not be null."); | |
checkDisposed(); | |
command$$.onNext( | |
new Command.Delete<K>() { | |
@Override | |
public K key() { | |
return key; | |
} | |
}); | |
} | |
/** | |
* Observe {@code value} changes for given {@code key}. When {@code key} is not in cache or | |
* removed, an {@link Option#none()} will be emitted. When a {@code value} is set for {@code key} | |
* an {@link Option#of(Object)} will be emitted. | |
* | |
* <p>NOTE: when instance is already disposed an {@link Observable} will be returned, which | |
* instantly completes. | |
* | |
* <p>NOTE: {@link io.reactivex.Observer#onNext(Object)} will be called from changing operation | |
* thread. For example Subscriber S1 subscribes on Thread T1 for key X. Now Thread T2 deletes key | |
* X. Subscription S1 will receive it's onNext invocation from T2 (sync) | |
*/ | |
public Observable<Option<V>> observe(K key) { | |
Objects.requireNonNull(key, "key can not be null."); | |
return cache$.map( | |
kvHashMap -> { | |
V v = kvHashMap.get(key); | |
if (v == null) { | |
return Option.none(); | |
} | |
return Option.of(v); | |
}); | |
} | |
private HashMap<K, V> updateMap(HashMap<K, V> map, Command command) { | |
if (command instanceof Command.Insert) { | |
Command.Insert<K, V> insertAction = (Command.Insert<K, V>) command; | |
map.put(insertAction.key(), insertAction.value()); | |
} else if (command instanceof Command.Delete) { | |
Command.Delete<K> deleteAction = (Command.Delete<K>) command; | |
map.remove(deleteAction.key()); | |
} | |
// NOP | |
return map; | |
} | |
@Override | |
public void dispose() { | |
if (isDisposed.compareAndSet(false, true)) { | |
command$$.onComplete(); | |
} | |
} | |
@Override | |
public boolean isDisposed() { | |
return isDisposed.get(); | |
} | |
private void checkDisposed() { | |
if (isDisposed()) { | |
throw new IllegalStateException( | |
"[ReactiveHashMap] Instance does not process any operations anymore due to resource disposable"); | |
} | |
} | |
private interface Command { | |
interface Insert<K, V> extends Command { | |
K key(); | |
V value(); | |
} | |
interface Delete<K> extends Command { | |
K key(); | |
} | |
} | |
} | |
-- Tests | |
package com.sergejisbrecht.reactive; | |
import io.reactivex.disposables.CompositeDisposable; | |
import io.reactivex.disposables.Disposable; | |
import io.reactivex.functions.Predicate; | |
import io.reactivex.observers.TestObserver; | |
import io.vavr.control.Option; | |
import org.junit.jupiter.api.BeforeEach; | |
import org.junit.jupiter.api.Test; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.assertj.core.api.Assertions.assertThatThrownBy; | |
class ReactiveHashMapTest { | |
private ReactiveHashMap<String, Integer> classUnderTest; | |
private CompositeDisposable disp; | |
@BeforeEach | |
void setUp() { | |
disp = new CompositeDisposable(); | |
classUnderTest = ReactiveHashMap.create(disp::add); | |
} | |
@Test | |
void name() { | |
TestObserver<Option<Integer>> test$ = classUnderTest.observe("T1").test(); | |
classUnderTest.put("T1", 42); | |
classUnderTest.delete("T1"); | |
test$ | |
.assertNotComplete() | |
.assertNoErrors() | |
.assertValueCount(3) | |
.assertValueAt(0, iterableContains()) | |
.assertValueAt(1, iterableContains(42)) | |
.assertValueAt(2, iterableContains()); | |
} | |
@Test | |
void subscribeAfterValuePutIn() { | |
classUnderTest.put("T1", 42); | |
TestObserver<Option<Integer>> test$ = classUnderTest.observe("T1").test(); | |
test$ | |
.assertNotComplete() | |
.assertNoErrors() | |
.assertValueCount(1) | |
.assertValueAt(0, iterableContains(42)); | |
} | |
@Test | |
void whenPassedInDisposableIsDisposed_noOperationsShouldBeProcessed() { | |
classUnderTest.put("T1", 666); | |
TestObserver<Option<Integer>> test$ = classUnderTest.observe("T1").test(); | |
disp.dispose(); | |
assertThatThrownBy( | |
() -> { | |
classUnderTest.put("T1", 666); | |
}) | |
.isExactlyInstanceOf(IllegalStateException.class); | |
} | |
@Test | |
void whenInstanceIsDisposed_noOperationsShouldBeProcessed() { | |
classUnderTest.put("T1", 666); | |
TestObserver<Option<Integer>> test$ = classUnderTest.observe("T1").test(); | |
classUnderTest.dispose(); | |
assertThatThrownBy( | |
() -> { | |
classUnderTest.put("T1", 666); | |
}) | |
.isExactlyInstanceOf(IllegalStateException.class); | |
} | |
@Test | |
void whenInstanceIsCreatedWithDisposable_disposingProvidedDisposableShouldRemoveSubscriber() { | |
assertThat(classUnderTest.command$$.hasObservers()).isTrue(); | |
Disposable disp1 = classUnderTest.observe("T1").subscribe(); | |
disp.dispose(); | |
assertThat(classUnderTest.command$$.hasObservers()).isFalse(); | |
} | |
@Test | |
void whenInstanceIsCreatedWithoutDisposable_openSubscriptionsShouldBeClosed() { | |
ReactiveHashMap<Integer, Integer> classUnderTest = ReactiveHashMap.create(); | |
assertThat(classUnderTest.command$$.hasObservers()).isTrue(); | |
Disposable disp1 = classUnderTest.observe(42).subscribe(); | |
Disposable disp2 = classUnderTest.observe(42).subscribe(); | |
assertThat(disp1.isDisposed()).isFalse(); | |
assertThat(disp2.isDisposed()).isFalse(); | |
classUnderTest.dispose(); | |
assertThat(disp1.isDisposed()).isTrue(); | |
assertThat(disp2.isDisposed()).isTrue(); | |
assertThat(classUnderTest.command$$.hasObservers()).isFalse(); | |
} | |
@Test | |
void observeDisposedInstance() { | |
ReactiveHashMap<Integer, Integer> classUnderTest = ReactiveHashMap.create(); | |
classUnderTest.dispose(); | |
classUnderTest.observe(42).test().assertNoErrors().assertNoValues().assertComplete(); | |
} | |
private Predicate<Option<Integer>> iterableContains(Integer... values) { | |
return iterable -> { | |
assertThat(iterable).containsExactly(values); | |
return true; | |
}; | |
} | |
} |
Yeah thanks, already commited :D, this is not the newest version. Yuri did some modifications that little bugger.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://en.wikipedia.org/wiki/Command_pattern :) . Decouples the update of the hashmap from the possible mutations (prob. there will be more of them in the future)