Skip to content

Instantly share code, notes, and snippets.

@tyler-smith
Last active March 27, 2025 17:51
Show Gist options
  • Save tyler-smith/10b29ca4fa3c6a6e1eb9ebcffd77e228 to your computer and use it in GitHub Desktop.
Save tyler-smith/10b29ca4fa3c6a6e1eb9ebcffd77e228 to your computer and use it in GitHub Desktop.

Lock-Free Read Conflict Handling

Core Ideas

  1. Read Handle Registry

    • Non-blocking read operation tracking
    • Atomic handle validation
    • Explicit handle cleanup
    • Simple error propagation
  2. Block Number Validation

    • Fast validity checks
    • No state juggling
    • Clear error reporting
    • Zero-copy operations

Technical Design

Data Structures

  1. Read Handle

    // ReadHandle tracks a single read operation pinned to one block number.
    type ReadHandle struct {
        // blockNum is the block number the handle was acquired for.
        blockNum uint64
        // handleID is the key under which the handle is stored in DB.activeHandles.
        handleID uint64
        // valid is set to true when the handle is acquired and cleared by
        // DB.Rewind when blockNum is above the rewind target.
        valid atomic.Bool
    }
  2. Database

    // DB holds the read-handle registry alongside the rest of the database state.
    type DB struct {
        // nextHandleID is the atomic counter that handle IDs are drawn from.
        nextHandleID atomic.Uint64
        // activeHandles maps a handleID (uint64) to its *ReadHandle.
        // sync.Map is not a generic type, so it takes no type arguments; the
        // key/value types are a convention and values are type-asserted back
        // to *ReadHandle when the map is ranged over.
        activeHandles sync.Map
        //...
    }

Core Operations

  1. Read Operations

    // AcquireReadHandle creates a handle for blockNum, marks it valid, and
    // records it in the active-handle registry under a freshly drawn ID.
    // The returned handle stays registered until it is removed again.
    func (db *DB) AcquireReadHandle(blockNum uint64) *ReadHandle {
        id := db.nextHandleID.Add(1)

        // The valid flag starts at its zero value (false) and is flipped
        // before the handle becomes reachable through the registry.
        h := &ReadHandle{blockNum: blockNum, handleID: id}
        h.valid.Store(true)

        db.activeHandles.Store(id, h)
        return h
    }
    
    // Release marks the handle invalid and removes it from the active-handle
    // registry.
    //
    // The flag is cleared before the delete because an unregistered handle is
    // no longer visited by Rewind, so nothing else would ever clear it; a Read
    // with a released handle would then still pass the validity check.
    //
    // NOTE(review): db is neither the receiver nor a parameter of this method,
    // so it must resolve to a package-level variable for this to compile.
    // Consider storing the owning *DB on ReadHandle instead. TODO confirm.
    func (h *ReadHandle) Release() {
        h.valid.Store(false)
        db.activeHandles.Delete(h.handleID)
    }
    
    // IsValid reports whether the handle may still be used for reads.
    // A nil handle is reported as invalid instead of causing a nil-pointer
    // dereference, so callers can treat "no handle" and "stale handle" alike.
    func (h *ReadHandle) IsValid() bool {
        if h == nil {
            return false
        }
        return h.valid.Load()
    }
    
    // Read returns the data associated with the given read handle.
    //
    // It returns ErrInvalidHandle when handle is nil, when the handle is not
    // valid on entry, or when it has been invalidated by the time the read has
    // finished.
    //
    // NOTE(review): the accompanying test asserts ErrInvalidReadHandle while
    // this function returns ErrInvalidHandle; confirm which name is intended.
    func (db *DB) Read(handle *ReadHandle) (Data, error) {
        if handle == nil || !handle.IsValid() {
            return nil, ErrInvalidHandle
        }

        // NOTE(review): data is not declared in this snippet; presumably it
        // stands in for the actual lookup. TODO confirm.
        result := data

        // Check again after the read. Rewind clears the valid flag before it
        // calls doRewind, so a rewind that started after the first check is
        // detected here instead of handing back possibly rewound data.
        if !handle.IsValid() {
            return nil, ErrInvalidHandle
        }
        return result, nil
    }
  2. Rewind Operation

    // Rewind invalidates every registered read handle whose block number is
    // above target.Number and then hands off to doRewind, returning its error.
    // Handles at or below the target keep their valid flag.
    //
    // NOTE(review): a handle registered after the sweep below but before
    // doRewind returns is not visited by this call; confirm whether that
    // window matters to callers.
    func (db *DB) Rewind(target eth.BlockID) error {
        cutoff := target.Number

        // Returning true keeps Range going so every registered handle is seen.
        invalidateAboveCutoff := func(_, entry any) bool {
            if h := entry.(*ReadHandle); h.blockNum > cutoff {
                h.valid.Store(false)
            }
            return true
        }
        db.activeHandles.Range(invalidateAboveCutoff)

        return db.doRewind(target)
    }

Errors

  1. Error Cases

    • Invalid read handle
    • Read after rewind (the handle's block number is above the rewind target)
    • Handle already released
  2. Recovery

    • Acquire new handle
    • Retry at correct block
    • Clear error messages
// TestInteropReadHandleRewind drives a two-chain interop setup through a
// rewind and checks that a read against the rewound block is rejected.
//
// Steps, in order:
//  1. Build one empty block on each chain, deploy an emitter on chain B and
//     emit a message there; batch-submit and sync chain B.
//  2. Record chain B's safe head number as the block that will be read.
//  3. Include a transaction on chain A that executes chain B's message;
//     batch-submit and sync both chains.
//  4. Include an executing message on chain B built from a fabricated
//     identifier and payload hash; batch-submit and sync again.
//  5. Assert chain B's safe head moved backwards, that reading the recorded
//     block fails with ErrInvalidReadHandle, and that chain A's safe head is
//     below 2.
//
// The order of the actor calls is significant; each sync step consumes the
// L1 block produced just before it.
func TestInteropReadHandleRewind(gt *testing.T) {
t := helpers.NewDefaultTesting(gt)
is := dsl.SetupInterop(t)
actors := is.CreateActors()
actors.PrepareChainState(t)
// Create initial blocks on both chains
actors.ChainA.Sequencer.ActL2EmptyBlock(t)
actors.ChainB.Sequencer.ActL2EmptyBlock(t)
// Setup users and contracts
aliceA := setupUser(t, is, actors.ChainA, 0)
aliceB := setupUser(t, is, actors.ChainB, 0)
// Deploy an emitter contract on chain B
auth := newL2TxOpts(t, aliceB.secret, actors.ChainB)
emitContractAddr, deployTx, _, err := emit.DeployEmit(auth, actors.ChainB.SequencerEngine.EthClient())
require.NoError(t, err)
includeTxOnChainBasic(t, actors.ChainB, deployTx, aliceB.address)
// Emit a message from chain B that will be referenced
emitTx := newEmitMessageTx(t, actors.ChainB, aliceB, emitContractAddr, []byte("hello from B"))
includeTxOnChainBasic(t, actors.ChainB, emitTx, aliceB.address)
// Submit chain B's data and sync
actors.ChainB.Batcher.ActSubmitAll(t)
// NOTE(review): 12 is presumably the L1 block time offset in seconds - TODO confirm.
actors.L1Miner.ActL1StartBlock(12)(t)
actors.L1Miner.ActL1IncludeTx(actors.ChainB.BatcherAddr)(t)
actors.L1Miner.ActL1EndBlock(t)
// Sync the supervisor and process
actors.Supervisor.SignalLatestL1(t)
actors.ChainB.Sequencer.ActL2PipelineFull(t)
actors.ChainB.Sequencer.SyncSupervisor(t)
actors.Supervisor.ProcessFull(t)
// Get the block number we'll be reading from.
// chainBStatus is a snapshot taken here; it is reused further down for the
// fabricated identifier and as the baseline of the rewind assertion.
chainBStatus := actors.ChainB.Sequencer.SyncStatus()
readBlockNum := chainBStatus.SafeL2.Number
// Create a block on chain A that references the message from B
execTx := newExecuteMessageTx(t, actors.ChainA, aliceA, actors.ChainB, emitTx)
includeTxOnChainBasic(t, actors.ChainA, execTx, aliceA.address)
// Submit chain A's data and sync
actors.ChainA.Batcher.ActSubmitAll(t)
actors.L1Miner.ActL1StartBlock(12)(t)
actors.L1Miner.ActL1IncludeTx(actors.ChainA.BatcherAddr)(t)
actors.L1Miner.ActL1EndBlock(t)
// Sync everything
actors.Supervisor.SignalLatestL1(t)
actors.ChainA.Sequencer.ActL2PipelineFull(t)
actors.ChainA.Sequencer.SyncSupervisor(t)
actors.ChainB.Sequencer.ActL2PipelineFull(t)
actors.ChainB.Sequencer.SyncSupervisor(t)
actors.Supervisor.ProcessFull(t)
// Create a conflicting message on chain B that will invalidate the block we're reading from
fakeMessage := []byte("this message conflicts")
// NOTE(review): the identifier takes its block number and timestamp from
// chain B's status and its origin from aliceB, but its ChainID from chain A's
// rollup config. Presumably the mismatch is what makes the message invalid -
// confirm this is intentional.
id := stypes.Identifier{
Origin: aliceB.address,
BlockNumber: readBlockNum, // Specifically target the block we're reading from
LogIndex: 0,
Timestamp: chainBStatus.SafeL2.Time,
ChainID: eth.ChainIDFromBig(actors.ChainA.RollupCfg.L2ChainID),
}
// The hash is of fakeMessage, not of the payload emitted earlier on chain B.
msgHash := crypto.Keccak256Hash(fakeMessage)
conflictTx := newExecuteMessageTxFromIDAndHash(t, aliceB, actors.ChainB, id, msgHash)
// Include the conflicting transaction.
// This goes through the engine API directly instead of the
// includeTxOnChainBasic helper used for the earlier transactions.
actors.ChainB.Sequencer.ActL2StartBlock(t)
_, err = actors.ChainB.SequencerEngine.EngineApi.IncludeTx(conflictTx, aliceB.address)
require.NoError(t, err)
actors.ChainB.Sequencer.ActL2EndBlock(t)
// Submit the conflicting block
actors.ChainB.Batcher.ActSubmitAll(t)
actors.L1Miner.ActL1StartBlock(12)(t)
actors.L1Miner.ActL1IncludeTx(actors.ChainB.BatcherAddr)(t)
actors.L1Miner.ActL1EndBlock(t)
// Sync and process, this should trigger the rewind
actors.Supervisor.SignalLatestL1(t)
actors.ChainB.Sequencer.ActL2PipelineFull(t)
actors.ChainB.Sequencer.SyncSupervisor(t)
actors.Supervisor.ProcessFull(t)
// Verify the chain was rewound: the fresh safe head must be strictly below
// the safe head captured in chainBStatus before the conflict was introduced.
status := actors.ChainB.Sequencer.SyncStatus()
require.Less(t, status.SafeL2.Number, chainBStatus.SafeL2.Number, "chain should have rewound")
// Now try to read from the block that was rewound
// This should fail with an invalid read handle error
// TODO: Replace with actual read handle API call once implemented
readResult, err := actors.Supervisor.ReadFromBlock(t.Ctx(), actors.ChainB.ChainID, readBlockNum)
require.Error(t, err)
require.ErrorIs(t, err, ErrInvalidReadHandle)
require.Nil(t, readResult)
// Verify that chain A's execution is now invalid and needs to be reprocessed
actors.ChainA.Sequencer.ActL2PipelineFull(t)
statusA := actors.ChainA.Sequencer.SyncStatus()
// NOTE(review): the bound of 2 is hard-coded; presumably it corresponds to the
// chain A block that carried execTx - TODO confirm against the chain setup.
require.Less(t, statusA.SafeL2.Number, uint64(2), "chain A's execution should be invalidated")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment