Версией конфигурации или просто версией будем называть число реконфигураций работающей системы с момента начала работы, изначально это ноль. Каждый узел будет хранить последнюю известную ему версию конфигурации, и версия конфигурации будет использоваться в токене.
Узлы будут хранить:
- множество уникальных узлов в кольце, упорядоченное по MAC-адресу;
- отображение взаимнооднозначное соответствие MAC-адресов и IP-адресов;
- список цифр числа π, начиная с "3";
- последнюю известную узлу версию конфигурации, начальное значение -- 0;
- состояние -- одно из
WORKINGиCONFIG, начальное значение --CONFIG.
Узел должен параллельно ждать служебных сообщений по UDP и входящее соединение по TCP.
Длина такта времени
TICK = 3 seconds
Порт для UDP Broadcast
BROADCAST_PORT = 1234
Порт для TCP
TCP_PORT = 4321
Число повторений broadcast при реконфигурации
BROADCASTS_COUNT = 5
Передаются как UDP broadcast на порт BROADCAST_PORT (в псевдокоде это просто broadcast). Принимаются прослушиванием этого порта в параллельном потоке.
bytes: [0x02,
macAddr0, ..., macAddr5,
version >>> 24, ..., version >>> 0]
Это сообщение узел будет передавать чтобы начать реконфигурацию или чтобы сообщить о себе в ходе реконфигурации. Это нужну узлу в следующих случаях:
- у него произошёл тайм-аут при ожидании сообщения от предыдущего узла в кольце;
- к нему попытался присоединиться для передачи токена узел, соединение от которого не ожидалось (не предыдущий в кольце или неизвестный);
- к нему попытался присоединиться для передачи токена ожидаемый узел, но он передал неправильную версию;
- при получении такого же сообщения с меньшей версией -- чтобы заявить, что актуальная версия уже больше;
- если после очередной реконфигурации узел не обнаружил соседей;
- в ходе реконфигурации (
reconfigure);
Если узел получает сообщение Reconfigure(macAddr, version) с большей версией, чем актуальная для него, то он принимает новую версию и переходит в состояние реконфигурации
receiveReconfigure() {
forever do {
message = receive service message from broadcast
if (message.version > this.version)
reconfigure(message.version, message.macAddr)
else if (message.version < this.version || message.version == this.version && state != CONFIG)
reconfigure(this.version + 1, this.macAddr)
}
}
В состоянии реконфигурации узел рассылает сообщения со своим MAC-адресом и версией и параллельно принимает такие сообщения от других узлов. Если вдруг узлу приходит сообщение Reconfigure с неправильной версией, то он считает эту попытку реконфигурации неудачной и начинает новую (рекурсивный вызов reconfigure).
reconfigure(newVersion, initializerMacAddr) {
state = CONFIG
this.version = newVersion
neighbours = { this.macAddr, initializerMacAddr }
thread1: for (TICK) time do {
BROADCAST_COUNT times do {
send broadcast Reconfigure(this.macAddr, version)
}
}
isBadConfig = false
thread2: for (TICK) time do {
message = receive service message from broadcast
if (message.version == this.version)
neighbours U= message.macAddr
else
isBadConfig = true
stop thread1
return
}
join thread1, thread2
isBadConfig |= neighbours.size <= 1
if (isBadConfig)
reconfigure(version + 1)
else
state = WORKING
updateNextAndPrev()
if (neighbours.first == this.macAddr)
for (TICK) time do wait()
gotToken()
}
wait() нужно, чтобы случайно не попытаться подключиться к узлу, который в данный момент всё ещё находится в состоянии CONFIG.
initReconfigure(newVersion) {
while (this.version < newVersion) do
send broadcast Reconfigure(this.macAddr, newVersion)
}
Этот метод позволяет узлу начать реконфигурацию, будет использоваться ниже.
Каждый узел на основе neighbours знает, кому он должен передавать токен и от кого должен его получать. Вот так:
updateNextAndPrev() {
neighbours.remove(this.macAddr);
nextInRing = neighbours.ceiling(this.macAddr) ?: neighbours.first()
prevInRing = neighbours.floor(this.macAddr) ?: neighbours.last()
neighbours.add(this.macAddr)
}
bytes: [0x03,
macAddr0, ..., macAddr5,
version >>> 24, ..., version >>> 0,
nDigits >>> 24, ..., nDigits >>> 0,
3, ...]
^ цифры числа π, каждая отдельным байтом,
всего nDigits штук
Отвечать на токен не будем, гарантий TCP хватит.
Узел ждёт соединение в течение TICK, если не дождался, значит, всё плохо, и нужно начать реконфигурацию. Если дождался, вычитывает токен, проверяет его и обновляет свою версию числа π, после чего пытается передавать токен.
Если кто-то пытается соединиться с узлом, пока тот находится в состоянии CONFIG, то, наверное, он не знает, что сейчас идёт реконфигурация, поэтому, на всякий случай, нужно ему сообщить, начав новую реконфигурацию.
receveToken() {
forever do {
wait for (hasToken == false)
connection = accept connection with timeout (TICK)
if (timeout)
initReconfigure(version + 1)
else
token = read token from connection
close connection
if (state == CONFIG || token.version != this.version || token.macAddr != prevInRing)
initReconfigure(max(token.version, this.version) + 1)
else
update π from token
calculate 20 more digits of π
gotToken()
on error
initReconfigure(version + 1)
}
}
Когда узел получает токен, он пытается установить соединение со следующим в кольце в течение TICK, если не получилось, паникует, иначе просто передаёт токен.
hasToken = false
gotToken() { hasToken = true }
sendToken() {
forever do {
wait for (hasToken == true)
connection = connect to nextInRing with timeout (TICK)
if (timeout)
initReconfigure(version + 1)
else with connection
write new token to connection
close connection
on error
initReconfigure(version + 1)
hasToken = false
}
}
run() {
thread1: receiveReconfigure()
thread2: receiveToken()
thread3: sendToken()
initReconfigure(version + 1)
}