Last active
August 21, 2023 08:19
-
-
Save stanio/a8c2114f5e39fd77173d7267e9a85ff0 to your computer and use it in GitHub Desktop.
Utility for working with Locks (replacing synchronized blocks)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* This module, both source code and documentation, | |
* is in the Public Domain, and comes with NO WARRANTY. | |
*/ | |
package net.example.concurrent; | |
import static java.util.Objects.requireNonNull; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.Lock; | |
import java.util.concurrent.locks.ReentrantLock; | |
/** | |
* Encapsulates a {@code Lock} and provides convenience methods for using it. | |
* <pre> | |
* <code> Lock lock; | |
* ... | |
* lock.lock(); | |
* try { | |
* // ... | |
* } finally { | |
* lock.unlock(); | |
* }</code></pre> | |
* <p> | |
* The above:</p> | |
* <pre> | |
* <code> Sync sync = Sync.of(lock); | |
* ... | |
* sync.withLock(() -> { | |
* // ... | |
* });</code></pre> | |
* | |
* @see Lock | |
*/ | |
public class Sync<L extends Lock> { | |
private final L lock; | |
Sync(L lock) { | |
this.lock = lock; | |
} | |
/** | |
* {@return a new <code>Sync</code> instance backed up by the given lock} | |
* | |
* @param <L> the type of lock | |
* @param lock the lock backing the {@code Sync} instance | |
*/ | |
public static <L extends Lock> Sync<L> of(L lock) { | |
return new Sync<>(requireNonNull(lock, "null lock")); | |
} | |
/** | |
* {@return a new <code>Sync</code> instance backed up by a | |
* <code>ReentrantLock</code>} | |
* <p> | |
* <i>Example:</i></p> | |
* <pre> | |
* <code> private final Sync<ReentranLock> sync = Sync.reentrant(); | |
* | |
* sync.withLock(() -> { | |
* // ... | |
* }); | |
* | |
* assert !sync.getLock().isHeldByCurrentThread();</code></pre> | |
*/ | |
public static Sync<ReentrantLock> reentrant() { | |
return new Sync<>(new ReentrantLock()); | |
} | |
/** | |
* {@return the underlying <code>Lock</code>} | |
*/ | |
public L getLock() { | |
return lock; | |
} | |
/** | |
* {@return a new <code>Condition</code> instance for the underlying | |
* <code>Lock</code> instance} | |
* | |
* @throws UnsupportedOperationException if the underlying {@code Lock} | |
* implementation does not support conditions | |
* @see Lock#newCondition() | |
*/ | |
public Condition newCondition() { | |
return lock.newCondition(); | |
} | |
/** | |
* Executes the given task in a lock-obtained block. The lock is finally | |
* released regardless of the runnable outcome. | |
* | |
* @param <E> the base type of exceptions thrown by he given runnable | |
* (normally resolved by the Java compiler) | |
* @param runnable a task to execute while the lock is held | |
* @throws E if the given runnable throws exception | |
* @see Lock#lock() | |
* @see #resultWithLock(ThrowingSupplier) | |
*/ | |
public <E extends Throwable> | |
void withLock(ThrowingRunnable<E> runnable) throws E { | |
Lock l = getLock(); | |
l.lock(); | |
try { | |
runnable.run(); | |
} finally { | |
l.unlock(); | |
} | |
} | |
/** | |
* <i>withLockInterruptibly</i> | |
* | |
* @param <E> the base type of exceptions thrown by he given runnable | |
* (normally resolved by the Java compiler) | |
* @param runnable a task to execute while the lock is held | |
* @throws InterruptedException if the current thread is interrupted | |
* while acquiring the lock (and interruption of lock acquisition | |
* is supported) | |
* @throws E if the given runnable throws exception | |
* @see Lock#lockInterruptibly() | |
* @see #resultWithLockInterruptibly(ThrowingSupplier) | |
*/ | |
public <E extends Throwable> | |
void withLockInterruptibly(ThrowingRunnable<E> runnable) | |
throws InterruptedException, E { | |
Lock l = getLock(); | |
l.lockInterruptibly(); | |
try { | |
runnable.run(); | |
} finally { | |
l.unlock(); | |
} | |
} | |
/** | |
* <i>tryLock</i> | |
* | |
* @param <E> the base type of exceptions thrown by he given runnable | |
* (normally resolved by the Java compiler) | |
* @param runnable a task to execute if the lock is obtained successfully | |
* @return {@code true} if the lock was acquired and {@code false} otherwise | |
* @throws E if the given runnable throws exception | |
* @see Lock#tryLock() | |
*/ | |
public <E extends Throwable> | |
boolean tryLock(ThrowingRunnable<E> runnable) | |
throws E { | |
Lock l = getLock(); | |
if (!l.tryLock()) | |
return false; | |
try { | |
runnable.run(); | |
} finally { | |
l.unlock(); | |
} | |
return true; | |
} | |
/** | |
* <i>tryLock</i> | |
* | |
* @param <E> the base type of exceptions thrown by he given runnable | |
* (normally resolved by the Java compiler) | |
* @param runnable a task to execute if the lock is obtained successfully | |
* @param time the maximum time to wait for the lock | |
* @param unit the time unit of the time argument | |
* @return {@code true} if the lock was acquired and {@code false} if the | |
* waiting time elapsed before the lock was acquired | |
* @throws InterruptedException if the current thread is interrupted | |
* while acquiring the lock (and interruption of lock acquisition | |
* is supported) | |
* @throws E if the given runnable throws exception | |
* @see Lock#tryLock(long, TimeUnit) | |
*/ | |
public <E extends Throwable> | |
boolean tryLock(ThrowingRunnable<E> runnable, long time, TimeUnit unit) | |
throws InterruptedException, E { | |
Lock l = getLock(); | |
if (!l.tryLock(time, unit)) | |
return false; | |
try { | |
runnable.run(); | |
} finally { | |
l.unlock(); | |
} | |
return true; | |
} | |
/** | |
* Computes a result with a lock-synchronized block. | |
* <pre> | |
* <code> Object value = sync.resultWithLock(() -> { | |
* // ... | |
* return ...; | |
* });</code></pre> | |
* | |
* @param <T> the type of the result | |
* @param <E> the base type of exceptions thrown by the given supplier | |
* (normally resolved by the Java compiler) | |
* @param supplier function computing the result | |
* @return a result of given type | |
* @throws E if computing exception occurs | |
* @see Lock#lock() | |
* @see #resultWithLockInterruptibly(ThrowingSupplier) | |
*/ | |
public <T, E extends Throwable> | |
T resultWithLock(ThrowingSupplier<T, E> supplier) throws E { | |
Lock l = getLock(); | |
l.lock(); | |
try { | |
return supplier.get(); | |
} finally { | |
l.unlock(); | |
} | |
} | |
/** | |
* <i>resultWithLockInterruptibly</i> | |
* | |
* @param <T> the type of the result | |
* @param <E> the base type of exceptions thrown by the given supplier | |
* (normally resolved by the Java compiler) | |
* @param supplier function computing the result | |
* @return a result of given type | |
* @throws E if computing exception occurs | |
* @throws InterruptedException if the current thread is interrupted | |
* while acquiring the lock (and interruption of lock acquisition | |
* is supported) | |
* @see Lock#lockInterruptibly() | |
* @see #resultWithLock(ThrowingSupplier) | |
*/ | |
public <T, E extends Throwable> | |
T resultWithLockInterruptibly(ThrowingSupplier<T, E> supplier) | |
throws E, InterruptedException { | |
Lock l = getLock(); | |
l.lockInterruptibly(); | |
try { | |
return supplier.get(); | |
} finally { | |
l.unlock(); | |
} | |
} | |
/** | |
* A runnable that may throw checked exception. | |
* | |
* @param <E> the base type of exceptions thrown by this runnable | |
* @see Runnable | |
*/ | |
@FunctionalInterface | |
public static interface ThrowingRunnable<E extends Throwable> { | |
void run() throws E; | |
} | |
/** | |
* Supplier of results that may throw checked exception. | |
* | |
* @param <T> the type of results supplied by this supplier | |
* @param <E> the base type of exceptions thrown by this supplier | |
* @see java.util.function.Supplier | |
*/ | |
@FunctionalInterface | |
public static interface ThrowingSupplier<T, E extends Throwable> { | |
T get() throws E; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* This module, both source code and documentation, | |
* is in the Public Domain, and comes with NO WARRANTY. | |
*/ | |
package net.example.concurrent; | |
import java.util.concurrent.SynchronousQueue; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.locks.Lock; | |
import java.util.stream.LongStream; | |
public class SyncTest { | |
private static final int COUNT = 10_000; | |
private final Sync<? extends Lock> sync = Sync.reentrant(); | |
private long sum; | |
public static void main(String[] args) throws Exception { | |
SyncTest test = new SyncTest(); | |
try (ParallelExecutor executor = test.newExecutor(); | |
LongStream range = LongStream.range(1, COUNT)) { | |
//range.parallel().forEach(test::addWithLock); | |
range.sequential().forEach(executor::add); | |
} | |
long expected = LongStream.range(1, COUNT).sum(); | |
// REVISIT: Ensure volatile test.sum here. | |
if (test.sum != expected) { | |
throw new AssertionError( "Sum\n" | |
+ "expected: " + expected | |
+ "\n but got: " + test.sum); | |
} | |
System.out.println("Ok"); | |
} | |
void addWithLock(long n) { | |
sync.withLock(() -> add(n)); | |
} | |
void add(long n) { | |
sum += n; | |
} | |
private ParallelExecutor newExecutor() { | |
return new ParallelExecutor(); | |
} | |
class ParallelExecutor implements AutoCloseable { | |
private final ThreadPoolExecutor executor = | |
new ThreadPoolExecutor(10, 1_000, 1, TimeUnit.MINUTES, | |
new SynchronousQueue<Runnable>()); | |
void add(long n) { | |
executor.execute(() -> SyncTest.this.addWithLock(n)); | |
} | |
@Override | |
public void close() throws Exception { | |
System.out.append("Max execution threads: ") | |
.println(executor.getLargestPoolSize()); | |
executor.shutdown(); | |
if (!executor.awaitTermination(15, TimeUnit.SECONDS)) { | |
throw new IllegalStateException("Timed out after 1 minute"); | |
} | |
} | |
} // class ParallelExecutor | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment