Created
December 28, 2021 14:12
-
-
Save unclechu/03b15a7c822f3e3560c8cb8ed3c2f508 to your computer and use it in GitHub Desktop.
Haskell script that reproduces race condition in PostgreSQL database
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env nix-shell | |
#! nix-shell --pure -i runhaskell | |
#! nix-shell -E "with import (fetchTarball{url=\"https://github.com/NixOS/nixpkgs/archive/d887ac7aee92e8fc54dde9060d60d927afae9d69.tar.gz\";sha256=\"1bpgfv45b1yvrgpwdgc4fm4a6sav198yd41bsrvlmm3jn2wi6qx5\";}) {}; let hs=haskellPackages.ghcWithPackages(p:[p.postgresql-simple p.async p.turtle p.text p.process p.time]); in mkShell{buildInputs=[postgresql hs cacert];}" | |
{- | |
Haskell script that reproduces race condition in PostgreSQL database. | |
This is not considered a bug but rather an expected behavior, | |
just something you should be aware of. | |
This test demonstrates that if two transactions both read the same entity and
then use the data they have read for a subsequent update, you get a race
condition, and thus lost data or an inconsistent database state. This happens
when you read the data into client-side state (even though both the read and
the update are still inside a transaction). Updates are blocking but selects
are not.
This test should demonstrate the importance of ensuring that the operation is | |
atomic on the database level, and not on client level. | |
So you should write a single SQL query in such a way that it is atomic.
You should also avoid having any local client-side state that is not meant for | |
read-only operations. Any update operation that uses data from client-side is | |
a candidate for a race condition. | |
To skip all the boilerplate you need to look at only these functions: | |
* runTransactionA | |
* runTransactionB | |
* runStateTest | |
P.S. The Nix pin above is from release-21.11 branch. | |
-} | |
{-# LANGUAGE UnicodeSyntax #-} | |
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE QuasiQuotes #-} | |
{-# LANGUAGE LambdaCase #-} | |
{-# LANGUAGE ScopedTypeVariables #-} | |
{-# LANGUAGE NoMonomorphismRestriction #-} | |
import Prelude hiding (FilePath, log)
import Data.Text (unpack)
import Data.Bifunctor (first)
import Data.Function (fix)
import Data.Time.Clock (getCurrentTime, diffUTCTime)
import Control.Concurrent.Async hiding (wait)
import Control.Concurrent.MVar
import Control.Exception (SomeException, bracket, catch)
import Database.PostgreSQL.Simple
import Database.PostgreSQL.Simple.SqlQQ
import System.Process
import Turtle hiding (proc, procs, inproc, echo, err, printf, eprintf, stdout, stderr, view)
import qualified Turtle
-- | Entry point of the script.
--
-- Resolves the @PGDATA@ and @PGDATABASE@ environment variables (exporting a
-- default when one is not set), exports @PGHOST@ pointing at the data
-- directory, and then, depending on the command-line arguments, either
--
-- * runs the race-condition test (no arguments), or
-- * starts @psql@ with the remaining arguments (first argument is @psql@).
--
-- In both cases the logger and the PostgreSQL server are started in
-- background threads first. Any other arguments make the script fail with
-- "Incorrect arguments".
main ∷ IO ()
main = do
  currentDir ← pwd

  pgdata ←
    let
      key = "PGDATA"
      -- Join with the path operator rather than the generic Monoid append so
      -- the intent (a ".pgdata" entry inside the current directory) does not
      -- depend on how the FilePath type implements '<>'.
      defaultValue = currentDir </> ".pgdata"
    in
      need key >>=
        maybe
          (defaultValue <$ export key (format fp defaultValue))
          (pure . fromText)

  -- The server is started with "-k pgdata" (see 'runPostgreSqlServer'), so
  -- clients are pointed at the same directory.
  export "PGHOST" (format fp pgdata)

  pgdatabase ←
    let
      key = "PGDATABASE"
      defaultValue = "test-race-condition"
    in
      need key >>= maybe (defaultValue <$ export key defaultValue) pure

  loggerBus ∷ LoggerBus ← newEmptyMVar

  let
    -- Run the action @m@ next to the logger thread and the database server
    -- thread. @m@ is delayed by one second before it starts. As soon as
    -- either the server thread or @m@ finishes the other one is cancelled,
    -- then the logger is asked to stop and awaited.
    runWithDbServer m =
      withAsync (runLogger loggerBus) $ \loggerThread →
        withAsync (runPostgreSqlServer loggerBus pgdata) $ \dbThread →
          withAsync (sleep 1.0 >> m) $ \mThread → do
            -- Only the fact that one of them finished matters here.
            void $ waitEitherCancel dbThread mThread
            sendToLoggerBus loggerBus Cancel
            wait loggerThread

    -- Prepare the data, run both transactions, then report the final state.
    runTheMainTest = do
      addInitialData loggerBus pgdatabase
      runTransactions
      runStateTest loggerBus

    -- Transactions A and B run concurrently; both results are discarded.
    runTransactions = void . runConcurrently $ (,)
      <$> Concurrently (runTransactionA loggerBus)
      <*> Concurrently (runTransactionB loggerBus)

    -- Start the given command as a child process and wait for it to exit.
    -- Its exit code is ignored.
    runPsql cmd args = do
      echoToLoggerBus loggerBus "Entering psql REPL..."
      createProcess (proc (unpack cmd) (unpack <$> args)) >>= \case
        (_, _, _, procHandle) → void $ waitForProcess procHandle

  arguments >>= \case
    [] → runWithDbServer runTheMainTest
    x@"psql" : xs → runWithDbServer $ runPsql x xs
    _ → fail "Incorrect arguments"
-- | Run the PostgreSQL server in the foreground of the calling thread.
--
-- When the data directory does not exist yet it is created first with
-- @pg_ctl initdb@. Then @postgres@ is started with the data directory passed
-- as the @-k@ argument and an empty @-h@ argument. Standard output of both
-- commands is forwarded line by line to the logger bus as info messages.
runPostgreSqlServer ∷ LoggerBus → FilePath → IO ()
runPostgreSqlServer loggerBus pgdata = do
  logInfo "Running PostgreSQL server..."

  hasDataDir ← testdir pgdata

  unless hasDataDir $ do
    logInfo $ format ("Creating database directory "%w%"...") pgdataText
    logStdout $ runLogged "pg_ctl" ["initdb", "-D", pgdataText] empty

  logInfo "Running the PostgreSQL daemon..."
  logStdout $ runLogged "postgres" ["-F", "-h", mempty, "-k", pgdataText] empty
  where
    -- Textual form of the data directory path, as passed to the commands.
    pgdataText = format fp pgdata

    logInfo msg = echoToLoggerBus loggerBus msg

    runLogged cmd args input = loggedInproc loggerBus cmd args input

    -- Drain a stream of output lines, sending each one to the logger bus.
    logStdout outputLines = sh (outputLines >>= logInfo . lineToText)
-- | Make sure the test database exists and (re-)create the test table in it.
--
-- The list of databases is read from @psql -ltqA@; the database is considered
-- present when the first @|@-separated field of some output line equals
-- @pgdatabase@. If it is missing it is created with @createdb@.
--
-- Afterwards the @test_counter@ table is dropped (if it exists), created
-- again, and a single row with @counter_id = 1@ is inserted (its @counter@
-- column defaults to 0). The connection is closed when the function returns,
-- including when an exception is thrown.
addInitialData ∷ LoggerBus → Text → IO ()
addInitialData loggerBus pgdatabase = do
  echo' "Create initial data (if not exists)..."
  echo' $ format ("Checking if "%w%" database already exists...") pgdatabase

  alreadyHasDb ←
    let
      -- True as soon as one line names the database in its first field.
      reducer acc line = acc || case cut "|" (lineToText line) of
        [] → False
        dbName : _ → dbName == pgdatabase
    in
      empty
        & inproc' "psql" ["-ltqA"]
        & grep (begins (chars <* "|"))
        & reduce (Fold reducer False id)

  unless alreadyHasDb $ do
    echo' $ format ("Creating "%w%" database...") pgdatabase
    stdout' $ inproc' "createdb" ["--", pgdatabase] empty

  echo' "Connecting to the database for creating initial data..."
  -- 'bracket' guarantees the connection is released even on failure.
  bracket (connectPostgreSQL "") close $ \conn → do
    echo' "(Re-)creating the test table and counters..."
    withTransaction conn $
      void $ execute_ conn [sql|
        drop table if exists test_counter;

        create table test_counter
          ( counter_id integer primary key
          , counter integer not null default 0
          );

        insert into test_counter (counter_id) values (1);
      |]
  where
    echo' = echoToLoggerBus loggerBus
    inproc' = loggedInproc loggerBus
    -- Drain a stream of output lines, sending each one to the logger bus.
    stdout' = sh . (echo' . lineToText =<<)
-- | Transaction A of the race-condition demonstration.
--
-- Inside a single transaction it reads the counter with @counter_id = 1@
-- into the client, waits one second, and then writes back the value it read
-- plus one. Because the new value is computed on the client from the earlier
-- read, a concurrent update made in between is overwritten.
--
-- The connection is closed when the function returns, including when an
-- exception is thrown.
runTransactionA ∷ LoggerBus → IO ()
runTransactionA loggerBus = do
  echo' "Running transaction A..."
  echo' "Transaction A: Connecting to the database..."
  -- 'bracket' guarantees the connection is released even on failure.
  bracket (connectPostgreSQL "") close $ \conn → do
    echo' "Transaction A: Making a transaction..."
    withTransaction conn $ do
      -- Exactly one row is expected; any other result fails the pattern
      -- match and aborts the transaction.
      [Only (x ∷ Int)] ←
        query_ conn "select counter from test_counter where counter_id = 1"

      sleep 1.0

      void $ execute conn [sql|
        update test_counter set counter = ? where counter_id = 1
      |] (Only (succ x))
  where
    echo' = echoToLoggerBus loggerBus
-- | Transaction B of the race-condition demonstration.
--
-- Inside a single transaction it waits half a second, reads the counter with
-- @counter_id = 1@ into the client, waits two more seconds, and then writes
-- back the value it read plus one. Because the new value is computed on the
-- client from the earlier read, a concurrent update made in between is
-- overwritten.
--
-- The connection is closed when the function returns, including when an
-- exception is thrown.
runTransactionB ∷ LoggerBus → IO ()
runTransactionB loggerBus = do
  echo' "Running transaction B..."
  echo' "Transaction B: Connecting to the database..."
  -- 'bracket' guarantees the connection is released even on failure.
  bracket (connectPostgreSQL "") close $ \conn → do
    echo' "Transaction B: Making a transaction..."
    withTransaction conn $ do
      sleep 0.5

      -- Exactly one row is expected; any other result fails the pattern
      -- match and aborts the transaction.
      [Only (x ∷ Int)] ←
        query_ conn "select counter from test_counter where counter_id = 1"

      sleep 2.0

      void $ execute conn [sql|
        update test_counter set counter = ? where counter_id = 1
      |] (Only (succ x))
  where
    echo' = echoToLoggerBus loggerBus
-- | Read all rows of @test_counter@ and report them through the logger bus.
--
-- Each row is shown as a pair of the counter id (rendered as @id#N@) and the
-- counter value. The connection is closed when the function returns,
-- including when an exception is thrown.
runStateTest ∷ LoggerBus → IO ()
runStateTest loggerBus = do
  echo' "Running state test..."
  echo' "Connecting to the database..."
  -- 'bracket' guarantees the connection is released even on failure.
  bracket (connectPostgreSQL "") close $ \conn → do
    echo' "Selecting the records from the database…"
    result ← query_ conn "select counter_id, counter from test_counter"

    -- The result is 1, so there *IS* a race condition.
    echo' $ format
      ( "Result is "%w%" "
      % "(counter id#1 should have value 2 if everything is fine "
      % "or 1 in case of race condition)"
      )
      (first (("id#" <>) . show) <$> (result ∷ [(Int, Int)]))
  where
    echo' = echoToLoggerBus loggerBus
-- * Helpers | |
-- | Send a text to the logger bus as an 'Info' message.
echoToLoggerBus ∷ MonadIO m ⇒ LoggerBus → Text → m ()
echoToLoggerBus loggerBus msg = sendToLoggerBus loggerBus (Info msg)
-- | Send a text to the logger bus as an 'Error' message.
errToLoggerBus ∷ MonadIO m ⇒ LoggerBus → Text → m ()
errToLoggerBus loggerBus msg = sendToLoggerBus loggerBus (Error msg)
-- | Put a message on the logger bus without blocking the caller.
--
-- The 'putMVar' runs in its own 'async' thread whose handle is discarded, so
-- the caller returns immediately even when the bus is currently full.
sendToLoggerBus ∷ MonadIO m ⇒ LoggerBus → LoggerMsg → m ()
sendToLoggerBus loggerBus msg = liftIO $ void (async (putMVar loggerBus msg))
-- | Run a command like 'inprocWithErr', keeping only its stdout lines.
--
-- Every stderr line ('Left') is sent to the logger bus as an error message
-- and dropped from the resulting stream; stdout lines ('Right') are passed
-- through unchanged.
loggedInproc ∷ LoggerBus → Text → [Text] → Shell Line → Shell Line
loggedInproc loggerBus cmd args input =
  inprocWithErr cmd args input >>= \case
    Left errLine → errToLoggerBus loggerBus (lineToText errLine) >> mzero
    Right outLine → pure outLine
-- | The channel through which messages are handed to the logger thread.
type LoggerBus = MVar LoggerMsg

-- | Messages understood by 'runLogger'.
data LoggerMsg
  -- | A regular message.
  = Info Text
  -- | An error message.
  | Error Text
  -- | Ask the logger to start finalizing.
  | Cancel
  -- | Tell the logger to exit.
  | Done
-- | The logger loop: takes messages from the bus one at a time and writes
-- them out until it is told to finish.
--
-- * 'Info' messages are written with 'Turtle.printf' and an @[INFO]@ prefix,
--   'Error' messages with 'Turtle.eprintf' and an @[ERROR]@ prefix.
-- * 'Cancel' queues a final error-level notice followed by 'Done' and
--   switches the loop into stopping mode.
-- * 'Done' writes a closing notice and ends the loop.
--
-- In stopping mode every wait for the next message is raced against a
-- timeout; when the timeout wins a notice is written and the loop ends.
--
-- Any exception is caught. The first one is reported through the bus and
-- switches the loop into stopping mode; an exception while already stopping
-- is written directly and ends the loop.
runLogger ∷ MonadIO m ⇒ LoggerBus → m ()
runLogger loggerBus = liftIO (loop False)
  where
    -- How long to wait for the next message once the logger is stopping.
    stopTimeout = 5.0

    -- Write a message immediately: 'Left' is an error, 'Right' is an info.
    writeLog = \case
      Left msg → Turtle.eprintf ("[ERROR] "%s%"\n") msg
      Right msg → Turtle.printf ("[INFO] "%s%"\n") msg

    -- One iteration of the logger; the argument tells whether the logger is
    -- already in stopping mode.
    loop ∷ Bool → IO ()
    loop stopping = limited step `catch` recover
      where
        -- Apply the stopping timeout only when in stopping mode.
        limited ∷ IO () → IO ()
        limited
          | stopping = fmap (either id id) . race (sleep stopTimeout >> onTimeout)
          | otherwise = id

        onTimeout = writeLog $ Left "Logger stopping timeout, exiting..."

        step ∷ IO ()
        step = takeMVar loggerBus >>= \case
          Info msg → writeLog (Right msg) >> loop stopping
          Error msg → writeLog (Left msg) >> loop stopping
          Done → writeLog $ Left "Logger reached its end, exiting..."
          Cancel → do
            errToLoggerBus loggerBus
              "Finalizing the logger after receiving Cancel command..."
            sendToLoggerBus loggerBus Done
            loop True

        recover ∷ SomeException → IO ()
        recover e
          | stopping =
              writeLog . Left $
                format
                  ("Secondary exception "%w%" for logger, exiting immediately...")
                  e
          | otherwise = do
              errToLoggerBus loggerBus $
                format
                  ("Finalizing the logger after catching this exception: "%w%"...")
                  e
              sendToLoggerBus loggerBus Done
              loop True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment