Created
July 11, 2019 14:58
-
-
Save athlan/b357b718a28a9ea83973ed9ff25eb77d to your computer and use it in GitHub Desktop.
FlyweightConcurrentSupplier Java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pl.athlan.common.concurrent; | |
import static java.util.Objects.requireNonNull; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.locks.ReadWriteLock; | |
import java.util.concurrent.locks.ReentrantReadWriteLock; | |
import java.util.function.Supplier; | |
/**
 * Thread-safe {@link Supplier} decorator that lets only one thread at a time invoke the
 * decorated {@link Supplier} and shares the value obtained from it with the threads that
 * were waiting for it.
 *
 * <p>If no threads are waiting after execution, the shared value is not stored.
 *
 * <p>Use case: computing the value is memory-consuming, and the difference between a result
 * obtained at time t0 and one obtained at t0+1 does not matter (it can be cached/shared).
 * It is crucial to free the memory after execution.
 *
 * @param <T> the type of results supplied by this supplier
 */
public final class FlyweightConcurrentSupplier<T> implements Supplier<T> {

  /** Supplier whose result is computed by one thread and shared with concurrent callers. */
  private final Supplier<T> delegate;

  /** Guards {@link #sharedValue} and {@link #sharedValueAvailable}. */
  private final ReadWriteLock rwl = new ReentrantReadWriteLock();

  /** Number of callers currently inside {@link #get()}. */
  private final AtomicInteger queueLength = new AtomicInteger();

  /** Value last produced by the delegate; only meaningful while the flag below is set. */
  private T sharedValue;

  /**
   * Whether {@link #sharedValue} holds a usable result. A separate flag is needed because
   * the delegate may legitimately supply {@code null}.
   */
  private volatile boolean sharedValueAvailable;

  /**
   * Creates a supplier that decorates the given delegate.
   *
   * @param delegate the supplier to decorate
   * @throws NullPointerException if {@code delegate} is {@code null}
   */
  public FlyweightConcurrentSupplier(Supplier<T> delegate) {
    this.delegate = requireNonNull(delegate, "delegate cannot be null");
  }

  /**
   * Returns the currently shared value, invoking the delegate first if none is available.
   *
   * <p>The caller that brings the number of in-flight callers back to zero discards the
   * shared value, so nothing is retained once nobody is waiting for it.
   *
   * <p>If the delegate throws, the exception propagates to the caller, no value is stored,
   * and the caller is still deregistered so that later calls keep discarding the shared
   * value as usual.
   *
   * @return the value supplied by the delegate (possibly computed by another thread)
   */
  @Override
  public T get() {
    // Register before touching any lock, so a concurrent cleanup that re-checks the
    // counter under the write lock can see this caller.
    queueLength.incrementAndGet();
    try {
      return readOrCompute();
    } finally {
      // Always deregister, even when the delegate threw. Skipping the decrement would
      // leave the counter above zero forever, so the shared value would never be
      // discarded again and every later caller would receive the same stale value.
      if (queueLength.decrementAndGet() == 0) {
        releaseSharedValueIfIdle();
      }
    }
  }

  /** Returns the shared value, computing it under the write lock when it is not available. */
  private T readOrCompute() {
    rwl.readLock().lock();
    try {
      if (sharedValueAvailable) {
        return sharedValue;
      }
    } finally {
      // The read lock must be released before the write lock can be acquired.
      rwl.readLock().unlock();
    }

    rwl.writeLock().lock();
    try {
      // Recheck state because another thread might have acquired the write lock
      // and published a value before we did.
      if (!sharedValueAvailable) {
        sharedValue = delegate.get();
        sharedValueAvailable = true;
      }
      return sharedValue;
    } finally {
      rwl.writeLock().unlock();
    }
  }

  /** Discards the shared value unless another caller registered in the meantime. */
  private void releaseSharedValueIfIdle() {
    rwl.writeLock().lock();
    try {
      // Recheck under the write lock: a caller that joined after our decrement
      // should still be able to reuse the value instead of recomputing it.
      if (queueLength.get() == 0) {
        sharedValue = null;
        sharedValueAvailable = false;
      }
    } finally {
      rwl.writeLock().unlock();
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment