Skip to content

Instantly share code, notes, and snippets.

@filonenko-mikhail
Created August 12, 2024 12:25
Show Gist options
  • Save filonenko-mikhail/04a60c946e512ada98da123a06a93310 to your computer and use it in GitHub Desktop.
Save filonenko-mikhail/04a60c946e512ada98da123a06a93310 to your computer and use it in GitHub Desktop.

Распределённые блокировки с помощью Tarantool 3

Введение

Распределённая блокировка

Распределённая блокировка — очень удобный инструмент в кластере для того, чтобы обеспечивать эксклюзивный доступ к некоторому общему ресурсу. Другими словами, цель такой блокировки — обеспечить доступ к ресурсу лишь одному сервису/запросу в данный момент времени. Таким образом предотвращается гонка за данными и неконсистентность данных. Распределённая (или кластерная) блокировка называется потому, что она обеспечивается несколькими узлами и выход из строя одного из них не повлияет на приложение.

Tarantool 3

Tarantool 3 — это новая версия in-memory базы данных, цель которой быть более простой и понятной. 3я полностью совместим с файлами и репликацией 2ой версии.

Перечислю основные характеристики Tarantool 3 по сравнению со 2ой версией:

  • Cервер с конфигом
  • Ориентация на кластерность
  • Удобная система триггеров
  • Переопределение сетевого API (IPROTO)
  • Упрощение настройки репликации
  • Имена узлов вместо UUID
  • Расширенная статистика по потреблению памяти
  • Значения полей по умолчанию

Подробнее

Инструменты

tarantool — база данных tt - утилита командной строки для:

  • создания и упаковки tarantool 3 приложений
  • запуска и управления локальных кластеров

Принцип работы

Блокировки будут храниться в таблице (спейсе).

name token expire

Взятой блокировкой будет считаться наличие строки в таблице c ненулевым expire. Отпущенной блокировкой будет считаться или отсутствие строки в таблице, или нулевой expire.

Для надежного хранения состояния блокировки настроим синхронную репликацию. Это делается при создании таблицы (спейса). Для операций с блокировками будем использовать транзакции в режиме linearizable.

Для автоматического переключения лидера, настроим raft фейловер.

Ниже хранимая процедура на Lua для взятия блокировки. Процедура принимает параметры: имя и таймаут. Процедура проверяет существует ли блокировка, если нет создаёт новую. Если блокировка существовала, то проверяет отпущена ли она (expire == 0). Если отпущена, то наращивает token и устанавливает время expire. Если блокировка уже была взята, процедура возвращает nil.

function _G.acquireLock(name, timeout)
    box.begin({txn_isolation="linearizable", timeout=timeout})
    local lock = box.space.locks:get(name)
    if lock == nil then
        lock = {name, 0, clock.time64() + timeout * 1e9}
        box.space.locks:insert(lock)
        box.commit()
        return lock
    end

    if lock['expire'] == 0 then
        lock = {name, lock['token'] + 1, clock.time64() + timeout * 1e9}
        log.info(lock)
        box.space.locks:put(lock)
        box.commit()
        return lock
    end

    box.commit()
    return nil
end

Ниже хранимая процедура на Lua для отпускания блокировки. Процедура принимает параметры: имя и токен. Процедура проверяет, что токен блокировки совпадает и expire не равен нулю. Тогда блокировка отпускается и процедура возвращает true. Иначе процедура возвращает false.

function _G.releaseLock(name, token)
    box.begin({txn_isolation="linearizable"})
    local lock = box.space.locks:get(name)
    if lock == nil then
        box.commit()
        return false
    end

    if lock['token'] == token and lock['expire'] ~= 0 then
        local lock = {name, token, 0}
        box.space.locks:put(lock)
        box.commit()
        return true
    end

    box.commit()
    return false
end

Для создания таблицы (спейса) с блокировками используется фоновая процедура. Фоновая процедура дожидается, что узел станет лидером. Затем создает таблицу с необходимыми полями и затем запускает цикл для обработки таймаутов блокировок.

fiber.create(function()
    fiber.name("expire-lock-fiber")
    box.ctl.wait_rw()

    box.schema.space.create("locks", {
        is_sync=true,
        if_not_exists=true})
    box.space.locks:format({{name="name", type="string"},
        {name='token', type='unsigned'},
        {name='expire', type='unsigned'}})

    box.space.locks:create_index('name', {
        parts={{field="name", type="string"}},
        if_not_exists=true})
    box.space.locks:create_index('expire', {
        parts={{field="expire", type="unsigned"}},
        unique=false,
        if_not_exists=true})

    box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists=true})

    while true do
        box.ctl.wait_rw()
        local now = clock.time64()
        for _, t in box.space.locks.index.expire:pairs({0}, {iterator="GT"}) do
            if t[3] < now then
                local rc, err = pcall(box.space.locks.update, box.space.locks, {t["name"]}, {{"=", "expire", 0}})
                if not rc then
                    log.info(err)
                    break
                end
            end
        end
        fiber.sleep(1)
    end
end)

Инициализация приложения

Сначала инициализируем рабочее окружение для разработки

tt init

Создадим директорию будущего приложения. Приложения располагаются в instances.enabled

mkdir instances.enabled/app

Создадим файл конфигурации будущего локального кластера. Этот файл будет содержать топологию и настройки узлов кластера.

touch instances.enabled/app/config.yml

В конфигурации укажем, что мы хотим кластер из 3х узлов. Укажем с каким параметрами фейловера должен работать кластер. Укажем также о включении mvcc режима работы. Также в этом файле мы укажем файл с хранимыми процедурами.

credentials:
  users:
    client:
      password: "secret"

replication:
  failover: election

database:
  use_mvcc_engine: true

app:
  file: init.lua

groups:
  group-001:
    replicasets:
      replicaset-001:
        replication:
          bootstrap_strategy: config
        bootstrap_leader: instance-001
        instances:
          instance-001:
            iproto:
              listen:
                - uri: 127.0.0.1:3301
          instance-002:
            iproto:
              listen:
                - uri: 127.0.0.1:3302
          instance-003:
            iproto:
              listen:
                - uri: 127.0.0.1:3303

Создадим файл управляющий запуском локальных узлов кластера.

touch instances.enabled/app/instances.yaml

Укажем три узла для запуска

instance-001:
instance-002:
instance-003:

Приложение на Lua

Создадим файл init.lua

touch instances.enabled/app/init.lua

Полный листинг приложения

local log = require('log')
local fiber = require('fiber')
local clock = require('clock')

log.info("starting application")

function _G.acquireLock(name, timeout)
    box.begin({txn_isolation="linearizable", timeout=timeout})
    local lock = box.space.locks:get(name)
    if lock == nil then
        lock = {name, 0, clock.time64() + timeout * 1e9}
        box.space.locks:insert(lock)
        box.commit()
        return lock
    end

    if lock['expire'] == 0 then
        lock = {name, lock['token'] + 1, clock.time64() + timeout * 1e9}
        box.space.locks:put(lock)
        box.commit()
        return lock
    end

    box.commit()
    return nil
end

function _G.releaseLock(name, token)
    box.begin({txn_isolation="linearizable"})
    local lock = box.space.locks:get(name)
    if lock == nil then
        box.commit()
        return false
    end

    if lock['token'] == token and lock['expire'] ~= 0 then
        local lock = {name, token, 0}
        box.space.locks:put(lock)
        box.commit()
        return true
    end

    box.commit()
    return false
end

fiber.create(function()
    fiber.name("expire-lock-fiber")
    box.ctl.wait_rw()

    box.schema.space.create("locks", {
        is_sync=true,
        if_not_exists=true})
    box.space.locks:format({{name="name", type="string"},
        {name='token', type='unsigned'},
        {name='expire', type='unsigned'}})

    box.space.locks:create_index('name', {
        parts={{field="name", type="string"}},
        if_not_exists=true})
    box.space.locks:create_index('expire', {
        parts={{field="expire", type="unsigned"}},
        unique=false,
        if_not_exists=true})

    box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists=true})

    while true do
        box.ctl.wait_rw()
        local now = clock.time64()
        for _, t in box.space.locks.index.expire:pairs({0}, {iterator="GT"}) do
            if t[3] < now then
                local rc, err = pcall(box.space.locks.update, box.space.locks, {t["name"]}, {{"=", "expire", 0}})
                if not rc then
                    log.info(err)
                    break
                end
            end
        end
        fiber.sleep(1)
    end
