Распределённая блокировка — очень удобный инструмент в кластере для того, чтобы обеспечивать эксклюзивный доступ к некоторому общему ресурсу. Другими словами, цель такой блокировки — обеспечить доступ к ресурсу лишь одному сервису/запросу в данный момент времени. Таким образом предотвращается гонка за данными и неконсистентность данных. Распределённая (или кластерная) блокировка называется потому, что она обеспечивается несколькими узлами и выход из строя одного из них не повлияет на приложение.
Tarantool 3 — это новая версия in-memory базы данных, цель которой быть более простой и понятной. 3я полностью совместим с файлами и репликацией 2ой версии.
Перечислю основные характеристики Tarantool 3 по сравнению со 2ой версией:
- Cервер с конфигом
- Ориентация на кластерность
- Удобная система триггеров
- Переопределение сетевого API (IPROTO)
- Упрощение настройки репликации
- Имена узлов вместо UUID
- Расширенная статистика по потреблению памяти
- Значения полей по умолчанию
tarantool — база данных tt - утилита командной строки для:
- создания и упаковки tarantool 3 приложений
- запуска и управления локальных кластеров
Блокировки будут храниться в таблице (спейсе).
name | token | expire |
---|
Взятой блокировкой будет считаться наличие строки в таблице c ненулевым expire. Отпущенной блокировкой будет считаться или отсутствие строки в таблице, или нулевой expire.
Для надежного хранения состояния блокировки настроим синхронную репликацию. Это делается при создании таблицы (спейса). Для операций с блокировками будем использовать транзакции в режиме linearizable.
Для автоматического переключения лидера, настроим raft фейловер.
Ниже хранимая процедура на Lua для взятия блокировки. Процедура принимает параметры: имя и таймаут.
Процедура проверяет существует ли блокировка, если нет создаёт новую. Если блокировка существовала,
то проверяет отпущена ли она (expire == 0
). Если отпущена, то наращивает token
и устанавливает
время expire
. Если блокировка уже была взята, процедура возвращает nil
.
function _G.acquireLock(name, timeout)
box.begin({txn_isolation="linearizable", timeout=timeout})
local lock = box.space.locks:get(name)
if lock == nil then
lock = {name, 0, clock.time64() + timeout * 1e9}
box.space.locks:insert(lock)
box.commit()
return lock
end
if lock['expire'] == 0 then
lock = {name, lock['token'] + 1, clock.time64() + timeout * 1e9}
log.info(lock)
box.space.locks:put(lock)
box.commit()
return lock
end
box.commit()
return nil
end
Ниже хранимая процедура на Lua для отпускания блокировки. Процедура принимает параметры: имя и токен.
Процедура проверяет, что токен блокировки совпадает и expire
не равен нулю. Тогда блокировка отпускается
и процедура возвращает true
. Иначе процедура возвращает false
.
function _G.releaseLock(name, token)
box.begin({txn_isolation="linearizable"})
local lock = box.space.locks:get(name)
if lock == nil then
box.commit()
return false
end
if lock['token'] == token and lock['expire'] ~= 0 then
local lock = {name, token, 0}
box.space.locks:put(lock)
box.commit()
return true
end
box.commit()
return false
end
Для создания таблицы (спейса) с блокировками используется фоновая процедура. Фоновая процедура дожидается, что узел станет лидером. Затем создает таблицу с необходимыми полями и затем запускает цикл для обработки таймаутов блокировок.
fiber.create(function()
fiber.name("expire-lock-fiber")
box.ctl.wait_rw()
box.schema.space.create("locks", {
is_sync=true,
if_not_exists=true})
box.space.locks:format({{name="name", type="string"},
{name='token', type='unsigned'},
{name='expire', type='unsigned'}})
box.space.locks:create_index('name', {
parts={{field="name", type="string"}},
if_not_exists=true})
box.space.locks:create_index('expire', {
parts={{field="expire", type="unsigned"}},
unique=false,
if_not_exists=true})
box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists=true})
while true do
box.ctl.wait_rw()
local now = clock.time64()
for _, t in box.space.locks.index.expire:pairs({0}, {iterator="GT"}) do
if t[3] < now then
local rc, err = pcall(box.space.locks.update, box.space.locks, {t["name"]}, {{"=", "expire", 0}})
if not rc then
log.info(err)
break
end
end
end
fiber.sleep(1)
end
end)
Сначала инициализируем рабочее окружение для разработки
tt init
Создадим директорию будущего приложения. Приложения располагаются в instances.enabled
mkdir instances.enabled/app
Создадим файл конфигурации будущего локального кластера. Этот файл будет содержать топологию и настройки узлов кластера.
touch instances.enabled/app/config.yml
В конфигурации укажем, что мы хотим кластер из 3х узлов. Укажем с каким параметрами фейловера должен работать кластер. Укажем также о включении mvcc режима работы. Также в этом файле мы укажем файл с хранимыми процедурами.
credentials:
users:
client:
password: "secret"
replication:
failover: election
database:
use_mvcc_engine: true
app:
file: init.lua
groups:
group-001:
replicasets:
replicaset-001:
replication:
bootstrap_strategy: config
bootstrap_leader: instance-001
instances:
instance-001:
iproto:
listen:
- uri: 127.0.0.1:3301
instance-002:
iproto:
listen:
- uri: 127.0.0.1:3302
instance-003:
iproto:
listen:
- uri: 127.0.0.1:3303
Создадим файл управляющий запуском локальных узлов кластера.
touch instances.enabled/app/instances.yaml
Укажем три узла для запуска
instance-001:
instance-002:
instance-003:
Создадим файл init.lua
touch instances.enabled/app/init.lua
Полный листинг приложения
local log = require('log')
local fiber = require('fiber')
local clock = require('clock')
log.info("starting application")
function _G.acquireLock(name, timeout)
box.begin({txn_isolation="linearizable", timeout=timeout})
local lock = box.space.locks:get(name)
if lock == nil then
lock = {name, 0, clock.time64() + timeout * 1e9}
box.space.locks:insert(lock)
box.commit()
return lock
end
if lock['expire'] == 0 then
lock = {name, lock['token'] + 1, clock.time64() + timeout * 1e9}
box.space.locks:put(lock)
box.commit()
return lock
end
box.commit()
return nil
end
function _G.releaseLock(name, token)
box.begin({txn_isolation="linearizable"})
local lock = box.space.locks:get(name)
if lock == nil then
box.commit()
return false
end
if lock['token'] == token and lock['expire'] ~= 0 then
local lock = {name, token, 0}
box.space.locks:put(lock)
box.commit()
return true
end
box.commit()
return false
end
fiber.create(function()
fiber.name("expire-lock-fiber")
box.ctl.wait_rw()
box.schema.space.create("locks", {
is_sync=true,
if_not_exists=true})
box.space.locks:format({{name="name", type="string"},
{name='token', type='unsigned'},
{name='expire', type='unsigned'}})
box.space.locks:create_index('name', {
parts={{field="name", type="string"}},
if_not_exists=true})
box.space.locks:create_index('expire', {
parts={{field="expire", type="unsigned"}},
unique=false,
if_not_exists=true})
box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists=true})
while true do
box.ctl.wait_rw()
local now = clock.time64()
for _, t in box.space.locks.index.expire:pairs({0}, {iterator="GT"}) do
if t[3] < now then
local rc, err = pcall(box.space.locks.update, box.space.locks, {t["name"]}, {{"=", "expire", 0}})
if not rc then
log.info(err)
break
end
end
end
fiber.sleep(1)
end
end)
Для запуска кластера из узлов Tarantool 3 воспользуемся командой tt
tt start
Для проверки статуса узлов выполним команду:
tt status
Для проверки того, что кластер собрался, необходимо подключится по очереди на узлы и проверить состояние репликации.
tt connect app:instance-001
> box.info.replication
Пример хорошего вывода статуса репликации
---
- 1:
id: 1
uuid: 09652d7e-6b1d-4304-aad0-6ae058c847c8
lsn: 16
upstream:
status: follow
idle: 0.92459100019187
peer: 127.0.0.1:3301
lag: 7.9154968261719e-05
name: instance-001
downstream:
status: follow
idle: 0.91285500023514
vclock: {1: 16, 2: 5186, 3: 5013}
lag: 0
2:
id: 2
uuid: 8e62d6ea-badf-490d-9153-a342d0e48fd8
lsn: 5186
name: instance-002
3:
id: 3
uuid: 7f40d300-e9e6-4916-adaa-b669b672256b
lsn: 5013
upstream:
status: follow
idle: 0.91279700025916
peer: 127.0.0.1:3303
lag: 5.7220458984375e-05
name: instance-003
downstream:
status: follow
idle: 0.91363300010562
vclock: {1: 16, 2: 5186, 3: 5013}
lag: 0
...
Tarantool общается с приложениями с помощью msgpack формата. msgpack формат иногда допускает изменения размера целочисленного типа. Чтобы привести всё к uint64 сделаем утилитарную функцию.
func toUint64(v any) uint64 {
switch t := v.(type) {
case int, int8, int16, int32, int64:
return uint64(reflect.ValueOf(t).Int())
case uint, uint8, uint16, uint32, uint64:
return reflect.ValueOf(t).Uint()
default:
panic("type error")
}
}
Для общения с кластером Tarantool воспользуемся готовым пулом подключений. Такой пул умеет автоматически вычислять, какой узел является лидером.
instances := []pool.Instance{
{
Name: "instance-001",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3301",
},
},
{
Name: "instance-002",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3302",
},
},
{
Name: "instance-003",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3303",
},
},
}
p, err := pool.Connect(context.Background(), instances)
if err != nil {
panic(err)
}
Создадим структуру для хранения блокировки
type Lock struct {
name string
token uint64
expire uint64
}
Создадим функция для взятия блокировки. Функция через пул подключений обращается к кластеру и вызывает хранимую
процедуру acquireLock
. Функция принимает в качестве параметров: контекст для выполнения запроса, пул соединений
с кластером, имя блокировки, и таймаут блокировки. Функция в случае успеха вернет объект блокировки, иначе функция
вернет ошибку.
func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) {
resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get()
if err != nil {
return nil, err
}
if len(resp) == 0 {
return nil, fmt.Errorf("no response")
}
if resp[0] == nil {
return nil, fmt.Errorf("failed")
}
data := resp[0].([]any)
result := Lock{
name: name,
token: toUint64(data[1]),
expire: toUint64(data[2]),
}
return &result, nil
}
Создадим функция для отпускания блокировки. Функция принимает контекст, пул соединений к кластеру, объект с блокирокой.
Функция через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock
.
Функция вернет true
, если блокировка успешно отпущена. Функция вернет false
, в случае тех или иных ошибок.
func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) {
resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get()
if err != nil {
return false, err
}
if len(resp) == 0 {
return false, fmt.Errorf("no response")
}
return resp[0].(bool), nil
}
Ниже код как таким объектом блокировки пользоваться:
instances := []pool.Instance{
{
Name: "instance-001",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3301",
},
},
{
Name: "instance-002",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3302",
},
},
{
Name: "instance-003",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3303",
},
},
}
p, err := pool.Connect(context.Background(), instances)
if err != nil {
panic(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var l *Lock
for i := 0; i < 3; i++ {
l, err = acquireLock(ctx, p, name, 10)
if err != nil {
time.Sleep(100 * time.Millisecond)
continue
}
break
}
if l == nil {
fmt.Println(name, "already locked")
return
}
defer func() {
ok, _ := releaseLock(ctx, p, l)
if ok {
fmt.Println(name, "success unlock")
} else {
fmt.Println(name, "lock expired")
}
}()
fmt.Println(name, "success lock")
Весь код main.go
package main
import (
"context"
"fmt"
"reflect"
"sync"
"time"
"github.com/tarantool/go-tarantool/v2"
_ "github.com/tarantool/go-tarantool/v2/datetime"
_ "github.com/tarantool/go-tarantool/v2/decimal"
"github.com/tarantool/go-tarantool/v2/pool"
_ "github.com/tarantool/go-tarantool/v2/uuid"
"github.com/tjarratt/babble"
)
func toUint64(v any) uint64 {
switch t := v.(type) {
case int, int8, int16, int32, int64:
return uint64(reflect.ValueOf(t).Int()) // a has type int64
case uint, uint8, uint16, uint32, uint64:
return reflect.ValueOf(t).Uint() // a has type uint64
default:
panic("type error")
}
}
type Lock struct {
name string
token uint64
expire uint64
}
func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) {
resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get()
if err != nil {
return nil, err
}
if len(resp) == 0 {
return nil, fmt.Errorf("no response")
}
if resp[0] == nil {
return nil, fmt.Errorf("failed")
}
data := resp[0].([]any)
result := Lock{
name: name,
token: toUint64(data[1]),
expire: toUint64(data[2]),
}
return &result, nil
}
func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) {
resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get()
if err != nil {
return false, err
}
if len(resp) == 0 {
return false, fmt.Errorf("no response")
}
return resp[0].(bool), nil
}
func main() {
instances := []pool.Instance{
{
Name: "instance-001",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3301",
},
},
{
Name: "instance-002",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3302",
},
},
{
Name: "instance-003",
Dialer: tarantool.NetDialer{
Address: "127.0.0.1:3303",
},
},
}
p, err := pool.Connect(context.Background(), instances)
if err != nil {
panic(err)
}
babbler := babble.NewBabbler()
babbler.Count = 1
wg := sync.WaitGroup{}
for i := 0; i < 1000; i++ {
name := babbler.Babble()
wg.Add(1)
go func(name string) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var l *Lock
for i := 0; i < 3; i++ {
l, err = acquireLock(ctx, p, name, 10)
if err != nil {
time.Sleep(100 * time.Millisecond)
continue
}
break
}
if l == nil {
fmt.Println(name, "already locked")
return
}
defer func() {
ok, _ := releaseLock(ctx, p, l)
if ok {
fmt.Println(name, "success unlock")
} else {
fmt.Println(name, "lock expired")
}
}()
fmt.Println(name, "success lock")
time.Sleep(50 * time.Millisecond)
}(name)
}
wg.Wait()
}
За около 100 строк мы сделали на Tarantool 3 приложения для управления кластерными блокировками. Такое приложение может состоять из одного или нескольких узлов. Для регулирования количества узлов достаточно только редактирования yaml файла. В случае если один из узлов Tarantool упадёт, то сработает механизм автоматического выбора лидера, и приложение восстановит свою работоспособность.