Skip to content

Instantly share code, notes, and snippets.

@kokizzu
Last active October 4, 2025 15:17
Show Gist options
  • Save kokizzu/50f564c23894a9c8abded6c09b3ae1eb to your computer and use it in GitHub Desktop.
Save kokizzu/50f564c23894a9c8abded6c09b3ae1eb to your computer and use it in GitHub Desktop.
Mega migration
-- from 003_candles_new_migration.sql
-------------------------------------
USE indexer;
SET max_memory_usage = 0,
max_execution_time = 0,
send_timeout = 0,
receive_timeout = 0,
optimize_on_insert = 0,
max_rows_to_read = 0,
max_bytes_to_read = 0,
max_threads = 8,
max_bytes_before_external_group_by = '3G',
group_by_two_level_threshold = 100000,
group_by_two_level_threshold_bytes = '128M',
max_bytes_before_external_sort = '2G',
allow_nondeterministic_mutations = 1,
max_table_size_to_drop = 0;
DROP TABLE IF EXISTS indexer.mv_ohlcv_agg_slot;
DROP TABLE IF EXISTS indexer.ohlcv_agg_slot;
DROP TABLE IF EXISTS indexer.mv_mev_sandwich_slots;
DROP TABLE IF EXISTS indexer.mev_sandwich_slots;
CREATE TABLE indexer.ohlcv_agg_slot
(
quote_mint FixedString(32),
pool FixedString(32),
is_launchpad UInt8,
bucket UInt64,
bucket_ts DateTime,
open_state_usd AggregateFunction(argMin, Float64, Tuple(
UInt32,
Int8,
Int8)),
high_state_usd AggregateFunction(max, Float64),
low_state_usd AggregateFunction(min, Float64),
close_state_usd AggregateFunction(argMax, Float64, Tuple(
UInt32,
Int8,
Int8)),
vol_state_usd AggregateFunction(sumIf, Float64, UInt8),
open_state_base AggregateFunction(argMin, Float64, Tuple(
UInt32,
Int8,
Int8)),
high_state_base AggregateFunction(max, Float64),
low_state_base AggregateFunction(min, Float64),
close_state_base AggregateFunction(argMax, Float64, Tuple(
UInt32,
Int8,
Int8)),
vol_state_base AggregateFunction(sumIf, UInt64, UInt8)
)
ENGINE = AggregatingMergeTree
ORDER BY (quote_mint, bucket, pool, is_launchpad);
CREATE MATERIALIZED VIEW indexer.mv_ohlcv_agg_slot TO indexer.ohlcv_agg_slot
(
quote_mint FixedString(32),
pool FixedString(32),
is_launchpad UInt8,
bucket UInt64,
bucket_ts DateTime('UTC'),
open_state_usd AggregateFunction(argMin, Float64, Tuple(
UInt32,
Int8,
Int8)),
high_state_usd AggregateFunction(max, Float64),
low_state_usd AggregateFunction(min, Float64),
close_state_usd AggregateFunction(argMax, Float64, Tuple(
UInt32,
Int8,
Int8)),
vol_state_usd AggregateFunction(sumIf, Float64, UInt8),
open_state_base AggregateFunction(argMin, Float64, Tuple(
UInt32,
Int8,
Int8)),
high_state_base AggregateFunction(max, Float64),
low_state_base AggregateFunction(min, Float64),
close_state_base AggregateFunction(argMax, Float64, Tuple(
UInt32,
Int8,
Int8)),
vol_state_base AggregateFunction(sumIf, UInt64, UInt8)
)
AS SELECT
quote_mint,
pool,
toUInt8(is_launchpad) AS is_launchpad,
slot AS bucket,
timestamp AS bucket_ts,
argMinState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_usd,
maxState(price_usd) AS high_state_usd,
minState(price_usd) AS low_state_usd,
argMaxState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_usd,
sumIfState(usd_amount, is_swap = 1) AS vol_state_usd,
argMinState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_base,
maxState(price_base) AS high_state_base,
minState(price_base) AS low_state_base,
argMaxState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_base,
sumIfState(base_amount, is_swap = 1) AS vol_state_base
FROM indexer.parsed_transactions_optimized
WHERE ((is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd > 0) AND (price_base > 0)) OR ((is_launchpad = 1) AND (is_swap = 0))
GROUP BY
quote_mint,
pool,
is_launchpad,
bucket,
bucket_ts;
CREATE TABLE indexer.mev_sandwich_slots
(
quote_mint FixedString(32),
slot UInt64
)
ENGINE = MergeTree
ORDER BY (quote_mint, slot);
CREATE MATERIALIZED VIEW indexer.mv_mev_sandwich_slots TO indexer.mev_sandwich_slots
(
quote_mint FixedString(32),
slot UInt64
)
AS SELECT
quote_mint,
slot
FROM
(
SELECT
quote_mint,
slot,
countIf((method = 'buy') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)) AS buys,
countIf((method = 'sell') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)) AS sells,
minIf(price_base, method = 'buy') AS min_buy,
maxIf(price_base, method = 'sell') AS max_sell
FROM indexer.parsed_transactions_optimized
GROUP BY
quote_mint,
slot,
wallet
HAVING (buys > 0) AND (sells > 0) AND (min_buy < max_sell)
)
GROUP BY
quote_mint,
slot;
INSERT INTO indexer.ohlcv_agg_slot
SELECT
quote_mint,
pool,
toUInt8(is_launchpad) AS is_launchpad,
slot AS bucket,
timestamp AS bucket_ts,
argMinState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_usd,
maxState(price_usd) AS high_state_usd,
minState(price_usd) AS low_state_usd,
argMaxState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_usd,
sumIfState(usd_amount, is_swap = 1) AS vol_state_usd,
argMinState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_base,
maxState(price_base) AS high_state_base,
minState(price_base) AS low_state_base,
argMaxState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_base,
sumIfState(base_amount, is_swap = 1) AS vol_state_base
FROM indexer.parsed_transactions_optimized
WHERE ((is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd > 0) AND (price_base > 0)) OR ((is_launchpad = 1) AND (is_swap = 0))
GROUP BY
quote_mint,
pool,
is_launchpad,
bucket,
bucket_ts
SETTINGS
max_memory_usage = '120G',
max_threads = '60',
max_bytes_before_external_group_by = '16G',
group_by_two_level_threshold = 500000,
group_by_two_level_threshold_bytes = '640M',
max_bytes_before_external_sort = '16G';
-- Break down the complex query into steps to reduce memory usage
CREATE TEMPORARY TABLE temp_wallet_sandwich_candidates AS
SELECT
quote_mint,
slot,
wallet,
countIf((method = 'buy') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)) AS buys,
countIf((method = 'sell') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)) AS sells,
minIf(price_base, method = 'buy') AS min_buy,
maxIf(price_base, method = 'sell') AS max_sell
FROM indexer.parsed_transactions_optimized
GROUP BY
quote_mint,
slot,
wallet
HAVING (buys > 0) AND (sells > 0) AND (min_buy < max_sell)
SETTINGS
max_memory_usage = 0,
max_threads = 6,
max_bytes_before_external_group_by = '16G',
group_by_two_level_threshold = 50000,
group_by_two_level_threshold_bytes = '64M',
max_bytes_before_external_sort = '16G',
join_algorithm = 'hash',
max_rows_in_join = 10000000;
INSERT INTO indexer.mev_sandwich_slots
SELECT
quote_mint,
slot
FROM temp_wallet_sandwich_candidates
GROUP BY
quote_mint,
slot
SETTINGS
max_memory_usage = 0,
max_threads = 6,
max_bytes_before_external_group_by = '16G',
group_by_two_level_threshold = 250000,
group_by_two_level_threshold_bytes = '320M',
max_bytes_before_external_sort = '16G';
DROP TABLE temp_wallet_sandwich_candidates;
SELECT 'OHLCV rows inserted:', count() FROM indexer.ohlcv_agg_slot;
SELECT 'MEV rows inserted:', count() FROM indexer.mev_sandwich_slots;
DROP TABLE IF EXISTS indexer.mv_trade_agg_qm_wallet;
ALTER TABLE indexer.trade_agg_qm_wallet
ADD COLUMN buy_usd_amount AggregateFunction(sum, Float64),
ADD COLUMN sell_usd_amount AggregateFunction(sum, Float64);
CREATE MATERIALIZED VIEW indexer.mv_trade_agg_qm_wallet TO indexer.trade_agg_qm_wallet
(
quote_mint FixedString(32),
wallet FixedString(32),
buy_count AggregateFunction(count),
sell_count AggregateFunction(count),
buy_base_amount AggregateFunction(sum, UInt64),
sell_base_amount AggregateFunction(sum, UInt64),
buy_quote_amount AggregateFunction(sum, UInt64),
sell_quote_amount AggregateFunction(sum, UInt64),
launchpad AggregateFunction(any, String),
buy_usd_amount AggregateFunction(sum, Float64),
sell_usd_amount AggregateFunction(sum, Float64)
)
AS SELECT
quote_mint,
wallet,
countMergeState(buy_count) AS buy_count,
countMergeState(sell_count) AS sell_count,
sumMergeState(buy_base_amount) AS buy_base_amount,
sumMergeState(sell_base_amount) AS sell_base_amount,
sumMergeState(buy_quote_amount) AS buy_quote_amount,
sumMergeState(sell_quote_amount) AS sell_quote_amount,
anyMergeState(launchpad) AS launchpad,
sumMergeState(buy_usd_amount) AS buy_usd_amount,
sumMergeState(sell_usd_amount) AS sell_usd_amount
FROM indexer.trade_aggregates_optimized
GROUP BY
quote_mint,
wallet;
TRUNCATE TABLE indexer.trade_agg_qm_wallet SETTINGS max_table_size_to_drop = 0 ;
INSERT INTO indexer.trade_agg_qm_wallet
SELECT
quote_mint,
wallet,
countMergeState(buy_count) AS buy_count,
countMergeState(sell_count) AS sell_count,
sumMergeState(buy_base_amount) AS buy_base_amount,
sumMergeState(sell_base_amount) AS sell_base_amount,
sumMergeState(buy_quote_amount) AS buy_quote_amount,
sumMergeState(sell_quote_amount) AS sell_quote_amount,
anyMergeState(launchpad) AS launchpad,
sumMergeState(buy_usd_amount) AS buy_usd_amount,
sumMergeState(sell_usd_amount) AS sell_usd_amount
FROM indexer.trade_aggregates_optimized
GROUP BY
quote_mint,
wallet
SETTINGS
max_memory_usage = '120G',
max_threads = 60,
max_bytes_before_external_group_by = '16G',
group_by_two_level_threshold = 50000,
group_by_two_level_threshold_bytes = '64M',
max_bytes_before_external_sort = '16G';
SELECT 'Trade aggregation rows updated:', count() FROM indexer.trade_agg_qm_wallet;
CREATE TABLE indexer.ohlcv_finalized
(
quote_mint FixedString(32),
pool FixedString(32),
is_launchpad UInt8,
bucket UInt64,
bucket_ts DateTime,
open_usd Float64,
high_usd Float64,
low_usd Float64,
close_usd Float64,
vol_usd Float64,
open_base Float64,
high_base Float64,
low_base Float64,
close_base Float64,
vol_base UInt64
)
ENGINE = MergeTree
ORDER BY (quote_mint, bucket, pool, is_launchpad);
CREATE MATERIALIZED VIEW indexer.ohlcv_mv TO indexer.ohlcv_finalized AS
SELECT
quote_mint,
pool,
is_launchpad,
bucket,
bucket_ts,
finalizeAggregation(open_state_usd) AS open_usd,
finalizeAggregation(high_state_usd) AS high_usd,
finalizeAggregation(low_state_usd) AS low_usd,
finalizeAggregation(close_state_usd) AS close_usd,
finalizeAggregation(vol_state_usd) AS vol_usd,
finalizeAggregation(open_state_base) AS open_base,
finalizeAggregation(high_state_base) AS high_base,
finalizeAggregation(low_state_base) AS low_base,
finalizeAggregation(close_state_base) AS close_base,
finalizeAggregation(vol_state_base) AS vol_base
FROM indexer.ohlcv_agg_slot;
INSERT INTO indexer.ohlcv_finalized
SELECT
quote_mint,
pool,
is_launchpad,
bucket,
bucket_ts,
finalizeAggregation(open_state_usd) AS open_usd,
finalizeAggregation(high_state_usd) AS high_usd,
finalizeAggregation(low_state_usd) AS low_usd,
finalizeAggregation(close_state_usd) AS close_usd,
finalizeAggregation(vol_state_usd) AS vol_usd,
finalizeAggregation(open_state_base) AS open_base,
finalizeAggregation(high_state_base) AS high_base,
finalizeAggregation(low_state_base) AS low_base,
finalizeAggregation(close_state_base) AS close_base,
finalizeAggregation(vol_state_base) AS vol_base
FROM indexer.ohlcv_agg_slot;
INSERT INTO system_migrations (name)
VALUES ('003_candles_new_migration');
-- from 004_pools_and_metrics__pool_aggregates_optimized2.sql
-------------------------------------------------------------
/* ===============================
MIGRATION 004 — Good Pools & Metrics (metrics embedded in best_pool_resolved_by_qmint)
=============================== */
USE indexer;
SET max_memory_usage = '150G',
max_execution_time = 0,
send_timeout = 0,
receive_timeout = 0,
optimize_on_insert = 0,
max_rows_to_read = 0,
max_bytes_to_read = 0,
max_threads = 8,
max_bytes_before_external_group_by = '3G',
group_by_two_level_threshold = 100000,
group_by_two_level_threshold_bytes = '128M',
max_bytes_before_external_sort = '2G',
allow_nondeterministic_mutations = 1,
max_table_size_to_drop = 0;
/* ---------- DROP old objects in dependency order (if exist) ---------- */
DROP VIEW IF EXISTS indexer.mv_best_pool_metrics_by_qmint;
DROP TABLE IF EXISTS indexer.best_pool_metrics_by_qmint;
DROP VIEW IF EXISTS indexer.mv_best_pool_resolved_by_qmint;
DROP TABLE IF EXISTS indexer.best_pool_resolved_by_qmint;
DROP VIEW IF EXISTS indexer.mv_best_from_snapshot;
DROP TABLE IF EXISTS indexer.best_pools_by_qmint_agg;
DROP VIEW IF EXISTS indexer.mv_pool_aggregates_optimized;
DROP TABLE IF EXISTS indexer.pool_aggregates_optimized;
DROP VIEW IF EXISTS indexer.mv_pool_aggregates_optimized2;
DROP TABLE IF EXISTS indexer.pool_aggregates_optimized2;
/* ==============================================
CHANGE #1 — Pool aggregates (optimized)
============================================== */
CREATE TABLE indexer.pool_aggregates_optimized
(
/* functionally dependent on pool */
`base_mint` FixedString(32),
`quote_mint` FixedString(32),
`pool` FixedString(32),
/* labels */
`platform` LowCardinality(String),
`launchpad` LowCardinality(String),
/* ever-true flags (logical OR over history) */
`is_migration_state` AggregateFunction(max, Bool),
/* counters & sums */
`buy_count` AggregateFunction(countIf, Bool),
`sell_count` AggregateFunction(countIf, Bool),
`buy_base_amount` AggregateFunction(sum, UInt64),
`sell_base_amount` AggregateFunction(sum, UInt64),
`buy_usd_amount` AggregateFunction(sum, Float64),
`sell_usd_amount` AggregateFunction(sum, Float64),
/* recency-weighted by position tuple */
`price_base` AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)),
`price_usd` AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)),
`base_liquidity` AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
`quote_liquidity` AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
`base_liquidity_real` AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
`quote_liquidity_real` AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
/* earliest position */
`first_transaction_time` AggregateFunction(min, DateTime('UTC')),
/* order-only projection */
PROJECTION quote_mint_projection
(
SELECT *
ORDER BY quote_mint, pool
)
)
ENGINE = ReplicatedAggregatingMergeTree(
'/clickhouse/tables/{shard}/indexer/pool_aggregates_optimized_new', '{replica}'
)
ORDER BY pool
SETTINGS index_granularity = 2048,
deduplicate_merge_projection_mode = 'drop';
CREATE MATERIALIZED VIEW indexer.mv_pool_aggregates_optimized
TO indexer.pool_aggregates_optimized
AS
SELECT
/* states / labels via any() because we GROUP BY pool */
any(base_mint) AS base_mint,
any(quote_mint) AS quote_mint,
pool,
any(platform) AS platform,
any(launchpad) AS launchpad,
/* ever-true flags */
maxState(is_migration) AS is_migration_state,
/* measures */
countIfState(method = 'buy') AS buy_count,
countIfState(method = 'sell') AS sell_count,
sumIfState(base_amount, method = 'buy') AS buy_base_amount,
sumIfState(base_amount, method = 'sell') AS sell_base_amount,
sumIfState(usd_amount, method = 'buy') AS buy_usd_amount,
sumIfState(usd_amount, method = 'sell') AS sell_usd_amount,
/* recency by position tuple (slot, tx_idx, ix_idx, inner_ix_idx)
Apply the ≥100 base/quote thresholds and require swaps for prices. */
argMaxStateIf(
price_base,
(slot, transaction_index, instruction_index, inner_instruction_index),
(is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100)
) AS price_base,
argMaxStateIf(
price_usd,
(slot, transaction_index, instruction_index, inner_instruction_index),
(is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100)
) AS price_usd,
/* liquidity snapshots by recency (no additional filter) */
argMaxState(
base_liquidity,
(slot, transaction_index, instruction_index, inner_instruction_index)
) AS base_liquidity,
argMaxState(
quote_liquidity,
(slot, transaction_index, instruction_index, inner_instruction_index)
) AS quote_liquidity,
argMaxState(
base_liquidity_real,
(slot, transaction_index, instruction_index, inner_instruction_index)
) AS base_liquidity_real,
argMaxState(
quote_liquidity_real,
(slot, transaction_index, instruction_index, inner_instruction_index)
) AS quote_liquidity_real,
/* earliest observed ts */
minState(timestamp) AS first_transaction_time
FROM indexer.parsed_transactions_optimized
GROUP BY pool;
/* ---------- Backfill pool_aggregates_optimized ---------- */
INSERT INTO indexer.pool_aggregates_optimized
SELECT
any(base_mint) AS base_mint,
any(quote_mint) AS quote_mint,
pool,
any(platform) AS platform,
any(launchpad) AS launchpad,
maxState(is_migration) AS is_migration_state,
countIfState(method = 'buy') AS buy_count,
countIfState(method = 'sell') AS sell_count,
sumIfState(base_amount, method = 'buy') AS buy_base_amount,
sumIfState(base_amount, method = 'sell') AS sell_base_amount,
sumIfState(usd_amount, method = 'buy') AS buy_usd_amount,
sumIfState(usd_amount, method = 'sell') AS sell_usd_amount,
argMaxStateIf(
price_base,
(slot, transaction_index, instruction_index, inner_instruction_index),
(is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100)
) AS price_base,
argMaxStateIf(
price_usd,
(slot, transaction_index, instruction_index, inner_instruction_index),
(is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100)
) AS price_usd,
argMaxState(
base_liquidity,
(slot, transaction_index, instruction_index, inner_instruction_index)
) AS base_liquidity,
argMaxState(
quote_liquidity,
(slot, transaction_index, instruction_index, inner_instruction_index)
) AS quote_liquidity,
argMaxState(
base_liquidity_real,
(slot, transaction_index, instruction_index, inner_instruction_index)
) AS base_liquidity_real,
argMaxState(
quote_liquidity_real,
(slot, transaction_index, instruction_index, inner_instruction_index)
) AS quote_liquidity_real,
minState(timestamp) AS first_transaction_time
FROM indexer.parsed_transactions_optimized
GROUP BY pool
SETTINGS
max_memory_usage = '120G',
max_threads = 16,
max_bytes_before_external_group_by = '16G',
group_by_two_level_threshold = 80000,
group_by_two_level_threshold_bytes = '96M',
max_bytes_before_external_sort = '16G';
/* ==============================================
CHANGE #2 — Best pools per quote_mint (agg)
============================================== */
CREATE TABLE IF NOT EXISTS indexer.best_pools_by_qmint_agg
(
quote_mint FixedString(32),
-- official pools
migration_state AggregateFunction(any,
Tuple(FixedString(32), LowCardinality(String), LowCardinality(String))),
launchpad_state AggregateFunction(any,
Tuple(FixedString(32), LowCardinality(String), LowCardinality(String))),
-- best non-official by latest known liquidity (UInt64)
best_liq_state AggregateFunction(argMax,
Tuple(FixedString(32), LowCardinality(String), LowCardinality(String)),
UInt64)
)
ENGINE = ReplicatedAggregatingMergeTree(
'/clickhouse/tables/{shard}/indexer/best_pools_by_qmint_agg', '{replica}'
)
ORDER BY quote_mint;
CREATE MATERIALIZED VIEW IF NOT EXISTS indexer.mv_best_from_snapshot
TO indexer.best_pools_by_qmint_agg
AS
SELECT
quote_mint,
-- unique migration pool per mint
anyStateIf( (pool, platform, launchpad), is_migration = 1 ) AS migration_state,
-- unique launchpad pool per mint (non-empty launchpad label)
anyStateIf( (pool, platform, launchpad), launchpad <> '' ) AS launchpad_state,
-- best non-official by latest liquidity snapshot
argMaxStateIf(
(pool, platform, launchpad),
last_liq,
(is_migration = 0) AND (launchpad = '')
) AS best_liq_state
FROM
(
/* Snapshot per pool (functionally dependent columns via any*; no nested aggs above this level) */
SELECT
any(quote_mint) AS quote_mint,
pool,
any(platform) AS platform,
any(launchpad) AS launchpad,
-- latest known liquidity per pool (UInt64)
argMaxMerge(quote_liquidity_real) AS last_liq,
-- ever-true migration flag
maxMerge(is_migration_state) AS is_migration
FROM indexer.pool_aggregates_optimized
GROUP BY pool
)
GROUP BY quote_mint;
/* ---------- Backfill best_pools_by_qmint_agg ---------- */
INSERT INTO indexer.best_pools_by_qmint_agg
SELECT
quote_mint,
anyStateIf( (pool, platform, launchpad), is_migration = 1 ) AS migration_state,
anyStateIf( (pool, platform, launchpad), launchpad <> '' ) AS launchpad_state,
argMaxStateIf( (pool, platform, launchpad), last_liq,
(is_migration = 0) AND (launchpad = '') ) AS best_liq_state
FROM
(
SELECT
any(quote_mint) AS quote_mint,
pool,
any(platform) AS platform,
any(launchpad) AS launchpad,
argMaxMerge(quote_liquidity_real) AS last_liq,
maxMerge(is_migration_state) AS is_migration
FROM indexer.pool_aggregates_optimized
GROUP BY pool
)
GROUP BY quote_mint
SETTINGS
max_memory_usage = '80G',
max_threads = 32;
/* ==============================================
CHANGE #3 — Resolve single best (and historical)
+ embed ALL metrics directly here
============================================== */
CREATE TABLE IF NOT EXISTS indexer.best_pool_resolved_by_qmint
(
quote_mint FixedString(32),
pool FixedString(32),
platform LowCardinality(String),
launchpad LowCardinality(String),
is_launchpad Bool,
is_migration Bool,
historical_pool Nullable(FixedString(32)),
historical_platform Nullable(String), -- previously Nullable(LowCardinality(String
historical_launchpad Nullable(String), -- previously Nullable(LowCardinality(String
/* === embedded metrics aggregated across the chosen pools (primary + optional historical) === */
buy_count AggregateFunction(countIf, Bool),
sell_count AggregateFunction(countIf, Bool),
buy_base_amount AggregateFunction(sum, UInt64),
sell_base_amount AggregateFunction(sum, UInt64),
buy_usd_amount AggregateFunction(sum, Float64),
sell_usd_amount AggregateFunction(sum, Float64),
price_base AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)),
price_usd AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)),
base_liquidity AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
quote_liquidity AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
base_liquidity_real AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
quote_liquidity_real AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
first_transaction_time AggregateFunction(min, DateTime('UTC'))
)
ENGINE = ReplacingMergeTree
ORDER BY quote_mint;
CREATE MATERIALIZED VIEW IF NOT EXISTS indexer.mv_best_pool_resolved_by_qmint
TO indexer.best_pool_resolved_by_qmint
AS
WITH
toFixedString('', 32) AS empty32,
(
empty32,
CAST('' AS LowCardinality(String)),
CAST('' AS LowCardinality(String))
) AS empty_tuple
SELECT
q.quote_mint,
/* identity columns for the primary (resolved) pool */
primary_tuple.1 AS pool,
primary_tuple.2 AS platform,
primary_tuple.3 AS launchpad,
CAST(q.mig IS NULL AND q.lp IS NOT NULL AS Bool) AS is_launchpad,
CAST(q.mig IS NOT NULL AS Bool) AS is_migration,
/* historical (optional) identity */
if(
q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1,
q.lp.1, CAST(NULL AS Nullable(FixedString(32)))
) AS historical_pool,
if(
q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1,
q.lp.2, CAST(NULL AS Nullable(String))
) AS historical_platform,
if(
q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1,
q.lp.3, CAST(NULL AS Nullable(String))
) AS historical_launchpad,
/* === merged metrics across chosen pools === */
countIfMergeState(p.buy_count) AS buy_count,
countIfMergeState(p.sell_count) AS sell_count,
sumMergeState(p.buy_base_amount) AS buy_base_amount,
sumMergeState(p.sell_base_amount) AS sell_base_amount,
sumMergeState(p.buy_usd_amount) AS buy_usd_amount,
sumMergeState(p.sell_usd_amount) AS sell_usd_amount,
argMaxMergeState(p.price_base) AS price_base,
argMaxMergeState(p.price_usd) AS price_usd,
argMaxMergeState(p.base_liquidity) AS base_liquidity,
argMaxMergeState(p.quote_liquidity) AS quote_liquidity,
argMaxMergeState(p.base_liquidity_real) AS base_liquidity_real,
argMaxMergeState(p.quote_liquidity_real) AS quote_liquidity_real,
minMergeState(p.first_transaction_time) AS first_transaction_time
FROM
(
/* Resolve the tuple set per quote_mint */
SELECT
quote_mint,
if(anyMerge(migration_state) != empty_tuple, anyMerge(migration_state), empty_tuple) AS mig,
if(anyMerge(launchpad_state) != empty_tuple, anyMerge(launchpad_state), empty_tuple) AS lp,
if(argMaxMerge(best_liq_state) != empty_tuple, argMaxMerge(best_liq_state), empty_tuple) AS best,
coalesce(
if(anyMerge(migration_state) != empty_tuple, anyMerge(migration_state), empty_tuple),
if(anyMerge(launchpad_state) != empty_tuple, anyMerge(launchpad_state), empty_tuple),
if(argMaxMerge(best_liq_state) != empty_tuple, argMaxMerge(best_liq_state), empty_tuple)
) AS primary_tuple
FROM indexer.best_pools_by_qmint_agg
GROUP BY quote_mint
) AS q
/* expand to the set of chosen pools: primary + (optional) historical lp */
ARRAY JOIN arrayFilter(x -> x.1 != empty32,
[ primary_tuple,
if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp, empty_tuple)
]) AS chosen_tuple
ANY LEFT JOIN indexer.pool_aggregates_optimized AS p
ON p.pool = chosen_tuple.1
GROUP BY
q.quote_mint,
primary_tuple.1, primary_tuple.2, primary_tuple.3,
CAST(q.mig IS NULL AND q.lp IS NOT NULL AS Bool),
CAST(q.mig IS NOT NULL AS Bool),
if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.1, CAST(NULL AS Nullable(FixedString(32)))),
if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.2, CAST(NULL AS Nullable(String))),
if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.3, CAST(NULL AS Nullable(String)));
/* ---------- Backfill best_pool_resolved_by_qmint (with embedded metrics) ---------- */
INSERT INTO indexer.best_pool_resolved_by_qmint
WITH
toFixedString('', 32) AS empty32,
(
empty32,
CAST('' AS LowCardinality(String)),
CAST('' AS LowCardinality(String))
) AS empty_tuple
SELECT
q.quote_mint,
/* identity columns for the primary (resolved) pool */
primary_tuple.1 AS pool,
primary_tuple.2 AS platform,
primary_tuple.3 AS launchpad,
CAST(q.mig IS NULL AND q.lp IS NOT NULL AS Bool) AS is_launchpad,
CAST(q.mig IS NOT NULL AS Bool) AS is_migration,
/* historical (optional) identity */
if(
q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1,
q.lp.1, CAST(NULL AS Nullable(FixedString(32)))
) AS historical_pool,
if(
q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1,
q.lp.2, CAST(NULL AS Nullable(String))
) AS historical_platform,
if(
q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1,
q.lp.3, CAST(NULL AS Nullable(String))
) AS historical_launchpad,
/* === merged metrics across chosen pools === */
countIfMergeState(p.buy_count) AS buy_count,
countIfMergeState(p.sell_count) AS sell_count,
sumMergeState(p.buy_base_amount) AS buy_base_amount,
sumMergeState(p.sell_base_amount) AS sell_base_amount,
sumMergeState(p.buy_usd_amount) AS buy_usd_amount,
sumMergeState(p.sell_usd_amount) AS sell_usd_amount,
argMaxMergeState(p.price_base) AS price_base,
argMaxMergeState(p.price_usd) AS price_usd,
argMaxMergeState(p.base_liquidity) AS base_liquidity,
argMaxMergeState(p.quote_liquidity) AS quote_liquidity,
argMaxMergeState(p.base_liquidity_real) AS base_liquidity_real,
argMaxMergeState(p.quote_liquidity_real) AS quote_liquidity_real,
minMergeState(p.first_transaction_time) AS first_transaction_time
FROM
(
/* Resolve the tuple set per quote_mint */
SELECT
quote_mint,
if(anyMerge(migration_state) != empty_tuple, anyMerge(migration_state), empty_tuple) AS mig,
if(anyMerge(launchpad_state) != empty_tuple, anyMerge(launchpad_state), empty_tuple) AS lp,
if(argMaxMerge(best_liq_state) != empty_tuple, argMaxMerge(best_liq_state), empty_tuple) AS best,
coalesce(
if(anyMerge(migration_state) != empty_tuple, anyMerge(migration_state), empty_tuple),
if(anyMerge(launchpad_state) != empty_tuple, anyMerge(launchpad_state), empty_tuple),
if(argMaxMerge(best_liq_state) != empty_tuple, argMaxMerge(best_liq_state), empty_tuple)
) AS primary_tuple
FROM indexer.best_pools_by_qmint_agg
GROUP BY quote_mint
) AS q
/* expand to the set of chosen pools: primary + (optional) historical lp */
ARRAY JOIN arrayFilter(x -> x.1 != empty32,
[ primary_tuple,
if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp, empty_tuple)
]) AS chosen_tuple
ANY LEFT JOIN indexer.pool_aggregates_optimized AS p
ON p.pool = chosen_tuple.1
GROUP BY
q.quote_mint,
primary_tuple.1, primary_tuple.2, primary_tuple.3,
CAST(q.mig IS NULL AND q.lp IS NOT NULL AS Bool),
CAST(q.mig IS NOT NULL AS Bool),
if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.1, CAST(NULL AS Nullable(FixedString(32)))),
if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.2, CAST(NULL AS Nullable(String))),
if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.3, CAST(NULL AS Nullable(String)))
SETTINGS
max_memory_usage = '120G',
max_bytes_in_join = '120G',
join_algorithm = 'partial_merge',
max_threads = 32;
/* ---------- Counts for quick sanity ---------- */
SELECT 'pool_aggregates_optimized rows:', count() FROM indexer.pool_aggregates_optimized;
SELECT 'best_pools_by_qmint_agg rows:', count() FROM indexer.best_pools_by_qmint_agg;
SELECT 'best_pool_resolved_by_qmint rows:', count() FROM indexer.best_pool_resolved_by_qmint;
/* ---------- Migration bookkeeping ---------- */
INSERT INTO system_migrations (name)
VALUES ('004_good_pools_and_metrics');
-- from 005_split_per_date.sql
------------------------------
-- 1. Create the target table for MV
CREATE TABLE IF NOT EXISTS indexer.k_parsed_transactions_by_quote
(
signature String,
slot UInt64,
timestamp DateTime64(9),
method LowCardinality(String),
platform LowCardinality(String),
wallet FixedString(32),
quote_mint FixedString(32),
base_mint FixedString(32),
pool String,
base_amount Float64,
quote_amount Float64,
usd_amount Float64,
price_base Float64,
price_usd Float64,
quote_decimals UInt8,
base_decimals UInt8,
pre_quote_balance Float64,
post_quote_balance Float64,
pre_base_balance Float64,
post_base_balance Float64,
base_liquidity Float64,
quote_liquidity Float64,
base_liquidity_real Float64,
quote_liquidity_real Float64,
transaction_index UInt32,
instruction_index UInt32,
inner_instruction_index UInt32,
tx_fees Float64,
platform_fees Float64,
processor_tips Float64,
platform_type LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY (quote_mint, wallet, method, slot, transaction_index, instruction_index, inner_instruction_index);
-- 2. Create MV that auto-populates into the target
CREATE MATERIALIZED VIEW IF NOT EXISTS indexer.k_mv_parsed_transactions_by_quote
TO indexer.k_parsed_transactions_by_quote
AS
SELECT
signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized;
-- 3. backfill per yyyymm
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202409'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202410'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202411'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202412'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202501'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202502'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202503'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202504'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202505'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202506'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202507'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202508'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202509'
;
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202509'
;
-- 4. check whether it's getting faster or not
EXPLAIN indexes = 1
SELECT
signature,
slot,
timestamp,
method,
platform,
wallet,
quote_mint,
base_mint,
pool,
base_amount,
quote_amount,
usd_amount,
price_base,
price_usd,
quote_decimals,
base_decimals,
pre_quote_balance,
post_quote_balance,
pre_base_balance,
post_base_balance,
base_liquidity,
quote_liquidity,
base_liquidity_real,
quote_liquidity_real,
transaction_index,
instruction_index,
inner_instruction_index,
tx_fees,
platform_fees,
processor_tips,
platform_type
FROM indexer.k_parsed_transactions_by_quote
WHERE wallet IN (
SELECT toFixedString(base58Decode(w), 32)
FROM (SELECT arrayJoin([
'4GQeEya6ZTwvXre4Br6ZfDyfe2WQMkcDz2QbkJZazVqS',
'cook3vQojECt9LgMUse6Y3PLDzzUmdVGt1Uz8sohGai',
'86ca1fHVACfW57vQaGwvx2841WnpphuT5YF3WoHxuouo',
'H3jfDkDCeBgzM5wm7WCwtjdMC48poTAKXbEeWdvKLnVL',
'3tc4BVAdzjr1JpeZu6NAjLHyp4kK3iic7TexMBYGJ4Xk',
'RFSqPtn1JfavGiUD4HJsZyYXvZsycxf31hnYfbyG6iB',
'8deJ9xeUvXSJwicYptA9mHsU2rN2pDx37KWzkDkEXhU6',
'DfMxre4cKmvogbLrPigxmibVTTQDuzjdXojWzjCXXhzj',
/* ⚠️ The next two end with "pump" and are not valid Base58 pubkeys.
Remove or replace them, or base58Decode will throw. */
-- '34qXg7fx9AJDmBA1fRs8xMsfNvWQbabkdkEWfGSRpump',
'8zFZHuSRuDpuAR7J6FzwyF3vKNx4CVW3DFHJerQhc7Zd',
'FpR5RsXauqdsGhgdW9C6zmBHMkcT9HJvXJQDhSUa4kYe',
'3kebnKw7cPdSkLRfiMEALyZJGZ4wdiSRvmoN4rD1yPzV',
'4Be9CvxqHW6BYiRAxW9Q3xu1ycTMWaL5z8NX4HR3ha7t',
'9jyqFiLnruggwNn4EQwBNFXwpbLM9hrA4hV59ytyAVVz',
'3rSZJHysEk2ueFVovRLtZ8LGnQBMZGg96H2Q4jErspAF',
'CRVidEDtEUTYZisCxBZkpELzhQc9eauMLR3FWg74tReL',
'73LnJ7G9ffBDjEBGgJDdgvLUhD5APLonKrNiHsKDCw5B',
'91HANKimRsWTXMkfcDipLaDVmeTkj6m2eNSVobpkdUNz',
'GQWLRHtR18vy8myoHkgc9SMcSzwUdBjJ816vehSBwcis',
'62FZUSWPMX9pofoV1uWHMdzFJRjwMa1LHgh2zhdEB7Zj',
'AVAZvHLR2PcWpDf8BXY4rVxNHYRBytycHkcB5z5QNXYm',
'73KcgcTVyLYhnVX3PJbit8e4mC7jU8QotNGBFGix14jn',
'BDC19MFbzDpoF37QCxe4JR6i7UmXNMSiEqrxuQgJoWte',
'4vgDxDJubQxmHsaNxgsTESer9Qgk1bGKuE1dN4fXzsoj',
'26kZ9rg8Y5pd4j1tdT4cbT8BQRu5uDbXkaVs3L5QasHy',
'5TMR5GFPTzkNhprnty2ocXEFLoLur1uchg4NkEr5WEf4',
'G2VzymsKt3zNAn4CKBndYcS67w6Kny5sDEp7Y2W1aTf6',
'GfXQesPe3Zuwg8JhAt6Cg8euJDTVx751enp9EQQmhzPH',
'dVs7zZksjFuq73xbtUC62brFXYYuxCuPSG4wZeGiHck',
'DGPYpCdiVg2shab2TnNiZ2RnsjBQSmhgN71hJyWC5cYn',
'GTvBQnRvAPweU2qmYg8MDLND2PAAyYFKe35aKQGMRDaL',
'EcJWNtETrzdbj8s2dXpaE4Tu4r7fxALD6TNw9H8S6ksz',
'7nt58K2KF8HwBiCcRprCYP7civVifXP59fE4x229gAuv',
'2QwCvtKyVew25mvDDJDwgopMhwRmNQqqGDsjCBVACo8A',
'4aDdi3EiDPMbeZ3e5BvbFMt4vfJaoahaHxZuwKQRtFc1',
'Hg5SEgwdHw8FUoKMPDZbREii3qEXwn6EvvsUdBPM2rmi',
'7Dt5oUpxHWuKH8bCTXDLz2j3JyxA7jEmtzqCG6pnh96X',
'AbcX4XBm7DJ3i9p29i6sU8WLmiW4FWY5tiwB9D6UBbcE',
'HrTZPWV4ZPebBiwyzoTBajCD49kQqVwf4dwsLuYG8CXX',
'4hSXPtxZgXFpo6Vxq9yqxNjcBoqWN3VoaPJWonUtupzD',
'BrNoqdHUCcv9yTncnZeSjSov8kqhpmzv1nAiPbq1M95H',
'28ipXVfkdmu1PDowCHbcSfzkpH9edZmSiVoDhY5xGVfR',
'HsfSzb7BBv4oKNFamgoyBEQ4v6BHUhEREEmBnWzZJXLj',
'ECCKBDWX3MkEcf3bULbLBb9FvrEQLsmPMFTKFpvjzqgP',
'EDmbDc7sY87dKszqyZ3rHczWbKcCyUvJQSJpE3Cg4RcZ',
'DSVc1Rd69sLcXBoZAsvkzK4jGFyJKP77K3J2CaFVNAFP',
'HEibUTHW7JHt7VgKYtWpQCSX7a6YHBmRQrixZs9i4wGY',
'CAUbSmiNuj16phNiskMdwWZEAUXCfXaUSamDFyf7pAa6',
'6RoLbZJWJHpTk4sdPsWzocEHiRtzPS36WcBjnMXuQrfU',
'JDd3hy3gQn2V982mi1zqhNqUw1GfV2UL6g76STojCJPN',
'9UWZFoiCHeYRLmzmDJhdMrP7wgrTw7DMSpPiT2eHgJHe',
'EXEeLF1YiZxC1tFneoQPv6rcdq5VMUbKNGy3DLPEo2oH',
'D3Z7weHeLGWA7eg1qwVB66NCs8YLxiDTBVE6Eb1tTmwg',
'HVajxfNTWqLGxsfJA9DFThnvddveKfJLK8re1kNpeCVv',
'3PgV4tvihn16J6uaQsKeekChCAH7MKyxGQYb7eaRJbMu',
'AT6FsbUy1jNR2AHE35WCVh3nSdDtYADtdhA2KitQXmL5',
'HdxkiXqeN6qpK2YbG51W23QSWj3Yygc1eEk2zwmKJExp',
'Fxj1j5ohVoRos1yQnvx6bsvYCt493VGpyLbMiq4eLff3',
'7VBTpiiEjkwRbRGHJFUz6o5fWuhPFtAmy8JGhNqwHNnn',
'6LChaYRYtEYjLEHhzo4HdEmgNwu2aia8CM8VhR9wn6n7',
'DNfuF1L62WWyW3pNakVkyGGFzVVhj4Yr52jSmdTyeBHm',
'636N7frU8bUwYfyUAtvMQQsXhTFRuSWjxnEZihr5axGV',
'DpNVrtA3ERfKzX4F8Pi2CVykdJJjoNxyY5QgoytAwD26',
'f1BuDDhr9myYFm8X49sQ5RharwsoNPvXDtB18CMA1fu',
'HABhDh9zrzf8mA4SBo1yro8M6AirH2hZdLNPpuvMH6iA',
'9ecRdqNCBxwReiY2pxtnLeayeBzE1CrSSmKKJJGdmsaJ',
'F72vY99ihQsYwqEDCfz7igKXA5me6vN2zqVsVUTpw6qL',
'FAwHi11KyJVh2pR2b2vNFEbbunTVkMFpdNBuD2dh9NRf',
'HmBmSYwYEgEZuBUYuDs9xofyqBAkw4ywugB1d7R7sTGh',
'7iabBMwmSvS4CFPcjW2XYZY53bUCHzXjCFEFhxeYP4CY',
'AJ6MGExeK7FXmeKkKPmALjcdXVStXYokYNv9uVfDRtvo',
'CyaE1VxvBrahnPWkqm5VsdCvyS2QmNht2UFrKJHga54o',
'BCnqsPEtA1TkgednYEebRpkmwFRJDCjMQcKZMMtEdArc',
'7Ny3AmDQ1cnQ29ywXdQheaoeTPvo6NGkDDtGa5bK9wRH',
/* ⚠️ Same here — ends with "pump". Remove or replace. */
-- '36Qg5TussomUkM7g9FUR8KNcUNwDPbL8jdurKvw3pump',
'suqh5sHtr8HyJ7q8scBimULPkPpA557prMG47xCHQfK',
'4zq1iLpmepj2Rj7W6A3XQMRQA1HyjYqVpZiBzM6aPyH7',
'4gbKBZQ8aaXfmuwq2bbN6cFJEhLm1uxHv2ravrkYTgBU',
'o5e4MABwiMuhnwXWUPgwnhvGPK91i2ytnWZvAz2begi',
'7SDs3PjT2mswKQ7Zo4FTucn9gJdtuW4jaacPA65BseHS',
'77kq9xNZg4Vb94UJMpeHsx7pKXUBU3Wpqsm5UXhjA9dd',
'831qmkeGhfL8YpcXuhrug6nHj1YdK3aXMDQUCo85Auh1'
]) AS w)
)
AND quote_mint = toFixedString(base58Decode('MEFNBXixkEbait3xn9bkm8WsJzXtVsaJEn4c8Sam21u'), 32)
AND method IN ('buy', 'sell')
ORDER BY (slot, transaction_index, instruction_index, inner_instruction_index) DESC
LIMIT 1000 OFFSET 0
SETTINGS optimize_read_in_order=0, enable_filesystem_cache=0;
-- to undo
/*
DROP VIEW indexer.k_mv_parsed_transactions_by_quote;
DROP TABLE indexer.k_parsed_transactions_by_quote
SETTINGS max_table_size_to_drop = 0;
*/
INSERT INTO system_migrations (name)
VALUES ('005_wallet_quote_mint');
-- from 006_various_changes.sql
-------------------------------
/* ===============================
MIGRATION 006 — Various changes
=============================== */
USE indexer;
SET max_memory_usage = 480000000000,
max_execution_time = 0,
send_timeout = 0,
receive_timeout = 0,
optimize_on_insert = 0,
max_rows_to_read = 0,
max_bytes_to_read = 0,
max_threads = 56,
max_bytes_before_external_group_by = '100G',
group_by_two_level_threshold = 500000,
group_by_two_level_threshold_bytes = '8G',
max_bytes_before_external_sort = '100G',
allow_nondeterministic_mutations = 1,
max_table_size_to_drop = 0;
DROP TABLE IF EXISTS indexer.mv_trade_volume_hourly_by_mint_mv;
DROP TABLE IF EXISTS indexer.mv_trade_volume_hourly_by_mint;
ALTER TABLE indexer.parsed_new_token_transactions_optimized
DROP PROJECTION IF EXISTS uri_search_projection;
/* ---------- Trading Volume Hourly Materialized View ---------- */
CREATE TABLE indexer.mv_trade_volume_hourly_by_mint
(
`mint` FixedString(32),
`hour` DateTime('UTC'),
`total_volume` AggregateFunction(sum, Float64),
`buy_volume` AggregateFunction(sum, Float64),
`sell_volume` AggregateFunction(sum, Float64),
`total_txns` AggregateFunction(count),
`buy_txns` AggregateFunction(count),
`sell_txns` AggregateFunction(count),
PROJECTION hour_projection
(
SELECT *
ORDER BY hour
),
PROJECTION mint_hour_projection
(
SELECT *
ORDER BY mint, hour
)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(hour)
ORDER BY (mint, hour)
SETTINGS index_granularity = 8192,
deduplicate_merge_projection_mode = 'rebuild';
CREATE MATERIALIZED VIEW indexer.mv_trade_volume_hourly_by_mint_mv
TO indexer.mv_trade_volume_hourly_by_mint
AS SELECT
quote_mint AS mint,
toStartOfHour(timestamp) AS hour,
sumState(usd_amount) AS total_volume,
sumStateIf(usd_amount, method = 'buy') AS buy_volume,
sumStateIf(usd_amount, method = 'sell') AS sell_volume,
countState() AS total_txns,
countStateIf(method = 'buy') AS buy_txns,
countStateIf(method = 'sell') AS sell_txns
FROM indexer.parsed_transactions_optimized
WHERE is_swap = 1
GROUP BY mint, hour;
/* ---------- URI Search Projection ---------- */
ALTER TABLE indexer.parsed_new_token_transactions_optimized
ADD PROJECTION uri_search_projection
(
SELECT
*
ORDER BY uri, timestamp
);
ALTER TABLE indexer.parsed_new_token_transactions_optimized
MATERIALIZE PROJECTION uri_search_projection;
INSERT INTO indexer.mv_trade_volume_hourly_by_mint
SELECT
quote_mint AS mint,
toStartOfHour(timestamp) AS hour,
sumState(usd_amount) AS total_volume,
sumStateIf(usd_amount, method = 'buy') AS buy_volume,
sumStateIf(usd_amount, method = 'sell') AS sell_volume,
countState() AS total_txns,
countStateIf(method = 'buy') AS buy_txns,
countStateIf(method = 'sell') AS sell_txns
FROM indexer.parsed_transactions_optimized
WHERE is_swap = 1
GROUP BY mint, hour;
/* ---------- Migration bookkeeping ---------- */
INSERT INTO system_migrations (name)
VALUES ('006_various_changes');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment