Last active
January 10, 2021 12:07
-
-
Save wkgcass/929f9bee30b946dd830e2a4a1a669403 to your computer and use it in GitHub Desktop.
TimeWheel example (TODO)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package vproxybase.util.time.timewheel; | |
import vproxybase.util.LogType; | |
import vproxybase.util.Logger; | |
import vproxybase.util.time.TimeElem; | |
public class TimeElemImpl<T> implements TimeElem<T> { | |
final long timeoutTs; | |
private final T value; | |
boolean polled = false; | |
boolean canceled = false; | |
public TimeElemImpl(long timeoutTs, T value) { | |
this.timeoutTs = timeoutTs; | |
this.value = value; | |
} | |
@Override | |
public T get() { | |
return value; | |
} | |
@Override | |
public void removeSelf() { | |
if (polled) { | |
Logger.warn(LogType.IMPROPER_USE, "canceling a time event while it's already handled"); | |
return; | |
} | |
if (canceled) { | |
return; | |
} | |
canceled = true; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package vproxybase.util.time.timewheel; | |
import java.util.LinkedList; | |
import java.util.List; | |
public class TimeoutQueue<E> { | |
private final LinkedList<TimeElemImpl<E>> queue = new LinkedList<>(); | |
public void add(TimeElemImpl<E> e) { | |
if (e.canceled) { | |
return; | |
} | |
queue.add(e); | |
} | |
public E poll() { | |
TimeElemImpl<E> e; | |
while ((e = queue.pollFirst()) != null) { | |
if (e.canceled) { | |
continue; | |
} | |
e.polled = true; | |
return e.get(); | |
} | |
return null; | |
} | |
public void addAll(List<TimeElemImpl<E>> list) { | |
for (var e : list) { | |
add(e); | |
} | |
} | |
public boolean isEmpty() { | |
// TODO | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package vproxybase.util.time.timewheel; | |
import vproxybase.util.time.TimeElem; | |
import vproxybase.util.time.TimeQueue; | |
import java.util.PriorityQueue; | |
public class TimeWheel<T> implements TimeQueue<T> { | |
private final TimeoutQueue<T> timeoutQueue = new TimeoutQueue<>(); | |
private final TimeWheelLayer<T>[] layers; | |
private final PriorityQueue<TimeElemImpl<T>> backlogQueue = new PriorityQueue<>((a, b) -> (int) (a.timeoutTs - b.timeoutTs)); | |
private long last; // last timestamp | |
public TimeWheel(int[] capacityOfEachLayer, int layer0Tick) { | |
//noinspection unchecked | |
layers = new TimeWheelLayer[capacityOfEachLayer.length]; | |
int tick = layer0Tick; | |
for (int layer = 0; layer < capacityOfEachLayer.length; ++layer) { | |
int cap = capacityOfEachLayer[layer]; | |
layers[layer] = new TimeWheelLayer<>(tick, layer, cap); | |
tick *= cap; | |
} | |
} | |
@Override | |
public TimeElem<T> add(long current, int timeout, T elem) { | |
update(current); | |
// TODO | |
} | |
@Override | |
public T poll() { | |
T t = timeoutQueue.poll(); | |
if (t != null) return t; | |
// TODO | |
} | |
@Override | |
public boolean isEmpty() { | |
// TODO | |
} | |
@Override | |
public int nextTime(long current) { | |
update(current); | |
if (!timeoutQueue.isEmpty()) return 0; // may directly poll | |
// TODO | |
} | |
private void update(long current) { | |
if (last == current) { | |
return; // no need to update | |
} | |
if (last == 0) { | |
last = current; | |
for (var layer : layers) { | |
layer.curSlotTs = current; | |
} | |
return; // not initialized yet, no need to process elements | |
} | |
int updatedLayer; // the layers whose index <= updatedLayer are updated | |
for (updatedLayer = 0; updatedLayer < layers.length; ++updatedLayer) { | |
var layer = layers[updatedLayer]; | |
int delta = (int) (current - layer.curSlotTs); | |
int off = layer.off + delta / layer.tick; | |
layer.flush(layer.off, off, timeoutQueue); | |
layer.curSlotTs += (off - layer.off) * layer.tick; | |
if (off < layer.cap) { | |
layer.off = off; | |
break; // still within this layer after updating | |
} else { | |
layer.off = off % layer.cap; | |
} | |
} | |
if (updatedLayer == layers.length) { | |
// all layers are flushed | |
// handle those added into wrapped slots in the highest layer | |
var layer = layers[layers.length - 1]; | |
layer.flush(0, layer.off, timeoutQueue); | |
} | |
// otherwise | |
flatten(updatedLayer); | |
} | |
private void flatten(int updatedLayer) { | |
// extract time elements from higher layer to lower layer | |
for (int i = updatedLayer; i > 0; --i) { | |
var layer = layers[updatedLayer]; | |
var lower = layers[updatedLayer - 1]; | |
var elements = layer.slots[layer.off]; | |
for (var e : elements) { | |
// release to lower layer | |
int delta = (int) (e.timeoutTs - lower.curSlotTs); | |
lower.store(lower.off + delta / lower.tick, e); | |
} | |
elements.clear(); | |
} | |
// for layer0, current tick events also should be flushed | |
var layer0 = layers[0]; | |
layer0.flush(layer0.off, layer0.off + 1, timeoutQueue); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package vproxybase.util.time.timewheel; | |
import java.util.LinkedList; | |
import java.util.List; | |
public class TimeWheelLayer<T> { | |
int off = 0; // current offset | |
long curSlotTs; // timestamp for current slot | |
public final int tick; // time interval between buckets | |
public final int layer; // layer of this wheel, starting from 0 with the most little tick | |
public final int cap; // how many elements this layer can hold | |
public final List<TimeElemImpl<T>>[] slots; | |
public TimeWheelLayer(int tick, int layer, int cap) { | |
this.tick = tick; | |
this.layer = layer; | |
this.cap = cap; | |
//noinspection unchecked | |
slots = new List[cap]; | |
for (int i = 0; i < cap; ++i) { | |
slots[i] = new LinkedList<>(); | |
} | |
} | |
public void store(int index, TimeElemImpl<T> elem) { | |
slots[index].add(elem); | |
} | |
public void flush(int from, int to, TimeoutQueue<T> timeoutQueue) { | |
for (int i = from; i < cap && i < to; ++i) { | |
timeoutQueue.addAll(slots[i]); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment