Last active
March 28, 2024 00:20
-
-
Save boatbomber/7c0cd41461bd682e7f4fa0ebca36ae21 to your computer and use it in GitHub Desktop.
This is a module for handling data that can be read from/written to from multiple servers at a time. It is made only for commutative updates. This is so that your operations can be applied locally and globally at different times and still end up at the same value eventually. Uses MemoryStore for atomic locking.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--[[ | |
GlobalStorage | |
by boatbomber (c) 2021 | |
This is a module for handling data that can be read from/written to | |
from multiple servers at a time. It is made only for commutative updates. | |
This is so that your operations can be applied locally and globally at different | |
times and still end up at the same value eventually. Uses MemoryStore for atomic locking. | |
Make sure your transform function is deterministic and has no side effects, | |
as it will be run twice- once locally and once globally. If it absolutely must, | |
then it can take in a second arguement, a boolean "isGlobal" that is true when being | |
run globally and false during the local run. | |
Examples: | |
local PlayerStore = GlobalStorage.new("User_1234", 1234) | |
local DEFAULT_COINS = 100 | |
-- Can be used to safely +/- number stores | |
local coins = PlayerStore:Get("Coins", DEFAULT_COINS) | |
PlayerStore:Update("Coins", function(oldValue) | |
return (oldValue or DEFAULT_COINS) + 5 | |
end) | |
coins = PlayerStore:Get("Coins", DEFAULT_COINS) | |
-- Can be used to safely add/remove unique dict keys | |
local notifications = PlayerStore:Get("Notifications", {}) | |
PlayerStore:Update("Notifications", function(oldValue) | |
return (oldValue or {})[GUID] = newNotif | |
end) | |
notifications = PlayerStore:Get("Notifications", {}) | |
--]] | |
local DEBUG = false | |
local function dprint(...) | |
if DEBUG then | |
print(...) | |
end | |
end | |
local DataStoreService = game:GetService("DataStoreService") | |
local MemoryStoreService = game:GetService("MemoryStoreService") | |
local MessagingService = game:GetService("MessagingService") | |
local GlobalStorage = { | |
_cache = {}, | |
} | |
function GlobalStorage.new(name: string, associatedUserId: number?) | |
local debugInfo = string.format("GlobalStorage '%s'", name) | |
-- Get existing store object if possible | |
if GlobalStorage._cache[name] then | |
dprint(debugInfo, "got existing store") | |
local cachedStore = GlobalStorage._cache[name] | |
cachedStore._references += 1 | |
return cachedStore | |
end | |
-- Create new store object | |
dprint(debugInfo, "created new store") | |
local Store = { | |
_dsStore = DataStoreService:GetDataStore(name), | |
_msMap = MemoryStoreService:GetSortedMap(name), | |
_associatedUserId = associatedUserId, | |
_references = 1, | |
_cache = {}, | |
_msgId = "BS_" .. name, | |
_updateQueue = {}, | |
_events = {}, | |
_destroyed = false, | |
_flushesInProgress = {}, | |
} | |
function Store:_flushUpdateQueue() | |
if next(self._updateQueue) == nil then | |
dprint(debugInfo, "update queue empty, cancelling flush") | |
return | |
end | |
dprint(debugInfo, "flushing update queue") | |
for key, transformers in pairs(self._updateQueue) do | |
if self._flushesInProgress[key] then | |
dprint(debugInfo, "flush already in progress for key", key) | |
continue | |
end | |
if #transformers < 1 then | |
dprint(debugInfo, "no transformers for key", key) | |
self._updateQueue[key] = nil | |
continue | |
end | |
self._flushesInProgress[key] = true | |
-- DataStore UpdateAsync can conflict with other servers if called at the exact same time | |
-- and race so whichever finishes last will overwrite the previous. | |
-- MemoryStore UpdateAsync solves this by retrying if two are called at once, so we use | |
-- that as a locking mechanism to avoid two DataStore updates overwriting. If two try to grab | |
-- while unlocked, MemoryStore will force one of them to retry later. | |
local unlocked, lockWaitTime = false, 0 | |
while unlocked == false do | |
local success, message = pcall(function() | |
dprint(debugInfo, "attempting to retrieve lock for", key) | |
self._msMap:UpdateAsync(key, function(lockOwner) | |
if lockOwner ~= nil then | |
dprint("Lock already taken by " .. lockOwner) | |
return nil -- Someone else has this key rn, we must wait | |
end | |
unlocked = true | |
-- Since other servers trying to take it will be returning | |
-- different JobId, memorystore will know its a conflict | |
-- and force the others to retry | |
return game.JobId | |
end, 15) | |
end) | |
if not success then | |
warn(message) | |
end | |
if unlocked == false then | |
lockWaitTime += task.wait() | |
if lockWaitTime > 60 then | |
warn( | |
"Update flush for " | |
.. key | |
.. " expired after 60 seconds while waiting for lock to be available." | |
) | |
self._flushesInProgress[key] = nil | |
return | |
end | |
dprint(debugInfo, "waiting for lock for", key, "for", lockWaitTime, "seconds so far") | |
end | |
end | |
dprint(debugInfo, "received lock for", key) | |
-- Update the global value | |
dprint(debugInfo, "updating global value for", key) | |
self._dsStore:UpdateAsync(key, function(storedValue) | |
local value = storedValue | |
for i, transformer in ipairs(transformers) do | |
local success, newValue = pcall(transformer, value, true) | |
if not success then | |
warn(newValue) | |
dprint(debugInfo, "cancelled transformer", i, "on", key) | |
continue -- skip this one, transform errored | |
end | |
if newValue == nil then | |
dprint(debugInfo, "skipped transformer", i, "on", key) | |
continue -- skip this one, transform exited | |
end | |
dprint(debugInfo, "applied transformer", i, "on", key) | |
value = newValue | |
end | |
table.clear(transformers) | |
self._cache[key] = value | |
-- Inform other servers they need to refresh | |
task.defer(function() | |
local publishSuccess, publishResult = pcall(function() | |
dprint(debugInfo, "informing other servers of changes to", key) | |
MessagingService:PublishAsync(self._msgId, { | |
JobId = game.JobId, | |
Key = key, | |
}) | |
end) | |
if not publishSuccess then | |
warn(publishResult) | |
end | |
end) | |
return value | |
end) | |
-- Unlock this key for the next server to take | |
dprint(debugInfo, "unlocking", key) | |
self._flushesInProgress[key] = nil | |
pcall(self._msMap.RemoveAsync, self._msMap, key) | |
end | |
end | |
function Store:GetKeyChangedSignal(key: string) | |
local event = self._events[key] | |
if not event then | |
event = Instance.new("BindableEvent") | |
self._events[key] = event | |
end | |
return event.Event | |
end | |
function Store:Get(key: string, default: any?, skipCache: boolean?) | |
if not skipCache and self._cache[key] ~= nil then | |
dprint(debugInfo, "getting local value of", key) | |
return self._cache[key] or default | |
end | |
dprint(debugInfo, "getting global value of", key) | |
local value = self._dsStore:GetAsync(key) | |
if value == nil then | |
value = default | |
end | |
self._cache[key] = value | |
return value | |
end | |
function Store:Update(key: string, transformer: (any?, boolean?) -> any?) | |
-- Queue it up for updating on the latest real value & replication | |
dprint(debugInfo, "queuing global transformer for", key) | |
if self._updateQueue[key] == nil then | |
self._updateQueue[key] = { transformer } | |
else | |
table.insert(self._updateQueue[key], transformer) | |
end | |
-- First, perform it locally | |
dprint(debugInfo, "applying local transformer for", key) | |
local success, newValue = pcall(transformer, self._cache[key], false) | |
if not success then | |
warn(newValue) | |
return -- cancel, transform errored | |
end | |
if newValue == nil then | |
return -- cancel, transform exited | |
end | |
self._cache[key] = newValue | |
local event = self._events[key] | |
if event then | |
event:Fire(newValue) | |
end | |
end | |
function Store:Destroy() | |
if self._destroyed then | |
dprint(debugInfo, "is already destroyed") | |
return | |
end | |
self._references -= 1 | |
if self._references > 0 then | |
dprint(debugInfo, "removing a reference to the store, there are now", self._references, "references") | |
return | |
end | |
dprint(debugInfo, "destroying store!") | |
self._destroyed = true | |
if self._msgConnection ~= nil then | |
self._msgConnection:Disconnect() | |
end | |
for _, event in pairs(self._events) do | |
event:Destroy() | |
end | |
GlobalStorage._cache[name] = nil | |
self:_flushUpdateQueue() | |
while next(self._flushesInProgress) do | |
dprint(debugInfo, "Waiting for flushes to finish before wiping") | |
task.wait() | |
end | |
table.clear(self) | |
self._destroyed = true | |
end | |
task.spawn(function() | |
-- Subscribe to store's msg for cross-server updates | |
local subscribeSuccess, subscribeConnection = pcall(function() | |
return MessagingService:SubscribeAsync(Store._msgId, function(message) | |
if game.JobId == message.Data.JobId then | |
return | |
end | |
if (Store._destroyed) or (Store._cache == nil) then | |
error(debugInfo .. " received update from another server after being destroyed") | |
return | |
end | |
local key = message.Data.Key | |
dprint(debugInfo, "received update to", key, "from another server") | |
local newValue = Store:Get(key, Store._cache[key], true) | |
local event = Store._events[key] | |
if event then | |
event:Fire(newValue) | |
end | |
end) | |
end) | |
if subscribeSuccess then | |
if Store._destroyed then | |
dprint(debugInfo, "destroyed during subscribe, disconnecting subscription") | |
subscribeConnection:Disconnect() | |
else | |
Store._msgConnection = subscribeConnection | |
end | |
else | |
warn(subscribeConnection) | |
end | |
-- Start update queue flush thread | |
while not Store._destroyed do | |
local jitter = math.random(0, 100) / 100 -- Reduce server conflicts? | |
task.wait(6 + jitter) | |
-- Check if destroyed during wait | |
if Store._destroyed then break end | |
dprint(debugInfo, "periodically flushing update queue") | |
Store:_flushUpdateQueue() | |
end | |
end) | |
-- Cache the store object for future GetStore sharing | |
GlobalStorage._cache[name] = Store | |
return Store | |
end | |
game:BindToClose(function() | |
for _, Store in pairs(GlobalStorage._cache) do | |
task.spawn(Store._flushUpdateQueue, Store) | |
end | |
end) | |
game.Players.PlayerRemoving:Connect(function(Player) | |
for name, Store in pairs(GlobalStorage._cache) do | |
if Store._associatedUserId == Player.UserId then | |
dprint(string.format("Destroying %s store for %s on leave", name, Player.Name)) | |
Store:Destroy() | |
end | |
end | |
end) | |
return GlobalStorage |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment