Skip to content

Instantly share code, notes, and snippets.

@raphaelsc
Created April 29, 2025 17:31
Show Gist options
  • Select an option

  • Save raphaelsc/68b64532905065171f652bd7591d3109 to your computer and use it in GitHub Desktop.

Select an option

Save raphaelsc/68b64532905065171f652bd7591d3109 to your computer and use it in GitHub Desktop.
reproducer for missing data post merge
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, read_barrier
from test.pylib.tablets import get_all_tablet_replicas
from test.pylib.util import wait_for
from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace, create_new_test_keyspace
import pytest
import asyncio
import logging
import time
import random
logger = logging.getLogger(__name__)
async def inject_error_one_shot_on(manager, error_name, servers):
errs = [inject_error_one_shot(manager.api, s.ip_addr, error_name) for s in servers]
await asyncio.gather(*errs)
async def inject_error_on(manager, error_name, servers):
errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers]
await asyncio.gather(*errs)
async def disable_injection_on(manager, error_name, servers):
errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
await asyncio.gather(*errs)
async def get_tablet_count(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str):
host = manager.cql.cluster.metadata.get_host(server.ip_addr)
# read_barrier is needed to ensure that local tablet metadata on the queried node
# reflects the finalized tablet movement.
await read_barrier(manager.api, server.ip_addr)
table_id = await manager.get_table_id(keyspace_name, table_name)
rows = await manager.cql.run_async(f"SELECT tablet_count FROM system.tablets where "
f"table_id = {table_id}", host=host)
return rows[0].tablet_count
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_merge_simple(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'table=debug',
'--logger-log-level', 'load_balancer=debug',
'--target-tablet-size-in-bytes', '60000',
'--smp', '1',
]
servers = [await manager.server_add(config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
}, cmdline=cmdline)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
# Initial average table size of 400k (1 tablet), so triggers some splits.
total_keys = 10000
keys = range(total_keys)
def populate(keys):
insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
for pk in keys:
value = random.randbytes(2000)
cql.execute(insert, [pk, value])
populate(keys)
async def check():
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
assert len(rows) == len(keys)
await check()
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count == 1
# Increases the chance of tablet migration concurrent with split
await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers)
await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
# Now there's a split and migration need, so they'll potentially run concurrently.
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await check()
time.sleep(2) # Give load balancer some time to do work
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
await check()
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count > 1
time.sleep(10)
# Allow shuffling of tablet replicas to make co-location work harder
async def shuffle():
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
time.sleep(2)
await disable_injection_on(manager, "tablet_allocator_shuffle", servers)
await shuffle()
# This will allow us to simulate some balancing after co-location with shuffling, to make sure that
# balancer won't break co-location.
await inject_error_on(manager, "tablet_merge_completion_bypass", servers)
# Shrinks table significantly, forcing merge.
delete_keys = range(total_keys - 1000)
await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
keys = range(total_keys - 1000, total_keys)
# To avoid race of major with migration
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await check()
await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
# Waits for balancer to co-locate sibling tablets
await s1_log.wait_for("All sibling tablets are co-located")
# Do some shuffling to make sure balancer works with co-located tablets
await shuffle()
old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
s1_mark = await s1_log.mark()
await inject_error_on(manager, "replica_merge_completion_wait", servers)
await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count < old_tablet_count
await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)
await manager.api.drop_sstable_caches(servers[0].ip_addr)
await inject_error_on(manager, "enable_read_debug_log", servers)
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk=9892;")
assert len(rows) == 1
await check()
await disable_injection_on(manager, "enable_read_debug_log", servers)
for i in range(0, 100):
await check()
time.sleep(0.1)
await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)
await check()
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
await manager.api.keyspace_compaction(server.ip_addr, ks)
await check()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment