Created
April 29, 2025 17:31
-
-
Save raphaelsc/68b64532905065171f652bd7591d3109 to your computer and use it in GitHub Desktop.
reproducer for missing data post merge
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # | |
| # Copyright (C) 2025-present ScyllaDB | |
| # | |
| # SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 | |
| # | |
| from test.pylib.internal_types import ServerInfo | |
| from test.pylib.manager_client import ManagerClient | |
| from test.pylib.rest_client import inject_error_one_shot, read_barrier | |
| from test.pylib.tablets import get_all_tablet_replicas | |
| from test.pylib.util import wait_for | |
| from test.cluster.conftest import skip_mode | |
| from test.cluster.util import new_test_keyspace, create_new_test_keyspace | |
| import pytest | |
| import asyncio | |
| import logging | |
| import time | |
| import random | |
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
async def inject_error_one_shot_on(manager, error_name, servers):
    """Call ``inject_error_one_shot`` for ``error_name`` against every server.

    One call per entry of ``servers`` is created (using ``manager.api`` and the
    server's ``ip_addr``), and all of them are awaited together.
    """
    pending = []
    for server in servers:
        pending.append(inject_error_one_shot(manager.api, server.ip_addr, error_name))
    await asyncio.gather(*pending)
async def inject_error_on(manager, error_name, servers):
    """Enable the injection ``error_name`` on every server in ``servers``.

    Calls ``manager.api.enable_injection`` once per server with ``False`` as
    the third argument (presumably the "one shot" flag -- TODO confirm against
    the REST client) and awaits all the calls together.
    """
    pending = []
    for server in servers:
        pending.append(manager.api.enable_injection(server.ip_addr, error_name, False))
    await asyncio.gather(*pending)
async def disable_injection_on(manager, error_name, servers):
    """Disable the injection ``error_name`` on every server in ``servers``.

    Calls ``manager.api.disable_injection`` once per server and awaits all the
    calls together.
    """
    pending = []
    for server in servers:
        pending.append(manager.api.disable_injection(server.ip_addr, error_name))
    await asyncio.gather(*pending)
async def get_tablet_count(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str):
    """Return the ``tablet_count`` recorded in ``system.tablets`` for a table.

    The query is sent to the host matching ``server.ip_addr``, and the value
    comes from the first row returned.
    """
    target_host = manager.cql.cluster.metadata.get_host(server.ip_addr)

    # A read barrier is issued first so that the local tablet metadata on the
    # queried node reflects the finalized tablet movement.
    await read_barrier(manager.api, server.ip_addr)

    table_id = await manager.get_table_id(keyspace_name, table_name)
    query = f"SELECT tablet_count FROM system.tablets where table_id = {table_id}"
    result = await manager.cql.run_async(query, host=target_host)
    first_row = result[0]
    return first_row.tablet_count
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_merge_simple(manager: ManagerClient):
    """Reproducer for missing data after a tablet merge.

    Steps, all on a single-server cluster:

    1. Create a one-tablet table and fill it with 10000 rows.
    2. Enable tablet balancing and wait for a split to be detected.
    3. Delete all but the last 1000 keys, flush and major-compact, and wait
       for a merge decision and for sibling tablets to be co-located.
    4. Let the merge complete and verify, repeatedly, that the surviving rows
       are all still readable.

    Every pause uses ``await asyncio.sleep(...)`` rather than ``time.sleep``
    so that the event loop running this coroutine is not blocked while the
    test waits for the server to make progress.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--logger-log-level', 'load_balancer=debug',
        '--target-tablet-size-in-bytes', '60000',
        '--smp', '1',
    ]
    servers = [await manager.server_add(config={
        'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
    }, cmdline=cmdline)]

    # Keep the balancer off while the table is being populated.
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")

        # 10000 rows with 2000-byte values (about 20 MB of payload) in a single
        # tablet, against a 60000-byte target tablet size, so splits are triggered.
        total_keys = 10000
        keys = range(total_keys)

        def populate(keys):
            """Insert one row with a random 2000-byte blob for each key."""
            insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
            for pk in keys:
                value = random.randbytes(2000)
                cql.execute(insert, [pk, value])

        populate(keys)

        async def check():
            """Assert that a full scan returns exactly ``len(keys)`` rows.

            ``keys`` is read from the enclosing scope at call time, so the
            expectation follows the later re-assignment made after the deletes.
            """
            logger.info("Checking table")
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
            assert len(rows) == len(keys)

        await check()

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count == 1

        # Increases the chance of tablet migration concurrent with split
        await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers)
        await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        # Now there's a split and migration need, so they'll potentially run concurrently.
        await manager.api.enable_tablet_balancing(servers[0].ip_addr)

        await check()
        await asyncio.sleep(2)  # Give load balancer some time to do work

        await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)

        await check()

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count > 1

        await asyncio.sleep(10)

        # Allow shuffling of tablet replicas to make co-location work harder
        async def shuffle():
            """Keep the ``tablet_allocator_shuffle`` injection enabled for two seconds."""
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)
            await asyncio.sleep(2)
            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)

        await shuffle()

        # This will allow us to simulate some balancing after co-location with shuffling, to make sure that
        # balancer won't break co-location.
        await inject_error_on(manager, "tablet_merge_completion_bypass", servers)

        # Shrinks table significantly, forcing merge.
        delete_keys = range(total_keys - 1000)
        await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
        # From here on check() expects only the last 1000 keys.
        keys = range(total_keys - 1000, total_keys)

        # To avoid race of major with migration
        await manager.api.disable_tablet_balancing(servers[0].ip_addr)

        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)
            await manager.api.keyspace_compaction(server.ip_addr, ks)
        await manager.api.enable_tablet_balancing(servers[0].ip_addr)

        await check()

        await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark)
        # Waits for balancer to co-locate sibling tablets
        # NOTE(review): unlike the other waits, this one passes no from_mark -- confirm
        # where the log search starts in that case.
        await s1_log.wait_for("All sibling tablets are co-located")
        # Do some shuffling to make sure balancer works with co-located tablets
        await shuffle()

        old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        s1_mark = await s1_log.mark()

        await inject_error_on(manager, "replica_merge_completion_wait", servers)
        # Lifting the bypass lets the merge proceed to completion.
        await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)

        await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count < old_tablet_count

        await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)

        await manager.api.drop_sstable_caches(servers[0].ip_addr)

        # Single-partition read of one surviving key (9892 >= total_keys - 1000),
        # performed with the read debug log injection enabled.
        await inject_error_on(manager, "enable_read_debug_log", servers)
        cql = manager.get_cql()
        rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk=9892;")
        assert len(rows) == 1

        await check()
        await disable_injection_on(manager, "enable_read_debug_log", servers)

        # Re-check repeatedly for a while after the merge.
        for _ in range(100):
            await check()
            await asyncio.sleep(0.1)

        # NOTE(review): same message and same mark as the wait above, so this
        # looks redundant; kept to preserve the reproducer's sequence.
        await s1_log.wait_for('Merge completion fiber finished', from_mark=s1_mark)

        await check()

        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)
            await manager.api.keyspace_compaction(server.ip_addr, ks)
        await check()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment