Skip to content

Instantly share code, notes, and snippets.

@fmorency
Last active October 8, 2025 15:25
Show Gist options
  • Save fmorency/cba842034e448ee8bbdeb4c912da41e3 to your computer and use it in GitHub Desktop.
Save fmorency/cba842034e448ee8bbdeb4c912da41e3 to your computer and use it in GitHub Desktop.
Quint escrow module
/*
Module escrow models a simple leasing escrow system with discrete block
settlement. Users deposit funds into per-user escrow accounts, open leases that
incur a fixed per-block charge, and optionally close them. At each block (via
tick/end_block), open leases generate charges:

  - If a user has sufficient escrow, charges move to the provider balance.
  - If charges overdraw escrow, the user’s escrow is zeroed and all their still-open leases become Overdrawn.

State tracked: user balances (off-escrow), escrow balances, provider balance,
  block height, incremental lease ids, per-user lease sets, and per-lease status
  (Open, Closed, Overdrawn).
*/
module escrow {
  pure val USER_ADDRESSES = Set("alice", "bob")
  pure val INITIAL_BALANCE = 5 // Initial balance for each user
  pure val MIN_LEASE_BLOCKS = 2 // Minimum number of blocks for a lease. Hardcoded for simplicity
  pure val RATE_PER_BLOCK = 1 // Rate charged per block for leasing. Hardcoded for simplicity
  pure val K = 2 // Maximum number of steps between ticks

  type Addr = str
  type LeaseStatus = Open | Closed | Overdrawn
  type LeaseId = int

  // State variables
  var user_balances: Addr -> int                  // Keep track of user balances
  var escrow_balances: Addr -> int                // Keep track of escrow balances
  var h: int                                      // Current block height
  var next_id: LeaseId                            // Incremental lease id
  var user_leases: Addr -> Set[LeaseId]           // Map user to their leases
  var user_leases_status: LeaseId -> LeaseStatus  // Map lease id to its status
  var provider_balance: int                       // Balance of the provider
  var lease_heights: LeaseId -> int               // Map lease id to the block height it was opened at
  var did_tick: bool                              // Liveliness variable to ensure tick action is taken
  var steps_since_tick: int                       // Prevent long sequences without a tick

  action nonTickFrame = all { steps_since_tick' = steps_since_tick + 1 }

  // Helper functions
  // Get all open leases for a user
  def getOpenLeases(u: Addr): Set[LeaseId] =
    user_leases.get(u).filter(id => user_leases_status.get(id) == Open)

  // Get the per-block charge for a user based on number of open leases
  def getPerBlockCharge(u: Addr): int = getOpenLeases(u).size() * RATE_PER_BLOCK

  // Initialize state
  action init = all {
    user_balances' = USER_ADDRESSES.mapBy(_ => INITIAL_BALANCE),
    escrow_balances' = USER_ADDRESSES.mapBy(_ => 0),
    provider_balance' = 0,
    h' = 1,
    next_id' = 1,
    user_leases' = USER_ADDRESSES.mapBy(_ => Set()),
    user_leases_status' = Map(),
    lease_heights' = Map(),
    did_tick' = false,
    steps_since_tick' = 0,
  }

  // Deposit action: move amount from user balance to escrow balance
  action deposit(user, amount) = all {
    user_balances.get(user) >= amount,
    amount >= MIN_LEASE_BLOCKS * RATE_PER_BLOCK,
    user_balances' = user_balances.setBy(user, curr => curr - amount),
    escrow_balances' = escrow_balances.setBy(user, curr => curr + amount),
    provider_balance' = provider_balance,
    h' = h,
    user_leases' = user_leases,
    user_leases_status' = user_leases_status,
    next_id' = next_id,
    lease_heights' = lease_heights,
    did_tick' = false,
    nonTickFrame,
  }

  // Open Lease action: create a new lease if user has sufficient escrow balance
  action open_lease(user) =  {
    val open_count = getOpenLeases(user).size()
    all {
      escrow_balances.get(user) >= (open_count + 1) * RATE_PER_BLOCK * MIN_LEASE_BLOCKS, // Check if we have sufficient escrow
      user_leases' = user_leases.setBy(user, b => b.union(Set(next_id))),                // Associate the new lease and rate with the user
      user_leases_status' = user_leases_status.put(next_id, Open),                       // Set lease status to Open
      next_id' = next_id + 1,                                                            // Increment lease_id for next lease
      lease_heights' = lease_heights.put(next_id, h),                                    // Record the block height at which the lease was opened
      // Unchanged
      user_balances' = user_balances,
      escrow_balances' = escrow_balances,
      provider_balance' = provider_balance,
      h' = h,
      did_tick' = false,
      nonTickFrame,
    }
  }

  // Tick action: increment block height
  action tick = all {
    end_block,
    did_tick' = true,
    steps_since_tick' = 0,
  }

  // Close Lease action: close an existing lease if it's open
  action close_lease(user, lease_id) = {
    all {
      user_leases.get(user).contains(lease_id),                               // Check if lease_id is associated with user
      user_leases_status.get(lease_id) == Open,                               // Check if lease is Open
      lease_heights.get(lease_id) + MIN_LEASE_BLOCKS <= h,                    // Check if lease has been open for at least MIN_LEASE_BLOCKS
      user_leases_status' = user_leases_status.setBy(lease_id, _ => Closed),  // Set lease status to Closed
      // Unchanged
      user_balances' = user_balances,
      escrow_balances' = escrow_balances,
      user_leases' = user_leases,
      next_id' = next_id,
      provider_balance' = provider_balance,
      lease_heights' = lease_heights,
      h' = h,
      did_tick' = false,
      nonTickFrame,
    }
  }

  pure val AMOUNTS = Set(
    MIN_LEASE_BLOCKS * RATE_PER_BLOCK,                     // just enough
    MIN_LEASE_BLOCKS * RATE_PER_BLOCK + 1,                 // slightly above
    INITIAL_BALANCE                                        // max
  )

  action step = {
    nondet isTick = Set(true, false).oneOf()
    if (isTick or steps_since_tick >= K) {
      tick
    } else {
      nondet user = USER_ADDRESSES.oneOf()
      nondet amount = AMOUNTS.oneOf()
      val ids = user_leases.get(user)
      if (ids.size() == 0) tick else {
        nondet lease = ids.oneOf()
        any {
            deposit(user, amount),
            open_lease(user),
            close_lease(user, lease)
          }
      }
    }
  }

  // End Block action: settle leases, balances and increment block height
  action end_block() = {
    val total_charges_per_user = USER_ADDRESSES.mapBy(user => getPerBlockCharge(user)) // Total charges per user based on number of open leases
    val new_escrow_balance = USER_ADDRESSES.mapBy(user => escrow_balances.get(user) - total_charges_per_user.get(user)) // New escrow balance after deducting charges. Can be negative
    val users_with_negative_escrow = USER_ADDRESSES.filter(user => new_escrow_balance.get(user) < 0) // Users who have negative escrow balance after charges
    val user_with_non_negative_escrow = USER_ADDRESSES.exclude(users_with_negative_escrow) // Users who have non-negative escrow balance after charges
    // All leases associated with users who have negative escrow balance and are still Open become Overdrawn
    val overdrawn_lease_ids =
      users_with_negative_escrow.fold(Set(), (acc, user) =>
        acc.union(user_leases.get(user).filter(id => user_leases_status.get(id) == Open))
      )
    val new_status = overdrawn_lease_ids.fold(user_leases_status, (m, lease) => m.set(lease, Overdrawn)) // Set status of all leases associated with users who have negative escrow balance to Overdrawn
    val new_provider_balance = user_with_non_negative_escrow.fold(provider_balance, (b, user) => b + total_charges_per_user.get(user)) // Total charges collected from users with non-negative escrow balance
    val new_non_negative_escrow = users_with_negative_escrow.fold(new_escrow_balance, (m, user) => m.set(user, 0)) // Set escrow balance of users with negative escrow to 0
    val residue_to_provider = users_with_negative_escrow.fold(0, (s, user) => s + escrow_balances.get(user)) // All remaining escrow from users with negative escrow goes to provider

    // For every lease_id, if it was not Open before, it should not be Open after this step
    val no_reopen_end_block =
      user_leases_status.keys().forall(id =>
        (user_leases_status.get(id) != Open) implies (new_status.get(id) != Open)
      )

    all {
        h' = h + 1,
        user_balances' = user_balances,
        user_leases' = user_leases,
        user_leases_status' = new_status,
        no_reopen_end_block,
        next_id' = next_id,
        escrow_balances' = new_non_negative_escrow,
        provider_balance' = new_provider_balance + residue_to_provider,
        lease_heights' = lease_heights,
    }
  }

  // User and escrow balances should never be negative
  val no_negatives = USER_ADDRESSES.forall(addr => user_balances.get(addr) >= 0) and
    USER_ADDRESSES.forall(addr => escrow_balances.get(addr) >= 0)

  // All coins should be in the system
  val total_initial = USER_ADDRESSES.size() * INITIAL_BALANCE
  val total_current = provider_balance +
    USER_ADDRESSES.fold(0, (s, u) => s + user_balances.get(u) + escrow_balances.get(u))
  val coins_preserved = total_initial == total_current

  // Lease ids should be unique and monotonically increasing
  val next_id_monotone =
    user_leases_status.keys().forall(id => id < next_id)

  // All leases should be associated with a user
  val all_leases_tracked = user_leases_status.keys().size() == next_id - 1

  // Leases are not shared across users
  val lease_id_partition =
    USER_ADDRESSES.forall(u1 =>
      USER_ADDRESSES.forall(u2 =>
        (u1 == u2) or user_leases.get(u1).intersect(user_leases.get(u2)) == Set()
      )
    )

  // All leases associated with a user should have a status
  val every_lease_has_status =
    USER_ADDRESSES.forall(u =>
      user_leases.get(u).forall(id => user_leases_status.keys().contains(id))
    )

  // Every lease that has a status should have a recorded height
  val heights_match_status = lease_heights.keys() == user_leases_status.keys()

  // No starvation - eventually tick action should be called
  // Ensures that the end_block action is called infinitely often
  temporal liveliness = always(eventually(did_tick))

  // Parameters should be well-formed
  val params_well_formed = RATE_PER_BLOCK > 0 and MIN_LEASE_BLOCKS > 0 and INITIAL_BALANCE >= 0

  // If a user has open leases, their per-block charge should be exactly equal to number of open leases * RATE_PER_BLOCK
  val only_open_charged =
    USER_ADDRESSES.forall(u =>
      getPerBlockCharge(u) ==
      user_leases.get(u).filter(id => user_leases_status.get(id) == Open).size() * RATE_PER_BLOCK
    )

  // A lease can only be closed if it has been open for at least MIN_LEASE_BLOCKS
  val closed_min_age =
    user_leases_status.keys().forall(id =>
      user_leases_status.get(id) != Closed
      or lease_heights.get(id) + MIN_LEASE_BLOCKS <= h
    )

  // No lease can be opened in the future
  val heights_not_future =
    lease_heights.keys().forall(k => lease_heights.get(k) <= h)

  // Tick action should be called at least once every K steps
  val tick_often = steps_since_tick <= K

  // Conjunction of all invariants
  val invariants = no_negatives and
    next_id_monotone and
    all_leases_tracked and
    lease_id_partition and
    every_lease_has_status and
    coins_preserved and
    heights_match_status and
    params_well_formed and
    only_open_charged and
    closed_min_age and
    heights_not_future and
    tick_often
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment