Last active
June 7, 2021 16:01
-
-
Save DarkSeraphim/a5d0aeb91d94cf47e91905cd60eaa768 to your computer and use it in GitHub Desktop.
Lock-free, eventually consistent listener queues with concurrent adds, iteration, and removes. (CAUTION: definitely not tested properly.)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A subscriber that receives events of type {@code T} and can be deactivated.
 *
 * @param <T> the type of event delivered to {@link #fire(Object)}
 */
interface Listener<T> {

    /**
     * Reports whether this listener is still active.
     * Implementations are expected to start out active (returning {@code true}).
     *
     * @return {@code true} while the listener has not been removed
     */
    boolean isActive();

    /**
     * Deactivates this listener; afterwards {@link #isActive()} is expected to
     * return {@code false}.
     */
    void remove();

    /**
     * Delivers an event to this listener.
     *
     * @param event the event to handle
     */
    void fire(T event);
}
private final AtomicInteger next = new AtomicInteger(0); | |
private final AtomicInteger listenerCount = new AtomicInteger(0); | |
private final AtomicInteger removed = new AtomicInteger(0); | |
private volatile Listener<T>[] listeners = createArray(16); | |
private void waitForResize() { | |
while (!isResizing.get()) { | |
Thread.onSpinWait(); | |
} | |
} | |
public AutoClosable add(Listener<T> listener) { | |
int index = next.getAndIncrement(); | |
while (index >= listeners.length) { | |
// TODO: we need to ensure this doesn't trigger again right after a resize | |
waitForResize(); | |
if (index >= listeners.length && isResizing.swap(false, true)) { | |
int actualLength = pending.length - removed.get(); // Can do this lazily I think | |
pendingBuffer = createArray(actualLength * 2); // TODO: shortcut when removed is actually big enough to account for the future allocation | |
while (listenerCount.get() != listener.length) { | |
// Wait for all adds within capacity to finish | |
// corollary: all active and incoming adds will be trapped in the while loop, we can reset their count at the end of the while | |
Thread.onSpinWait(); | |
} | |
int ptr = 0; | |
for (var listener : listeners) { | |
if (listener != null && listener.isActive()) { | |
pendingBuffer[ptr] = listener; | |
ptr++; // move ptr to next | |
} | |
} | |
listeners = pendingBuffer; | |
next.set(ptr); // Move back ptr | |
listenerCount.set(ptr); // Does this work? | |
removed.set(0); // Reset removed counter | |
isResizing.set(false); | |
} else { | |
waitForResize(); | |
} | |
index = next.getAndIncrement(); // Start loop again | |
} | |
listeners[index] = listener; | |
listenerCount.increment(); | |
return listener::remove; | |
} | |
private void remove(Listener<T> listener) { | |
waitForResize(); // Don't allow removals _while_ we're resizing | |
listener.remove(); | |
removed.increment(); | |
} | |
void fire(T event) { | |
for (Listener<T> listener : listeners) { | |
listener.fire(event); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment