Skip to content

Instantly share code, notes, and snippets.

@raphaelsc
Created April 14, 2025 01:44
Show Gist options
  • Save raphaelsc/2e6d6f3ccc57289903646fcd97bd2e41 to your computer and use it in GitHub Desktop.
Save raphaelsc/2e6d6f3ccc57289903646fcd97bd2e41 to your computer and use it in GitHub Desktop.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_data_resurrection_with_repair_mode(manager):
    """Check that a deleted row is not resurrected under tombstone_gc mode 'repair'.

    Scenario driven by this test (3 nodes, RF=3, tablets enabled):

    1. node2 is stopped and a row is inserted at QUORUM through node1
       (the intent is that node1 keeps a hint for the offline node2).
    2. node1 is stopped, node2 is restarted, and the row is deleted at
       QUORUM through node2.
    3. node2 and node3 are repaired while ignoring node1, then flushed and
       compacted after the table's 1-second propagation delay has elapsed,
       which is intended to let them purge the tombstone.
    4. node1 is restarted and repaired; the test then waits before reading.
    5. A read at consistency level ALL must return no rows.

    Args:
        manager: cluster manager fixture used to add/stop/start servers and
            to reach the REST API and the CQL session.
    """
    # Function-scope import: the module's import block is not visible here.
    import asyncio

    cfg = {'tablets_mode_for_new_keyspaces': 'enabled'}
    cmdline = [
        "--enable-cache", "0",
        "--smp", "1",
    ]

    node1 = await manager.server_add(cmdline=cmdline, config=cfg)
    node2 = await manager.server_add(cmdline=cmdline, config=cfg)
    node3 = await manager.server_add(cmdline=cmdline, config=cfg)

    cql = manager.get_cql()
    # Wait for both nodes; only node1's host handle is needed before node2 stops.
    host1, _ = await wait_for_cql_and_get_hosts(cql, [node1, node2], time.time() + 30)

    await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}")
    await cql.run_async("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH tombstone_gc = {'mode': 'repair', 'propagation_delay_in_seconds': '1'}")

    # node 2 goes down
    await manager.server_stop_gracefully(node2.server_id)

    # node 1 coordinates the write and is expected to store a hint for the
    # offline node 2
    stmt = SimpleStatement("INSERT INTO ks.tbl (pk, ck) VALUES (0,0)", consistency_level=ConsistencyLevel.QUORUM)
    await cql.run_async(stmt, host=host1)

    host1_id = await manager.get_host_id(node1.server_id)
    logger.info(f"host1: {host1_id}")

    # the node storing the hint goes down
    await manager.server_stop_gracefully(node1.server_id)

    # the node that missed the insert is online again
    await manager.server_start(node2.server_id, wait_others=1)
    host2, _ = await wait_for_cql_and_get_hosts(cql, [node2, node3], time.time() + 30)

    stmt = SimpleStatement("DELETE FROM ks.tbl WHERE pk = 0 AND ck = 0", consistency_level=ConsistencyLevel.QUORUM)
    await cql.run_async(stmt, host=host2)

    async def do_repair(node, ignore_node=''):
        """Repair ks.tbl through `node`, optionally ignoring the node at `ignore_node`."""
        await manager.api.repair(node.ip_addr, "ks", "tbl", ignore_node=ignore_node)

    await manager.servers_see_each_other([node2, node3])
    logger.info(f"Node2 ip addr {node2.ip_addr}")
    live_nodes = await manager.api.get_alive_endpoints(node2.ip_addr)
    logger.info(f"Live nodes: {live_nodes}")

    # repair runs while the node storing the hint is offline
    await do_repair(node2, node1.ip_addr)
    await do_repair(node3, node1.ip_addr)

    # Let the table's propagation delay (configured above as 1 second) elapse.
    # Use asyncio.sleep so the event loop is not blocked while waiting.
    await asyncio.sleep(2)

    # flush + major compaction so that node 2 and 3 can GC the tombstone
    for node in (node2, node3):
        await manager.api.flush_all_keyspaces(node.ip_addr)
        await manager.api.keyspace_compaction(node.ip_addr, "ks")

    await manager.server_start(node1.server_id, wait_others=1)

    # to force replay of the hint stored in node 1 itself
    await do_repair(node1)
    # NOTE(review): presumably a grace period for hint replay to complete;
    # kept at the original 10 seconds but made non-blocking.
    await asyncio.sleep(10)

    prepared = cql.prepare("SELECT * FROM ks.tbl WHERE pk = 0")
    prepared.consistency_level = ConsistencyLevel.ALL
    rows = await cql.run_async(prepared)
    logger.info(f"** ROWS **: {len(rows)}")
    # The deleted row must not come back on any replica.
    assert len(rows) == 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment