Skip to content

Instantly share code, notes, and snippets.

@raphaelsc
Last active April 14, 2025 15:15
Show Gist options
  • Select an option

  • Save raphaelsc/90db9b09125dc2c9cb0eb826512b44aa to your computer and use it in GitHub Desktop.

Select an option

Save raphaelsc/90db9b09125dc2c9cb0eb826512b44aa to your computer and use it in GitHub Desktop.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_data_resurrection_with_repair_mode(manager):
    """Check that a late hint replay does not bring back a deleted row.

    Scenario, as driven by the steps below:

    1. Start three nodes in dc1 (the replicas of ``ks``) and a fourth node in
       dc2 that owns no replica and starts with the
       ``hinted_handoff_pause_hint_replay`` error injection enabled.
    2. Stop node 2, then insert a row through node 4 so that node 4 is the
       coordinator while node 2 is down.
    3. Stop node 4, restart node 2 and delete the row through node 2.
    4. Repair nodes 1-3 while ignoring node 4, wait, then flush and major
       compact ``ks`` on nodes 1-3 (table uses ``tombstone_gc`` mode
       ``repair``).
    5. Restart node 4, disable the injection, and repair from node 4.
    6. Read the partition from node 2 at ``LOCAL_ONE`` and assert that no
       row is returned.

    Args:
        manager: cluster manager fixture used to add/stop/start servers and
            to reach the REST API and the CQL session.
    """
    # Function-scope import: only this test needs asyncio, and the sleeps
    # below must not block the event loop the manager client runs on.
    import asyncio

    cfg = {}
    cmdline = [
        "--enable-cache", "0",
        "--smp", "1",
    ]
    property_file_cfg = {"dc": "dc1", "rack": "r1"}

    # Three dc1 nodes: these hold all replicas of the keyspace created below.
    node1 = await manager.server_add(cmdline=cmdline, config=cfg, property_file=property_file_cfg)
    node2 = await manager.server_add(cmdline=cmdline, config=cfg, property_file=property_file_cfg)
    node3 = await manager.server_add(cmdline=cmdline, config=cfg, property_file=property_file_cfg)

    cql = manager.get_cql()
    # Only the wait matters here; the returned hosts are re-fetched later.
    await wait_for_cql_and_get_hosts(cql, [node1, node2, node3], time.time() + 30)

    await manager.api.disable_tablet_balancing(node1.ip_addr)

    # Node 4 lives in dc2 (replication factor 0 for ks) and boots with hint
    # replay paused via error injection.
    cfg_with_hinted_handoff_delayed = {
        'error_injections_at_startup': ['hinted_handoff_pause_hint_replay'],
        'join_ring': 'true',
    }
    node4 = await manager.server_add(cmdline=cmdline, config=cfg_with_hinted_handoff_delayed, property_file={"dc": "dc2", "rack": "r2"})

    # NOTE: tablet-based variant kept for reference:
    # cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} and tablets = {'initial': 1}")
    cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 3, 'dc2': 0} and tablets = {'enabled': false}")
    cql.execute("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH tombstone_gc = {'mode': 'repair', 'propagation_delay_in_seconds': '1'}")

    # Node 2 (a dc1 replica) goes down.
    await manager.server_stop_gracefully(node2.server_id)
    _, host4 = await wait_for_cql_and_get_hosts(cql, [node1, node4], time.time() + 30)

    # Write through node 4 (owns no replica) while node 2 is offline, so the
    # hint for node 2 is expected to be stored on node 4.
    stmt = SimpleStatement("INSERT INTO ks.tbl (pk, ck) VALUES (0,0)", consistency_level=ConsistencyLevel.QUORUM)
    cql.execute(stmt, host=host4)

    host4_id = await manager.get_host_id(node4.server_id)
    logger.info(f"host4: {host4_id}")

    # The node storing the hint goes down.
    await manager.server_stop_gracefully(node4.server_id)

    # The node that missed the insert is online again.
    await manager.server_start(node2.server_id, wait_others=1)
    host2, _ = await wait_for_cql_and_get_hosts(cql, [node2, node3], time.time() + 30)

    # Issue the tombstone while all replica-owning nodes are up.
    stmt = SimpleStatement("DELETE FROM ks.tbl WHERE pk = 0 AND ck = 0", consistency_level=ConsistencyLevel.QUORUM)
    cql.execute(stmt, host=host2)

    async def do_repair(node, ignore_node=''):
        """Repair ks.tbl from `node`, optionally ignoring `ignore_node`."""
        await manager.api.repair(node.ip_addr, "ks", "tbl", ignore_node=ignore_node)

    await manager.servers_see_each_other([node1, node2, node3])
    logger.info(f"Node2 ip addr {node2.ip_addr}")
    live_nodes = await manager.api.get_alive_endpoints(node2.ip_addr)
    logger.info(f"Live nodes: {live_nodes}")

    # Wait well past the table's 1-second propagation delay before repairing.
    # asyncio.sleep (not time.sleep) keeps the event loop responsive.
    await asyncio.sleep(60)

    # Repair runs on every replica-owning node, with the hint-holding node 4
    # offline and explicitly ignored.
    await do_repair(node1, node4.ip_addr)
    await do_repair(node2, node4.ip_addr)
    await do_repair(node3, node4.ip_addr)

    # Wait past the propagation delay again, this time after the repairs.
    await asyncio.sleep(30)

    # Flush and compact on nodes 1-3; presumably this is where the repaired
    # tombstone becomes eligible for garbage collection.
    for node in (node1, node2, node3):
        await manager.api.flush_all_keyspaces(node.ip_addr)
        await manager.api.keyspace_compaction(node.ip_addr, "ks")

    # Bring node 4 back and let hint replay proceed.
    await manager.server_start(node4.server_id, wait_others=1)
    await manager.api.disable_injection(node4.ip_addr, "hinted_handoff_pause_hint_replay")

    # To force replay of the hint stored on node 4 itself.
    await do_repair(node4)
    await asyncio.sleep(30)

    cql = manager.get_cql()
    _, host2, _, _ = await wait_for_cql_and_get_hosts(cql, [node1, node2, node3, node4], time.time() + 30)

    # Read locally from node 2: the deleted row must not have come back.
    prepared = cql.prepare("SELECT * FROM ks.tbl WHERE pk = 0")
    prepared.consistency_level = ConsistencyLevel.LOCAL_ONE
    rows = await cql.run_async(prepared, host=host2)
    logger.info(f"** ROWS **: {len(rows)}")
    assert len(rows) == 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment