Last active
November 18, 2020 09:26
-
-
Save mikehearn/4781ce7f00228762adfb to your computer and use it in GitHub Desktop.
Some code for JavaFX observable collections (maps and sets) which replicate changes between threads. From the open source, Apache licensed Lighthouse project. Check there for the latest code.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Contact: [email protected] | |
package lighthouse.threading; | |
import com.google.common.util.concurrent.Uninterruptibles; | |
import javafx.application.Platform; | |
import lighthouse.protocol.LHUtils; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.ScheduledThreadPoolExecutor; | |
import java.util.concurrent.atomic.AtomicReference; | |
import java.util.function.Supplier; | |
import static com.google.common.base.Preconditions.checkState; | |
import static lighthouse.protocol.LHUtils.checkedGet; | |
/** An extended executor interface that supports thread affinity assertions and short circuiting. */ | |
public interface AffinityExecutor extends Executor { | |
/** Returns true if the current thread is equal to the thread this executor is backed by. */ | |
public boolean isOnThread(); | |
/** Throws an IllegalStateException if the current thread is equal to the thread this executor is backed by. */ | |
public void checkOnThread(); | |
/** If isOnThread() then runnable is invoked immediately, otherwise the closure is queued onto the backing thread. */ | |
public void executeASAP(LHUtils.UncheckedRunnable runnable); | |
/** | |
* Runs the given function on the executor, blocking until the result is available. Be careful not to deadlock this | |
* way! Make sure the executor can't possibly be waiting for the calling thread. | |
*/ | |
public default <T> T fetchFrom(Supplier<T> fetcher) { | |
if (isOnThread()) | |
return fetcher.get(); | |
else | |
return checkedGet(CompletableFuture.supplyAsync(fetcher, this)); | |
} | |
public abstract static class BaseAffinityExecutor implements AffinityExecutor { | |
protected final Thread.UncaughtExceptionHandler exceptionHandler; | |
protected BaseAffinityExecutor() { | |
exceptionHandler = Thread.currentThread().getUncaughtExceptionHandler(); | |
} | |
@Override | |
public abstract boolean isOnThread(); | |
@Override | |
public void checkOnThread() { | |
checkState(isOnThread(), "On wrong thread: %s", Thread.currentThread()); | |
} | |
@Override | |
public void executeASAP(LHUtils.UncheckedRunnable runnable) { | |
final Runnable command = () -> { | |
try { | |
runnable.run(); | |
} catch (Throwable throwable) { | |
exceptionHandler.uncaughtException(Thread.currentThread(), throwable); | |
} | |
}; | |
if (isOnThread()) | |
command.run(); | |
else { | |
execute(command); | |
} | |
} | |
// Must comply with the Executor definition w.r.t. exceptions here. | |
@Override | |
public abstract void execute(Runnable command); | |
} | |
public static AffinityExecutor UI_THREAD = new BaseAffinityExecutor() { | |
@Override | |
public boolean isOnThread() { | |
return Platform.isFxApplicationThread(); | |
} | |
@Override | |
public void execute(Runnable command) { | |
Platform.runLater(command); | |
} | |
}; | |
public static AffinityExecutor SAME_THREAD = new BaseAffinityExecutor() { | |
@Override | |
public boolean isOnThread() { | |
return true; | |
} | |
@Override | |
public void execute(Runnable command) { | |
command.run(); | |
} | |
}; | |
public static class ServiceAffinityExecutor extends BaseAffinityExecutor { | |
private static final Logger log = LoggerFactory.getLogger(ServiceAffinityExecutor.class); | |
protected AtomicReference<Thread> whichThread = new AtomicReference<>(null); | |
private final Thread.UncaughtExceptionHandler handler = Thread.currentThread().getUncaughtExceptionHandler(); | |
public final ScheduledThreadPoolExecutor service; | |
public ServiceAffinityExecutor(String threadName) { | |
service = new ScheduledThreadPoolExecutor(1, runnable -> { | |
Thread thread = new Thread(runnable); | |
thread.setDaemon(true); | |
thread.setName(threadName); | |
whichThread.set(thread); | |
return thread; | |
}, (runnable, executor) -> { | |
log.warn("Ignored execution attempt due to shutdown: {}", runnable); | |
}); | |
} | |
@Override | |
public boolean isOnThread() { | |
return Thread.currentThread() == whichThread.get(); | |
} | |
@Override | |
public void execute(Runnable command) { | |
service.execute(() -> { | |
try { | |
command.run(); | |
} catch (Throwable e) { | |
if (handler != null) | |
handler.uncaughtException(Thread.currentThread(), e); | |
else | |
e.printStackTrace(); | |
} | |
}); | |
} | |
} | |
/** | |
* An executor useful for unit tests: allows the current thread to block until a command arrives from another | |
* thread, which is then executed. Inbound closures/commands stack up until they are cleared by looping. | |
*/ | |
public static class Gate extends BaseAffinityExecutor { | |
private final Thread thisThread = Thread.currentThread(); | |
private final LinkedBlockingQueue<Runnable> commandQ = new LinkedBlockingQueue<>(); | |
private final boolean alwaysQueue; | |
public Gate() { | |
this(false); | |
} | |
/** If alwaysQueue is true, executeASAP will never short-circuit and will always queue up. */ | |
public Gate(boolean alwaysQueue) { | |
this.alwaysQueue = alwaysQueue; | |
} | |
@Override | |
public boolean isOnThread() { | |
return !alwaysQueue && Thread.currentThread() == thisThread; | |
} | |
@Override | |
public void execute(Runnable command) { | |
Uninterruptibles.putUninterruptibly(commandQ, command); | |
} | |
public void waitAndRun() { | |
final Runnable runnable = Uninterruptibles.takeUninterruptibly(commandQ); | |
runnable.run(); | |
} | |
public int getTaskQueueSize() { | |
return commandQ.size(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Contact: [email protected] | |
public class LHUtils { | |
private static final Logger log = LoggerFactory.getLogger(LHUtils.class); | |
public static List<Path> listDir(Path dir) throws IOException { | |
List<Path> contents = new LinkedList<>(); | |
try (Stream<Path> list = Files.list(dir)) { | |
list.forEach(contents::add); | |
} | |
return contents; | |
} | |
//region Generic Java 8 enhancements | |
public interface UncheckedRun<T> { | |
public T run() throws Throwable; | |
} | |
public interface UncheckedRunnable { | |
public void run() throws Throwable; | |
} | |
public static <T> T unchecked(UncheckedRun<T> run) { | |
try { | |
return run.run(); | |
} catch (Throwable throwable) { | |
throw new RuntimeException(throwable); | |
} | |
} | |
public static void uncheck(UncheckedRunnable run) { | |
try { | |
run.run(); | |
} catch (Throwable throwable) { | |
throw new RuntimeException(throwable); | |
} | |
} | |
public static void ignoreAndLog(UncheckedRunnable runnable) { | |
try { | |
runnable.run(); | |
} catch (Throwable t) { | |
log.error("Ignoring error", t); | |
} | |
} | |
public static <T> T ignoredAndLogged(UncheckedRun<T> runnable) { | |
try { | |
return runnable.run(); | |
} catch (Throwable t) { | |
log.error("Ignoring error", t); | |
return null; | |
} | |
} | |
@SuppressWarnings("unchecked") | |
public static <T, E extends Throwable> T checkedGet(Future<T> future) throws E { | |
try { | |
return future.get(); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} catch (ExecutionException e) { | |
throw (E) e.getCause(); | |
} | |
} | |
public static boolean didThrow(UncheckedRun run) { | |
try { | |
run.run(); | |
return false; | |
} catch (Throwable throwable) { | |
return true; | |
} | |
} | |
public static boolean didThrow(UncheckedRunnable run) { | |
try { | |
run.run(); | |
return false; | |
} catch (Throwable throwable) { | |
return true; | |
} | |
} | |
public static <T> T stopwatched(String description, UncheckedRun<T> run) { | |
long now = System.currentTimeMillis(); | |
T result = unchecked(run::run); | |
log.info("{}: {}ms", description, System.currentTimeMillis() - now); | |
return result; | |
} | |
public static void stopwatch(String description, UncheckedRunnable run) { | |
long now = System.currentTimeMillis(); | |
uncheck(run::run); | |
log.info("{}: {}ms", description, System.currentTimeMillis() - now); | |
} | |
//endregion | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Contact: [email protected] | |
package lighthouse.threading; | |
import javafx.beans.InvalidationListener; | |
import javafx.beans.Observable; | |
import javafx.collections.*; | |
import java.util.concurrent.Executor; | |
/** | |
* An attempt to make multi-threading and observable/reactive UI programming work together inside JavaFX without too | |
* many headaches. This class allows you to register change listeners on the target Observable which will be | |
* run with the given {@link java.util.concurrent.Executor}. In this way an observable collection which is updated by | |
* one thread can be observed from another thread without needing to use explicit locks or explicit marshalling. | |
*/ | |
public class MarshallingObservers { | |
public static InvalidationListener addListener(Observable observable, InvalidationListener listener, Executor executor) { | |
InvalidationListener l = x -> executor.execute(() -> listener.invalidated(x)); | |
observable.addListener(l); | |
return l; | |
} | |
public static <T> ListChangeListener<T> addListener(ObservableList<T> observable, ListChangeListener<T> listener, Executor executor) { | |
ListChangeListener<T> l = (ListChangeListener.Change<? extends T> c) -> executor.execute(() -> { | |
// Change objects are not thread safe. They may be reused by listeners following this one. However, | |
// we cheat here and exploit knowledge of the implementation: a change is basically immutable and | |
// self contained except for the iteration state. So we synchronize on the change and reset it at the | |
// start to ensure we can iterate over it safely. Note that set changes actually are immutable and | |
// so don't need this. | |
synchronized (c) { | |
c.reset(); | |
listener.onChanged(c); | |
} | |
}); | |
observable.addListener(l); | |
return l; | |
} | |
public static <T> SetChangeListener<T> addListener(ObservableSet<T> observable, SetChangeListener<T> listener, Executor executor) { | |
SetChangeListener<T> l = (SetChangeListener.Change<? extends T> c) -> executor.execute(() -> listener.onChanged(c)); | |
observable.addListener(l); | |
return l; | |
} | |
public static <K, V> MapChangeListener<K, V> addListener(ObservableMap<K, V> observable, MapChangeListener<K, V> listener, Executor executor) { | |
MapChangeListener<K, V> l = (MapChangeListener.Change<? extends K, ? extends V> c) -> executor.execute(() -> listener.onChanged(c)); | |
observable.addListener(l); | |
return l; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Contact: [email protected] | |
package lighthouse.threading; | |
import javafx.beans.WeakListener; | |
import javafx.collections.*; | |
import java.lang.ref.WeakReference; | |
import java.util.ArrayList; | |
import java.util.LinkedList; | |
import java.util.List; | |
import java.util.Set; | |
/** | |
* Utility functions that mirror changes from one list into another list. JavaFX already provides this functionality | |
* of course under the name "content binding"; a mirror is a content binding that relays changes into other threads | |
* first. Thus you can have an ObservableList which is updated in one thread, but still bound to directly in the UI | |
* thread, without needing to worry about cross-thread interference. | |
*/ | |
public class ObservableMirrors { | |
/** | |
* Creates an unmodifiable list that asynchronously follows changes in mirrored, with changes applied using | |
* the given executor. This should only be called on the thread that owns the list to be mirrored, as the contents | |
* will be read. | |
*/ | |
public static <T> ObservableList<T> mirrorList(ObservableList<T> mirrored, AffinityExecutor runChangesIn) { | |
ObservableList<T> result = FXCollections.observableArrayList(); | |
result.setAll(mirrored); | |
mirrored.addListener(new ListMirror<T>(result, runChangesIn)); | |
return FXCollections.unmodifiableObservableList(result); | |
} | |
private static class ListMirror<E> implements ListChangeListener<E>, WeakListener { | |
private final WeakReference<ObservableList<E>> targetList; | |
private final AffinityExecutor runChangesIn; | |
public ListMirror(ObservableList<E> list, AffinityExecutor runChangesIn) { | |
this.targetList = new WeakReference<>(list); | |
this.runChangesIn = runChangesIn; | |
} | |
@Override | |
public void onChanged(Change<? extends E> change) { | |
final List<E> list = targetList.get(); | |
if (list == null) { | |
change.getList().removeListener(this); | |
} else { | |
// If we're already in the right thread this will just run the change immediately, as per normal. | |
// Change objects are not thread safe. They may be reused by listeners following this one. However, | |
// we cheat here and exploit knowledge of the implementation: a change is basically immutable and | |
// self contained except for the iteration state. So we synchronize on the change and reset it at the | |
// start to ensure we can iterate over it safely. Note that set changes actually are immutable and | |
// so don't need this. | |
LinkedList<List<? extends E>> sublists = new LinkedList<>(); | |
while (change.next()) { | |
if (change.wasPermutated()) { | |
sublists.add(new ArrayList<>(change.getList().subList(change.getFrom(), change.getTo()))); | |
} else if (change.wasAdded()) { | |
sublists.add(new ArrayList<>(change.getAddedSubList())); | |
} | |
} | |
runChangesIn.executeASAP(() -> { | |
synchronized (change) { | |
change.reset(); | |
while (change.next()) { | |
if (change.wasPermutated()) { | |
list.subList(change.getFrom(), change.getTo()).clear(); | |
list.addAll(change.getFrom(), sublists.pollFirst()); | |
} else { | |
if (change.wasRemoved()) { | |
list.subList(change.getFrom(), change.getFrom() + change.getRemovedSize()).clear(); | |
} | |
if (change.wasAdded()) { | |
list.addAll(change.getFrom(), sublists.pollFirst()); | |
} | |
} | |
} | |
} | |
}); | |
} | |
} | |
@Override | |
public boolean wasGarbageCollected() { | |
return targetList.get() == null; | |
} | |
// Do we really need these? | |
@Override | |
public int hashCode() { | |
final List<E> list = targetList.get(); | |
return (list == null)? 0 : list.hashCode(); | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (this == obj) { | |
return true; | |
} | |
final List<E> list1 = targetList.get(); | |
if (list1 == null) { | |
return false; | |
} | |
if (obj instanceof ListMirror) { | |
final ListMirror<?> other = (ListMirror<?>) obj; | |
final List<?> list2 = other.targetList.get(); | |
return list1 == list2; | |
} | |
return false; | |
} | |
} | |
public static <K, V> ObservableMap<K, V> mirrorMap(ObservableMap<K, V> mirrored, AffinityExecutor runChangesIn) { | |
ObservableMap<K, V> result = FXCollections.observableHashMap(); | |
result.putAll(mirrored); | |
mirrored.addListener(new MapMirror<K, V>(result, runChangesIn)); | |
return result; | |
} | |
private static class MapMirror<K, V> implements MapChangeListener<K, V>, WeakListener { | |
private final WeakReference<ObservableMap<K, V>> targetMap; | |
private final AffinityExecutor runChangesIn; | |
public MapMirror(ObservableMap<K, V> targetMap, AffinityExecutor runChangesIn) { | |
this.targetMap = new WeakReference<>(targetMap); | |
this.runChangesIn = runChangesIn; | |
} | |
@Override | |
public boolean wasGarbageCollected() { | |
return targetMap.get() == null; | |
} | |
@Override | |
public void onChanged(Change<? extends K, ? extends V> change) { | |
final ObservableMap<K, V> map = targetMap.get(); | |
if (map == null) { | |
change.getMap().removeListener(this); | |
} else { | |
runChangesIn.executeASAP(() -> { | |
if (change.wasAdded()) { | |
map.put(change.getKey(), change.getValueAdded()); | |
} else if (change.wasRemoved()) { | |
map.remove(change.getKey()); | |
} | |
}); | |
} | |
} | |
} | |
/** | |
* Creates an unmodifiable list that asynchronously follows changes in mirrored, with changes applied using | |
* the given executor. This should only be called on the thread that owns the list to be mirrored, as the contents | |
* will be read. | |
*/ | |
public static <T> ObservableSet<T> mirrorSet(ObservableSet<T> mirrored, AffinityExecutor runChangesIn) { | |
@SuppressWarnings("unchecked") ObservableSet<T> result = FXCollections.observableSet(); | |
result.addAll(mirrored); | |
mirrored.addListener(new SetMirror<T>(result, runChangesIn)); | |
return FXCollections.unmodifiableObservableSet(result); | |
} | |
private static class SetMirror<E> implements SetChangeListener<E>, WeakListener { | |
private final WeakReference<ObservableSet<E>> targetSet; | |
private final AffinityExecutor runChangesIn; | |
public SetMirror(ObservableSet<E> set, AffinityExecutor runChangesIn) { | |
this.targetSet = new WeakReference<>(set); | |
this.runChangesIn = runChangesIn; | |
} | |
@Override | |
public void onChanged(final Change<? extends E> change) { | |
final ObservableSet<E> set = targetSet.get(); | |
if (set == null) { | |
change.getSet().removeListener(this); | |
} else { | |
// If we're already in the right thread this will just run the change immediately, as per normal. | |
runChangesIn.executeASAP(() -> { | |
if (change.wasAdded()) | |
set.add(change.getElementAdded()); | |
if (change.wasRemoved()) | |
set.remove(change.getElementRemoved()); | |
}); | |
} | |
} | |
@Override | |
public boolean wasGarbageCollected() { | |
return targetSet.get() == null; | |
} | |
@Override | |
public int hashCode() { | |
final ObservableSet<E> set = targetSet.get(); | |
return (set == null)? 0 : set.hashCode(); | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (this == obj) { | |
return true; | |
} | |
final Set<E> set1 = targetSet.get(); | |
if (set1 == null) { | |
return false; | |
} | |
if (obj instanceof SetMirror) { | |
final SetMirror<?> other = (SetMirror<?>) obj; | |
final Set<?> list2 = other.targetSet.get(); | |
return set1 == list2; | |
} | |
return false; | |
} | |
} | |
} |
hey @TomasMikula sorry I didn't see this message before. Yes you were right. It's fixed in the current code.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Mike,
if I'm not missing something, in your
ListMirror
class, calls toand
from the
runChangesIn
executor (i.e. generally from a different thread) will return wrong results or throw out-of-bounds exception in case the source list has been modified beforeexecuteASAP
actually executed the action. Am I right?