end)

Запуск локального кластера

Для запуска кластера из узлов Tarantool 3 воспользуемся командой tt

tt start

Для проверки статуса узлов выполним команду:

tt status

Для проверки того, что кластер собрался, необходимо подключится по очереди на узлы и проверить состояние репликации.

tt connect app:instance-001
> box.info.replication

Пример хорошего вывода статуса репликации

---
- 1:
    id: 1
    uuid: 09652d7e-6b1d-4304-aad0-6ae058c847c8
    lsn: 16
    upstream:
      status: follow
      idle: 0.92459100019187
      peer: 127.0.0.1:3301
      lag: 7.9154968261719e-05
    name: instance-001
    downstream:
      status: follow
      idle: 0.91285500023514
      vclock: {1: 16, 2: 5186, 3: 5013}
      lag: 0
  2:
    id: 2
    uuid: 8e62d6ea-badf-490d-9153-a342d0e48fd8
    lsn: 5186
    name: instance-002
  3:
    id: 3
    uuid: 7f40d300-e9e6-4916-adaa-b669b672256b
    lsn: 5013
    upstream:
      status: follow
      idle: 0.91279700025916
      peer: 127.0.0.1:3303
      lag: 5.7220458984375e-05
    name: instance-003
    downstream:
      status: follow
      idle: 0.91363300010562
      vclock: {1: 16, 2: 5186, 3: 5013}
      lag: 0
...

Пример использования кластерых блокировок на Golang

Tarantool общается с приложениями с помощью msgpack формата. msgpack формат иногда допускает изменения размера целочисленного типа. Чтобы привести всё к uint64 сделаем утилитарную функцию.

func toUint64(v any) uint64 {
	switch t := v.(type) {
	case int, int8, int16, int32, int64:
		return uint64(reflect.ValueOf(t).Int())
	case uint, uint8, uint16, uint32, uint64:
		return reflect.ValueOf(t).Uint()
	default:
		panic("type error")
	}
}

Для общения с кластером Tarantool воспользуемся готовым пулом подключений. Такой пул умеет автоматически вычислять, какой узел является лидером.

instances := []pool.Instance{
	{
		Name: "instance-001",
		Dialer: tarantool.NetDialer{
			Address: "127.0.0.1:3301",
		},
	},
	{
		Name: "instance-002",
		Dialer: tarantool.NetDialer{
			Address: "127.0.0.1:3302",
		},
	},
	{
		Name: "instance-003",
		Dialer: tarantool.NetDialer{
			Address: "127.0.0.1:3303",
		},
	},
}
p, err := pool.Connect(context.Background(), instances)
if err != nil {
	panic(err)
}

Создадим структуру для хранения блокировки

type Lock struct {
	name   string
	token  uint64
	expire uint64
}

Создадим функция для взятия блокировки. Функция через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock. Функция принимает в качестве параметров: контекст для выполнения запроса, пул соединений с кластером, имя блокировки, и таймаут блокировки. Функция в случае успеха вернет объект блокировки, иначе функция вернет ошибку.

func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) {
	resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get()
	if err != nil {
		return nil, err
	}

	if len(resp) == 0 {
		return nil, fmt.Errorf("no response")
	}
	if resp[0] == nil {
		return nil, fmt.Errorf("failed")
	}
	data := resp[0].([]any)

	result := Lock{
		name:   name,
		token:  toUint64(data[1]),
		expire: toUint64(data[2]),
	}
	return &result, nil
}

Создадим функция для отпускания блокировки. Функция принимает контекст, пул соединений к кластеру, объект с блокирокой. Функция через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock. Функция вернет true, если блокировка успешно отпущена. Функция вернет false, в случае тех или иных ошибок.

func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) {
	resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get()
	if err != nil {
		return false, err
	}
	if len(resp) == 0 {
		return false, fmt.Errorf("no response")
	}
	return resp[0].(bool), nil
}

Ниже код как таким объектом блокировки пользоваться:

instances := []pool.Instance{
		{
			Name: "instance-001",
			Dialer: tarantool.NetDialer{
				Address: "127.0.0.1:3301",
			},
		},
		{
			Name: "instance-002",
			Dialer: tarantool.NetDialer{
				Address: "127.0.0.1:3302",
			},
		},
		{
			Name: "instance-003",
			Dialer: tarantool.NetDialer{
				Address: "127.0.0.1:3303",
			},
		},
	}
p, err := pool.Connect(context.Background(), instances)
if err != nil {
	panic(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

var l *Lock
for i := 0; i < 3; i++ {
	l, err = acquireLock(ctx, p, name, 10)
	if err != nil {
		time.Sleep(100 * time.Millisecond)
		continue
	}
	break
}
if l == nil {
	fmt.Println(name, "already locked")
	return
}
defer func() {
	ok, _ := releaseLock(ctx, p, l)
	if ok {
		fmt.Println(name, "success unlock")
	} else {
		fmt.Println(name, "lock expired")
	}
}()
fmt.Println(name, "success lock")

Весь код main.go

package main

import (
	"context"
	"fmt"
	"reflect"
	"sync"
	"time"

	"github.com/tarantool/go-tarantool/v2"
	_ "github.com/tarantool/go-tarantool/v2/datetime"
	_ "github.com/tarantool/go-tarantool/v2/decimal"
	"github.com/tarantool/go-tarantool/v2/pool"
	_ "github.com/tarantool/go-tarantool/v2/uuid"
	"github.com/tjarratt/babble"
)

func toUint64(v any) uint64 {
	switch t := v.(type) {
	case int, int8, int16, int32, int64:
		return uint64(reflect.ValueOf(t).Int()) // a has type int64
	case uint, uint8, uint16, uint32, uint64:
		return reflect.ValueOf(t).Uint() // a has type uint64
	default:
		panic("type error")
	}
}

type Lock struct {
	name   string
	token  uint64
	expire uint64
}

func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) {
	resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get()
	if err != nil {
		return nil, err
	}

	if len(resp) == 0 {
		return nil, fmt.Errorf("no response")
	}
	if resp[0] == nil {
		return nil, fmt.Errorf("failed")
	}
	data := resp[0].([]any)

	result := Lock{
		name:   name,
		token:  toUint64(data[1]),
		expire: toUint64(data[2]),
	}
	return &result, nil
}

func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) {
	resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get()
	if err != nil {
		return false, err
	}
	if len(resp) == 0 {
		return false, fmt.Errorf("no response")
	}
	return resp[0].(bool), nil
}

func main() {
	instances := []pool.Instance{
		{
			Name: "instance-001",
			Dialer: tarantool.NetDialer{
				Address: "127.0.0.1:3301",
			},
		},
		{
			Name: "instance-002",
			Dialer: tarantool.NetDialer{
				Address: "127.0.0.1:3302",
			},
		},
		{
			Name: "instance-003",
			Dialer: tarantool.NetDialer{
				Address: "127.0.0.1:3303",
			},
		},
	}
	p, err := pool.Connect(context.Background(), instances)
	if err != nil {
		panic(err)
	}

	babbler := babble.NewBabbler()
	babbler.Count = 1

	wg := sync.WaitGroup{}
	for i := 0; i < 1000; i++ {
		name := babbler.Babble()
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()

			var l *Lock
			for i := 0; i < 3; i++ {
				l, err = acquireLock(ctx, p, name, 10)
				if err != nil {
					time.Sleep(100 * time.Millisecond)
					continue
				}
				break
			}
			if l == nil {
				fmt.Println(name, "already locked")
				return
			}
			defer func() {
				ok, _ := releaseLock(ctx, p, l)
				if ok {
					fmt.Println(name, "success unlock")
				} else {
					fmt.Println(name, "lock expired")
				}
			}()
			fmt.Println(name, "success lock")
			time.Sleep(50 * time.Millisecond)

		}(name)
	}
	wg.Wait()
}

Итоги

За около 100 строк мы сделали на Tarantool 3 приложения для управления кластерными блокировками. Такое приложение может состоять из одного или нескольких узлов. Для регулирования количества узлов достаточно только редактирования yaml файла. В случае если один из узлов Tarantool упадёт, то сработает механизм автоматического выбора лидера, и приложение восстановит свою работоспособность.

Ссылки

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment