Last active
October 4, 2025 15:17
-
-
Save kokizzu/50f564c23894a9c8abded6c09b3ae1eb to your computer and use it in GitHub Desktop.
Mega migration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- from 003_candles_new_migration.sql | |
------------------------------------- | |
-- Migration 003 session setup: switch to the `indexer` database and apply the
-- session-level settings used by the statements that follow. Limits set to 0
-- are switched off for this session; individual backfills below override some
-- of these values through their own SETTINGS clauses.
USE indexer;

SET
    max_memory_usage                   = 0,
    max_execution_time                 = 0,
    send_timeout                       = 0,
    receive_timeout                    = 0,
    optimize_on_insert                 = 0,
    max_rows_to_read                   = 0,
    max_bytes_to_read                  = 0,
    max_threads                        = 8,
    max_bytes_before_external_group_by = '3G',
    group_by_two_level_threshold       = 100000,
    group_by_two_level_threshold_bytes = '128M',
    max_bytes_before_external_sort     = '2G',
    allow_nondeterministic_mutations   = 1,
    max_table_size_to_drop             = 0;
-- Remove every object this migration (re)creates so the script can be re-run.
-- ohlcv_mv / ohlcv_finalized are created further down with plain CREATE
-- statements, so they must be dropped here too or a second run fails with
-- "already exists". Views are dropped before the tables they read from or
-- write to.
DROP TABLE IF EXISTS indexer.ohlcv_mv;
DROP TABLE IF EXISTS indexer.ohlcv_finalized;
DROP TABLE IF EXISTS indexer.mv_ohlcv_agg_slot;
DROP TABLE IF EXISTS indexer.ohlcv_agg_slot;
DROP TABLE IF EXISTS indexer.mv_mev_sandwich_slots;
DROP TABLE IF EXISTS indexer.mev_sandwich_slots;
-- Per-slot OHLCV aggregate states, one logical row per
-- (quote_mint, bucket, pool, is_launchpad).
-- Open/close are argMin/argMax states keyed by a (UInt32, Int8, Int8) ordering
-- tuple; high/low are max/min states; volumes are sumIf states.
-- Each metric is stored twice: once for the USD price and once for the base price.
CREATE TABLE indexer.ohlcv_agg_slot
(
    quote_mint       FixedString(32),
    pool             FixedString(32),
    is_launchpad     UInt8,
    bucket           UInt64,
    bucket_ts        DateTime,

    -- USD-denominated candle states
    open_state_usd   AggregateFunction(argMin, Float64, Tuple(UInt32, Int8, Int8)),
    high_state_usd   AggregateFunction(max, Float64),
    low_state_usd    AggregateFunction(min, Float64),
    close_state_usd  AggregateFunction(argMax, Float64, Tuple(UInt32, Int8, Int8)),
    vol_state_usd    AggregateFunction(sumIf, Float64, UInt8),

    -- base-denominated candle states
    open_state_base  AggregateFunction(argMin, Float64, Tuple(UInt32, Int8, Int8)),
    high_state_base  AggregateFunction(max, Float64),
    low_state_base   AggregateFunction(min, Float64),
    close_state_base AggregateFunction(argMax, Float64, Tuple(UInt32, Int8, Int8)),
    vol_state_base   AggregateFunction(sumIf, UInt64, UInt8)
)
ENGINE = AggregatingMergeTree
ORDER BY (quote_mint, bucket, pool, is_launchpad);
-- Feeds ohlcv_agg_slot from parsed_transactions_optimized.
-- One candle per (quote_mint, pool, is_launchpad, slot, timestamp); the
-- open/close ordering key is
-- (transaction_index, instruction_index, inner_instruction_index).
CREATE MATERIALIZED VIEW indexer.mv_ohlcv_agg_slot TO indexer.ohlcv_agg_slot
(
    quote_mint       FixedString(32),
    pool             FixedString(32),
    is_launchpad     UInt8,
    bucket           UInt64,
    bucket_ts        DateTime('UTC'),
    open_state_usd   AggregateFunction(argMin, Float64, Tuple(UInt32, Int8, Int8)),
    high_state_usd   AggregateFunction(max, Float64),
    low_state_usd    AggregateFunction(min, Float64),
    close_state_usd  AggregateFunction(argMax, Float64, Tuple(UInt32, Int8, Int8)),
    vol_state_usd    AggregateFunction(sumIf, Float64, UInt8),
    open_state_base  AggregateFunction(argMin, Float64, Tuple(UInt32, Int8, Int8)),
    high_state_base  AggregateFunction(max, Float64),
    low_state_base   AggregateFunction(min, Float64),
    close_state_base AggregateFunction(argMax, Float64, Tuple(UInt32, Int8, Int8)),
    vol_state_base   AggregateFunction(sumIf, UInt64, UInt8)
)
AS
SELECT
    quote_mint,
    pool,
    toUInt8(is_launchpad) AS is_launchpad,
    slot                  AS bucket,
    timestamp             AS bucket_ts,
    -- USD candle
    argMinState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_usd,
    maxState(price_usd)                                                                     AS high_state_usd,
    minState(price_usd)                                                                     AS low_state_usd,
    argMaxState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_usd,
    sumIfState(usd_amount, is_swap = 1)                                                     AS vol_state_usd,
    -- base candle
    argMinState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_base,
    maxState(price_base)                                                                     AS high_state_base,
    minState(price_base)                                                                     AS low_state_base,
    argMaxState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_base,
    sumIfState(base_amount, is_swap = 1)                                                     AS vol_state_base
FROM indexer.parsed_transactions_optimized
WHERE
    -- swaps above the dust thresholds with positive prices ...
    (
        (is_swap = 1)
        AND (base_amount >= 100)
        AND (quote_amount >= 100)
        AND (price_usd > 0)
        AND (price_base > 0)
    )
    -- ... or non-swap rows flagged as launchpad
    OR ((is_launchpad = 1) AND (is_swap = 0))
GROUP BY
    quote_mint,
    pool,
    is_launchpad,
    bucket,
    bucket_ts;
-- (quote_mint, slot) pairs flagged by the sandwich detection queries below.
CREATE TABLE indexer.mev_sandwich_slots
(
    quote_mint FixedString(32),
    slot       UInt64
)
ENGINE = MergeTree
ORDER BY (quote_mint, slot);
-- Flags a (quote_mint, slot) when some wallet has, within that slot, at least
-- one qualifying buy and one qualifying sell and its lowest buy price is below
-- its highest sell price.
-- NOTE(review): min_buy / max_sell are computed over all buys / sells of the
-- wallet, not only the ones that pass the amount/price thresholds used for the
-- counts — confirm that this is intended.
CREATE MATERIALIZED VIEW indexer.mv_mev_sandwich_slots TO indexer.mev_sandwich_slots
(
    quote_mint FixedString(32),
    slot       UInt64
)
AS
SELECT
    quote_mint,
    slot
FROM
(
    -- per-wallet buy/sell summary inside one slot
    SELECT
        quote_mint,
        slot,
        countIf(
            (method = 'buy') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)
        ) AS buys,
        countIf(
            (method = 'sell') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)
        ) AS sells,
        minIf(price_base, method = 'buy')  AS min_buy,
        maxIf(price_base, method = 'sell') AS max_sell
    FROM indexer.parsed_transactions_optimized
    GROUP BY
        quote_mint,
        slot,
        wallet
    HAVING (buys > 0) AND (sells > 0) AND (min_buy < max_sell)
)
GROUP BY
    quote_mint,
    slot;
-- Backfill ohlcv_agg_slot from the full history of
-- parsed_transactions_optimized, using the same SELECT as mv_ohlcv_agg_slot.
-- The SETTINGS clause raises memory/thread budgets for this statement only.
INSERT INTO indexer.ohlcv_agg_slot
SELECT
    quote_mint,
    pool,
    toUInt8(is_launchpad) AS is_launchpad,
    slot                  AS bucket,
    timestamp             AS bucket_ts,
    -- USD candle
    argMinState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_usd,
    maxState(price_usd)                                                                     AS high_state_usd,
    minState(price_usd)                                                                     AS low_state_usd,
    argMaxState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_usd,
    sumIfState(usd_amount, is_swap = 1)                                                     AS vol_state_usd,
    -- base candle
    argMinState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_base,
    maxState(price_base)                                                                     AS high_state_base,
    minState(price_base)                                                                     AS low_state_base,
    argMaxState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_base,
    sumIfState(base_amount, is_swap = 1)                                                     AS vol_state_base
FROM indexer.parsed_transactions_optimized
WHERE
    (
        (is_swap = 1)
        AND (base_amount >= 100)
        AND (quote_amount >= 100)
        AND (price_usd > 0)
        AND (price_base > 0)
    )
    OR ((is_launchpad = 1) AND (is_swap = 0))
GROUP BY
    quote_mint,
    pool,
    is_launchpad,
    bucket,
    bucket_ts
SETTINGS
    max_memory_usage                   = '120G',
    max_threads                        = '60',
    max_bytes_before_external_group_by = '16G',
    group_by_two_level_threshold       = 500000,
    group_by_two_level_threshold_bytes = '640M',
    max_bytes_before_external_sort     = '16G';
-- MEV backfill, step 1 of 2: the sandwich query is split in two to reduce
-- memory usage. This step materializes the per-(quote_mint, slot, wallet)
-- candidates into a temporary table; step 2 collapses them to slots.
CREATE TEMPORARY TABLE temp_wallet_sandwich_candidates AS
SELECT
    quote_mint,
    slot,
    wallet,
    countIf(
        (method = 'buy') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)
    ) AS buys,
    countIf(
        (method = 'sell') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)
    ) AS sells,
    minIf(price_base, method = 'buy')  AS min_buy,
    maxIf(price_base, method = 'sell') AS max_sell
FROM indexer.parsed_transactions_optimized
GROUP BY
    quote_mint,
    slot,
    wallet
HAVING (buys > 0) AND (sells > 0) AND (min_buy < max_sell)
SETTINGS
    max_memory_usage                   = 0,
    max_threads                        = 6,
    max_bytes_before_external_group_by = '16G',
    group_by_two_level_threshold       = 50000,
    group_by_two_level_threshold_bytes = '64M',
    max_bytes_before_external_sort     = '16G',
    -- NOTE(review): the two join settings below have no JOIN to act on in this query.
    join_algorithm                     = 'hash',
    max_rows_in_join                   = 10000000;
-- MEV backfill, step 2 of 2: collapse the wallet-level candidates to one row
-- per (quote_mint, slot).
INSERT INTO indexer.mev_sandwich_slots
SELECT
    quote_mint,
    slot
FROM temp_wallet_sandwich_candidates
GROUP BY
    quote_mint,
    slot
SETTINGS
    max_memory_usage                   = 0,
    max_threads                        = 6,
    max_bytes_before_external_group_by = '16G',
    group_by_two_level_threshold       = 250000,
    group_by_two_level_threshold_bytes = '320M',
    max_bytes_before_external_sort     = '16G';
-- The intermediate candidates are no longer needed once the slots are written.
DROP TABLE temp_wallet_sandwich_candidates;

-- Row counts of both backfilled tables, for a quick sanity check.
SELECT 'OHLCV rows inserted:', count()
FROM indexer.ohlcv_agg_slot;

SELECT 'MEV rows inserted:', count()
FROM indexer.mev_sandwich_slots;
-- Detach the old feeder view, then extend trade_agg_qm_wallet with USD sums.
-- IF NOT EXISTS keeps the ALTER idempotent: without it a re-run of this
-- migration aborts here because the columns were already added.
DROP TABLE IF EXISTS indexer.mv_trade_agg_qm_wallet;

ALTER TABLE indexer.trade_agg_qm_wallet
    ADD COLUMN IF NOT EXISTS buy_usd_amount  AggregateFunction(sum, Float64),
    ADD COLUMN IF NOT EXISTS sell_usd_amount AggregateFunction(sum, Float64);
-- Re-aggregates the states of trade_aggregates_optimized down to
-- (quote_mint, wallet) and writes them to trade_agg_qm_wallet.
-- Every metric is merged and re-emitted as a state (-MergeState).
CREATE MATERIALIZED VIEW indexer.mv_trade_agg_qm_wallet TO indexer.trade_agg_qm_wallet
(
    quote_mint        FixedString(32),
    wallet            FixedString(32),
    buy_count         AggregateFunction(count),
    sell_count        AggregateFunction(count),
    buy_base_amount   AggregateFunction(sum, UInt64),
    sell_base_amount  AggregateFunction(sum, UInt64),
    buy_quote_amount  AggregateFunction(sum, UInt64),
    sell_quote_amount AggregateFunction(sum, UInt64),
    launchpad         AggregateFunction(any, String),
    buy_usd_amount    AggregateFunction(sum, Float64),
    sell_usd_amount   AggregateFunction(sum, Float64)
)
AS
SELECT
    quote_mint,
    wallet,
    countMergeState(buy_count)       AS buy_count,
    countMergeState(sell_count)      AS sell_count,
    sumMergeState(buy_base_amount)   AS buy_base_amount,
    sumMergeState(sell_base_amount)  AS sell_base_amount,
    sumMergeState(buy_quote_amount)  AS buy_quote_amount,
    sumMergeState(sell_quote_amount) AS sell_quote_amount,
    anyMergeState(launchpad)         AS launchpad,
    sumMergeState(buy_usd_amount)    AS buy_usd_amount,
    sumMergeState(sell_usd_amount)   AS sell_usd_amount
FROM indexer.trade_aggregates_optimized
GROUP BY
    quote_mint,
    wallet;
-- Rebuild trade_agg_qm_wallet from scratch: empty it, then re-aggregate the
-- whole of trade_aggregates_optimized with the same SELECT as the view above.
TRUNCATE TABLE indexer.trade_agg_qm_wallet SETTINGS max_table_size_to_drop = 0 ;

INSERT INTO indexer.trade_agg_qm_wallet
SELECT
    quote_mint,
    wallet,
    countMergeState(buy_count)       AS buy_count,
    countMergeState(sell_count)      AS sell_count,
    sumMergeState(buy_base_amount)   AS buy_base_amount,
    sumMergeState(sell_base_amount)  AS sell_base_amount,
    sumMergeState(buy_quote_amount)  AS buy_quote_amount,
    sumMergeState(sell_quote_amount) AS sell_quote_amount,
    anyMergeState(launchpad)         AS launchpad,
    sumMergeState(buy_usd_amount)    AS buy_usd_amount,
    sumMergeState(sell_usd_amount)   AS sell_usd_amount
FROM indexer.trade_aggregates_optimized
GROUP BY
    quote_mint,
    wallet
SETTINGS
    max_memory_usage                   = '120G',
    max_threads                        = 60,
    max_bytes_before_external_group_by = '16G',
    group_by_two_level_threshold       = 50000,
    group_by_two_level_threshold_bytes = '64M',
    max_bytes_before_external_sort     = '16G';

-- Row count after the rebuild, for a quick sanity check.
SELECT 'Trade aggregation rows updated:', count()
FROM indexer.trade_agg_qm_wallet;
-- Plain-value counterpart of ohlcv_agg_slot: same key columns, but the
-- aggregate states are stored as finalized numbers.
CREATE TABLE indexer.ohlcv_finalized
(
    quote_mint   FixedString(32),
    pool         FixedString(32),
    is_launchpad UInt8,
    bucket       UInt64,
    bucket_ts    DateTime,

    -- USD candle
    open_usd     Float64,
    high_usd     Float64,
    low_usd      Float64,
    close_usd    Float64,
    vol_usd      Float64,

    -- base candle
    open_base    Float64,
    high_base    Float64,
    low_base     Float64,
    close_base   Float64,
    vol_base     UInt64
)
ENGINE = MergeTree
ORDER BY (quote_mint, bucket, pool, is_launchpad);
-- Copies rows written to ohlcv_agg_slot into ohlcv_finalized, turning every
-- aggregate state into its final value with finalizeAggregation().
-- NOTE(review): the states are finalized row by row with no GROUP BY, and the
-- target is a plain MergeTree. If one candle key reaches ohlcv_agg_slot in
-- several inserts, ohlcv_finalized presumably ends up with several partial
-- rows for that key — confirm that readers re-aggregate or deduplicate.
CREATE MATERIALIZED VIEW indexer.ohlcv_mv TO indexer.ohlcv_finalized AS
SELECT
    quote_mint,
    pool,
    is_launchpad,
    bucket,
    bucket_ts,
    -- USD candle
    finalizeAggregation(open_state_usd)   AS open_usd,
    finalizeAggregation(high_state_usd)   AS high_usd,
    finalizeAggregation(low_state_usd)    AS low_usd,
    finalizeAggregation(close_state_usd)  AS close_usd,
    finalizeAggregation(vol_state_usd)    AS vol_usd,
    -- base candle
    finalizeAggregation(open_state_base)  AS open_base,
    finalizeAggregation(high_state_base)  AS high_base,
    finalizeAggregation(low_state_base)   AS low_base,
    finalizeAggregation(close_state_base) AS close_base,
    finalizeAggregation(vol_state_base)   AS vol_base
FROM indexer.ohlcv_agg_slot;
-- Backfill ohlcv_finalized from the aggregate states in ohlcv_agg_slot.
--
-- ohlcv_agg_slot is an AggregatingMergeTree that is already being fed by
-- mv_ohlcv_agg_slot, so until background merges run one
-- (quote_mint, bucket, pool, is_launchpad) key may be spread over several rows
-- that each hold a partial state. Calling finalizeAggregation() per row would
-- write one incomplete candle per partial row. The states are therefore merged
-- per sorting key first (-Merge combinators), so that every candle is written
-- once with complete values.
--
-- bucket_ts is not part of the sorting key; any() picks one value per key,
-- which mirrors what the AggregatingMergeTree engine keeps for non-key,
-- non-aggregate columns after a merge.
INSERT INTO indexer.ohlcv_finalized
(
    quote_mint, pool, is_launchpad, bucket, bucket_ts,
    open_usd, high_usd, low_usd, close_usd, vol_usd,
    open_base, high_base, low_base, close_base, vol_base
)
SELECT
    quote_mint,
    pool,
    is_launchpad,
    bucket,
    any(bucket_ts),
    -- USD candle
    argMinMerge(open_state_usd)   AS open_usd,
    maxMerge(high_state_usd)      AS high_usd,
    minMerge(low_state_usd)       AS low_usd,
    argMaxMerge(close_state_usd)  AS close_usd,
    sumIfMerge(vol_state_usd)     AS vol_usd,
    -- base candle
    argMinMerge(open_state_base)  AS open_base,
    maxMerge(high_state_base)     AS high_base,
    minMerge(low_state_base)      AS low_base,
    argMaxMerge(close_state_base) AS close_base,
    sumIfMerge(vol_state_base)    AS vol_base
FROM indexer.ohlcv_agg_slot
GROUP BY
    quote_mint,
    bucket,
    pool,
    is_launchpad;
-- Record migration 003 in system_migrations.
INSERT INTO system_migrations (name) VALUES
    ('003_candles_new_migration');
-- from 004_pools_and_metrics__pool_aggregates_optimized2.sql | |
------------------------------------------------------------- | |
/* =============================== | |
MIGRATION 004 — Good Pools & Metrics (metrics embedded in best_pool_resolved_by_qmint) | |
=============================== */ | |
/* Migration 004 session setup: same settings as migration 003, except that
   max_memory_usage is set to 150G here. */
USE indexer;

SET
    max_memory_usage                   = '150G',
    max_execution_time                 = 0,
    send_timeout                       = 0,
    receive_timeout                    = 0,
    optimize_on_insert                 = 0,
    max_rows_to_read                   = 0,
    max_bytes_to_read                  = 0,
    max_threads                        = 8,
    max_bytes_before_external_group_by = '3G',
    group_by_two_level_threshold       = 100000,
    group_by_two_level_threshold_bytes = '128M',
    max_bytes_before_external_sort     = '2G',
    allow_nondeterministic_mutations   = 1,
    max_table_size_to_drop             = 0;
/* ---------- DROP old objects in dependency order (if exist) ----------
   Each materialized view is dropped before the table it writes to.

   Tables are dropped with SYNC because some of them are recreated right below
   as Replicated* tables. In an Atomic database a plain DROP TABLE only
   schedules the removal, so the replica may still be registered under its
   Keeper path when the CREATE runs, which then fails. SYNC waits for the drop
   to finish and is a no-op cost otherwise. */
DROP VIEW  IF EXISTS indexer.mv_best_pool_metrics_by_qmint;
DROP TABLE IF EXISTS indexer.best_pool_metrics_by_qmint SYNC;
DROP VIEW  IF EXISTS indexer.mv_best_pool_resolved_by_qmint;
DROP TABLE IF EXISTS indexer.best_pool_resolved_by_qmint SYNC;
DROP VIEW  IF EXISTS indexer.mv_best_from_snapshot;
DROP TABLE IF EXISTS indexer.best_pools_by_qmint_agg SYNC;
DROP VIEW  IF EXISTS indexer.mv_pool_aggregates_optimized;
DROP TABLE IF EXISTS indexer.pool_aggregates_optimized SYNC;
DROP VIEW  IF EXISTS indexer.mv_pool_aggregates_optimized2;
DROP TABLE IF EXISTS indexer.pool_aggregates_optimized2 SYNC;
/* ==============================================
   CHANGE #1 — Pool aggregates (optimized)
   ==============================================
   One logical row per pool holding aggregate states for that pool.
   NOTE(review): the Keeper path ends in `pool_aggregates_optimized_new` while
   the table is named `pool_aggregates_optimized` — confirm this is deliberate. */
CREATE TABLE indexer.pool_aggregates_optimized
(
    /* functionally dependent on pool */
    base_mint              FixedString(32),
    quote_mint             FixedString(32),
    pool                   FixedString(32),

    /* labels */
    platform               LowCardinality(String),
    launchpad              LowCardinality(String),

    /* ever-true flag: max() over a Bool acts as a logical OR over history */
    is_migration_state     AggregateFunction(max, Bool),

    /* counters & sums, split by trade side */
    buy_count              AggregateFunction(countIf, Bool),
    sell_count             AggregateFunction(countIf, Bool),
    buy_base_amount        AggregateFunction(sum, UInt64),
    sell_base_amount       AggregateFunction(sum, UInt64),
    buy_usd_amount         AggregateFunction(sum, Float64),
    sell_usd_amount        AggregateFunction(sum, Float64),

    /* latest value, ordered by a (UInt64, UInt32, Int8, Int8) position tuple */
    price_base             AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)),
    price_usd              AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)),
    base_liquidity         AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    quote_liquidity        AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    base_liquidity_real    AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    quote_liquidity_real   AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),

    /* earliest observed timestamp */
    first_transaction_time AggregateFunction(min, DateTime('UTC')),

    /* order-only projection: same columns, sorted by (quote_mint, pool) */
    PROJECTION quote_mint_projection
    (
        SELECT *
        ORDER BY quote_mint, pool
    )
)
ENGINE = ReplicatedAggregatingMergeTree(
    '/clickhouse/tables/{shard}/indexer/pool_aggregates_optimized_new',
    '{replica}'
)
ORDER BY pool
SETTINGS
    index_granularity = 2048,
    deduplicate_merge_projection_mode = 'drop';
/* Feeds pool_aggregates_optimized from parsed_transactions_optimized,
   one row of aggregate states per pool. */
CREATE MATERIALIZED VIEW indexer.mv_pool_aggregates_optimized
TO indexer.pool_aggregates_optimized
AS
SELECT
    /* identity / labels: taken with any() because the query groups by pool */
    any(base_mint)  AS base_mint,
    any(quote_mint) AS quote_mint,
    pool,
    any(platform)   AS platform,
    any(launchpad)  AS launchpad,

    /* ever-true flag */
    maxState(is_migration) AS is_migration_state,

    /* per-side counters and sums */
    countIfState(method = 'buy')             AS buy_count,
    countIfState(method = 'sell')            AS sell_count,
    sumIfState(base_amount, method = 'buy')  AS buy_base_amount,
    sumIfState(base_amount, method = 'sell') AS sell_base_amount,
    sumIfState(usd_amount, method = 'buy')   AS buy_usd_amount,
    sumIfState(usd_amount, method = 'sell')  AS sell_usd_amount,

    /* latest prices by position tuple (slot, tx_idx, ix_idx, inner_ix_idx);
       only swaps with base and quote amounts of at least 100 contribute */
    argMaxStateIf(
        price_base,
        (slot, transaction_index, instruction_index, inner_instruction_index),
        (is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100)
    ) AS price_base,
    argMaxStateIf(
        price_usd,
        (slot, transaction_index, instruction_index, inner_instruction_index),
        (is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100)
    ) AS price_usd,

    /* latest liquidity snapshots by the same position tuple, unfiltered */
    argMaxState(base_liquidity,       (slot, transaction_index, instruction_index, inner_instruction_index)) AS base_liquidity,
    argMaxState(quote_liquidity,      (slot, transaction_index, instruction_index, inner_instruction_index)) AS quote_liquidity,
    argMaxState(base_liquidity_real,  (slot, transaction_index, instruction_index, inner_instruction_index)) AS base_liquidity_real,
    argMaxState(quote_liquidity_real, (slot, transaction_index, instruction_index, inner_instruction_index)) AS quote_liquidity_real,

    /* earliest observed timestamp */
    minState(timestamp) AS first_transaction_time
FROM indexer.parsed_transactions_optimized
GROUP BY pool;
/* ---------- Backfill pool_aggregates_optimized ----------
   Same SELECT as mv_pool_aggregates_optimized, run over the whole of
   parsed_transactions_optimized with statement-level resource settings. */
INSERT INTO indexer.pool_aggregates_optimized
SELECT
    any(base_mint)  AS base_mint,
    any(quote_mint) AS quote_mint,
    pool,
    any(platform)   AS platform,
    any(launchpad)  AS launchpad,

    maxState(is_migration) AS is_migration_state,

    countIfState(method = 'buy')             AS buy_count,
    countIfState(method = 'sell')            AS sell_count,
    sumIfState(base_amount, method = 'buy')  AS buy_base_amount,
    sumIfState(base_amount, method = 'sell') AS sell_base_amount,
    sumIfState(usd_amount, method = 'buy')   AS buy_usd_amount,
    sumIfState(usd_amount, method = 'sell')  AS sell_usd_amount,

    /* latest prices: swaps only, base and quote amounts of at least 100 */
    argMaxStateIf(
        price_base,
        (slot, transaction_index, instruction_index, inner_instruction_index),
        (is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100)
    ) AS price_base,
    argMaxStateIf(
        price_usd,
        (slot, transaction_index, instruction_index, inner_instruction_index),
        (is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100)
    ) AS price_usd,

    /* latest liquidity snapshots, unfiltered */
    argMaxState(base_liquidity,       (slot, transaction_index, instruction_index, inner_instruction_index)) AS base_liquidity,
    argMaxState(quote_liquidity,      (slot, transaction_index, instruction_index, inner_instruction_index)) AS quote_liquidity,
    argMaxState(base_liquidity_real,  (slot, transaction_index, instruction_index, inner_instruction_index)) AS base_liquidity_real,
    argMaxState(quote_liquidity_real, (slot, transaction_index, instruction_index, inner_instruction_index)) AS quote_liquidity_real,

    minState(timestamp) AS first_transaction_time
FROM indexer.parsed_transactions_optimized
GROUP BY pool
SETTINGS
    max_memory_usage                   = '120G',
    max_threads                        = 16,
    max_bytes_before_external_group_by = '16G',
    group_by_two_level_threshold       = 80000,
    group_by_two_level_threshold_bytes = '96M',
    max_bytes_before_external_sort     = '16G';
/* ==============================================
   CHANGE #2 — Best pools per quote_mint (agg)
   ==============================================
   Per quote_mint, three candidate pools, each stored as a
   (pool, platform, launchpad) tuple inside an aggregate state. */
CREATE TABLE IF NOT EXISTS indexer.best_pools_by_qmint_agg
(
    quote_mint      FixedString(32),

    -- official pools: the migration pool and the launchpad pool
    migration_state AggregateFunction(
                        any,
                        Tuple(FixedString(32), LowCardinality(String), LowCardinality(String))
                    ),
    launchpad_state AggregateFunction(
                        any,
                        Tuple(FixedString(32), LowCardinality(String), LowCardinality(String))
                    ),

    -- best non-official pool, ranked by its latest known liquidity (UInt64)
    best_liq_state  AggregateFunction(
                        argMax,
                        Tuple(FixedString(32), LowCardinality(String), LowCardinality(String)),
                        UInt64
                    )
)
ENGINE = ReplicatedAggregatingMergeTree(
    '/clickhouse/tables/{shard}/indexer/best_pools_by_qmint_agg',
    '{replica}'
)
ORDER BY quote_mint;
/* Feeds best_pools_by_qmint_agg from pool_aggregates_optimized.
   Inner query: one snapshot row per pool. Outer query: per quote_mint, pick
   the migration pool, the launchpad pool and the most liquid other pool. */
CREATE MATERIALIZED VIEW IF NOT EXISTS indexer.mv_best_from_snapshot
TO indexer.best_pools_by_qmint_agg
AS
SELECT
    quote_mint,

    -- pool flagged as migration
    anyStateIf((pool, platform, launchpad), is_migration = 1) AS migration_state,

    -- pool carrying a non-empty launchpad label
    anyStateIf((pool, platform, launchpad), launchpad <> '') AS launchpad_state,

    -- among pools that are neither, the one with the highest last_liq
    argMaxStateIf(
        (pool, platform, launchpad),
        last_liq,
        (is_migration = 0) AND (launchpad = '')
    ) AS best_liq_state
FROM
(
    /* Per-pool snapshot. Columns that depend on the pool are taken with any();
       aggregation is kept to a single level here, no nested aggregates. */
    SELECT
        any(quote_mint)                   AS quote_mint,
        pool,
        any(platform)                     AS platform,
        any(launchpad)                    AS launchpad,
        argMaxMerge(quote_liquidity_real) AS last_liq,      -- latest known liquidity (UInt64)
        maxMerge(is_migration_state)      AS is_migration   -- ever-true migration flag
    FROM indexer.pool_aggregates_optimized
    GROUP BY pool
)
GROUP BY quote_mint;
/* ---------- Backfill best_pools_by_qmint_agg ----------
   Same SELECT as mv_best_from_snapshot, run over the whole of
   pool_aggregates_optimized. */
INSERT INTO indexer.best_pools_by_qmint_agg
SELECT
    quote_mint,
    anyStateIf((pool, platform, launchpad), is_migration = 1) AS migration_state,
    anyStateIf((pool, platform, launchpad), launchpad <> '')  AS launchpad_state,
    argMaxStateIf(
        (pool, platform, launchpad),
        last_liq,
        (is_migration = 0) AND (launchpad = '')
    ) AS best_liq_state
FROM
(
    /* per-pool snapshot */
    SELECT
        any(quote_mint)                   AS quote_mint,
        pool,
        any(platform)                     AS platform,
        any(launchpad)                    AS launchpad,
        argMaxMerge(quote_liquidity_real) AS last_liq,
        maxMerge(is_migration_state)      AS is_migration
    FROM indexer.pool_aggregates_optimized
    GROUP BY pool
)
GROUP BY quote_mint
SETTINGS
    max_memory_usage = '80G',
    max_threads      = 32;
/* ==============================================
   CHANGE #3 — Resolve single best (and historical)
   + embed ALL metrics directly here
   ==============================================
   One row per quote_mint: the resolved primary pool, an optional historical
   pool, and the metric states merged across the chosen pools. */
CREATE TABLE IF NOT EXISTS indexer.best_pool_resolved_by_qmint
(
    quote_mint             FixedString(32),

    /* resolved (primary) pool */
    pool                   FixedString(32),
    platform               LowCardinality(String),
    launchpad              LowCardinality(String),
    is_launchpad           Bool,
    is_migration           Bool,

    /* optional historical pool; NULL when there is none */
    historical_pool        Nullable(FixedString(32)),
    historical_platform    Nullable(String),  -- previously Nullable(LowCardinality(String))
    historical_launchpad   Nullable(String),  -- previously Nullable(LowCardinality(String))

    /* === embedded metrics aggregated across the chosen pools (primary + optional historical) === */
    buy_count              AggregateFunction(countIf, Bool),
    sell_count             AggregateFunction(countIf, Bool),
    buy_base_amount        AggregateFunction(sum, UInt64),
    sell_base_amount       AggregateFunction(sum, UInt64),
    buy_usd_amount         AggregateFunction(sum, Float64),
    sell_usd_amount        AggregateFunction(sum, Float64),
    price_base             AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)),
    price_usd              AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)),
    base_liquidity         AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    quote_liquidity        AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    base_liquidity_real    AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    quote_liquidity_real   AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    first_transaction_time AggregateFunction(min, DateTime('UTC'))
)
ENGINE = ReplacingMergeTree
ORDER BY quote_mint;
/* Resolves, per quote_mint, the primary pool (migration pool if present, else
   launchpad pool, else best-liquidity pool) plus an optional historical
   launchpad pool, and merges the metric states of the chosen pools from
   pool_aggregates_optimized.

   The candidate tuples are plain (non-Nullable) tuples, so they are never
   NULL: when no pool matched, the merged state is expected to yield the
   default tuple, whose pool id equals empty32. "Missing" is therefore detected
   by comparing the pool id with empty32. The previous IS NULL / IS NOT NULL /
   coalesce() checks always treated the migration tuple as present, which made
   is_migration always true, is_launchpad always false, and the primary pool
   empty whenever a quote_mint had no migration pool.

   NOTE(review): ANY LEFT JOIN takes a single pool_aggregates_optimized row per
   pool. If a pool still has several unmerged rows there, only one partial
   state is picked up — confirm whether a plain LEFT JOIN is wanted. */
CREATE MATERIALIZED VIEW IF NOT EXISTS indexer.mv_best_pool_resolved_by_qmint
TO indexer.best_pool_resolved_by_qmint
AS
WITH
    toFixedString('', 32) AS empty32,
    (
        empty32,
        CAST('' AS LowCardinality(String)),
        CAST('' AS LowCardinality(String))
    ) AS empty_tuple
SELECT
    q.quote_mint,
    /* identity columns for the primary (resolved) pool */
    primary_tuple.1 AS pool,
    primary_tuple.2 AS platform,
    primary_tuple.3 AS launchpad,
    CAST(q.mig.1 = empty32 AND q.lp.1 != empty32 AS Bool) AS is_launchpad,
    CAST(q.mig.1 != empty32 AS Bool) AS is_migration,
    /* historical (optional) identity: the launchpad pool, when a different
       migration pool has taken over as primary */
    if(
        q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1,
        q.lp.1, CAST(NULL AS Nullable(FixedString(32)))
    ) AS historical_pool,
    if(
        q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1,
        q.lp.2, CAST(NULL AS Nullable(String))
    ) AS historical_platform,
    if(
        q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1,
        q.lp.3, CAST(NULL AS Nullable(String))
    ) AS historical_launchpad,
    /* === merged metrics across chosen pools === */
    countIfMergeState(p.buy_count) AS buy_count,
    countIfMergeState(p.sell_count) AS sell_count,
    sumMergeState(p.buy_base_amount) AS buy_base_amount,
    sumMergeState(p.sell_base_amount) AS sell_base_amount,
    sumMergeState(p.buy_usd_amount) AS buy_usd_amount,
    sumMergeState(p.sell_usd_amount) AS sell_usd_amount,
    argMaxMergeState(p.price_base) AS price_base,
    argMaxMergeState(p.price_usd) AS price_usd,
    argMaxMergeState(p.base_liquidity) AS base_liquidity,
    argMaxMergeState(p.quote_liquidity) AS quote_liquidity,
    argMaxMergeState(p.base_liquidity_real) AS base_liquidity_real,
    argMaxMergeState(p.quote_liquidity_real) AS quote_liquidity_real,
    minMergeState(p.first_transaction_time) AS first_transaction_time
FROM
(
    /* Resolve the candidate tuples per quote_mint. An absent candidate shows
       up as a tuple whose pool id equals empty32. */
    SELECT
        quote_mint,
        anyMerge(migration_state) AS mig,
        anyMerge(launchpad_state) AS lp,
        argMaxMerge(best_liq_state) AS best,
        /* first present candidate in priority order: migration, launchpad, best liquidity */
        if(
            tupleElement(anyMerge(migration_state), 1) != empty32,
            anyMerge(migration_state),
            if(
                tupleElement(anyMerge(launchpad_state), 1) != empty32,
                anyMerge(launchpad_state),
                argMaxMerge(best_liq_state)
            )
        ) AS primary_tuple
    FROM indexer.best_pools_by_qmint_agg
    GROUP BY quote_mint
) AS q
/* expand to the set of chosen pools: primary + (optional) historical lp;
   empty tuples are filtered out before the join */
ARRAY JOIN arrayFilter(x -> x.1 != empty32,
    [ primary_tuple,
      if(q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1, q.lp, empty_tuple)
    ]) AS chosen_tuple
ANY LEFT JOIN indexer.pool_aggregates_optimized AS p
    ON p.pool = chosen_tuple.1
GROUP BY
    q.quote_mint,
    primary_tuple.1, primary_tuple.2, primary_tuple.3,
    CAST(q.mig.1 = empty32 AND q.lp.1 != empty32 AS Bool),
    CAST(q.mig.1 != empty32 AS Bool),
    if(q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1, q.lp.1, CAST(NULL AS Nullable(FixedString(32)))),
    if(q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1, q.lp.2, CAST(NULL AS Nullable(String))),
    if(q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1, q.lp.3, CAST(NULL AS Nullable(String)));
/* ---------- Backfill best_pool_resolved_by_qmint (with embedded metrics) ----------
   Resolves, per quote_mint, the primary pool (migration pool if present, else
   launchpad pool, else best-liquidity pool) plus an optional historical
   launchpad pool, and merges the metric states of the chosen pools.

   The candidate tuples are plain (non-Nullable) tuples, so they are never
   NULL: when no pool matched, the merged state is expected to yield the
   default tuple, whose pool id equals empty32. "Missing" is therefore detected
   by comparing the pool id with empty32. The previous IS NULL / IS NOT NULL /
   coalesce() checks always treated the migration tuple as present, which made
   is_migration always true, is_launchpad always false, and the primary pool
   empty whenever a quote_mint had no migration pool.

   NOTE(review): ANY LEFT JOIN takes a single pool_aggregates_optimized row per
   pool. If a pool still has several unmerged rows there, only one partial
   state is picked up — confirm whether a plain LEFT JOIN is wanted. */
INSERT INTO indexer.best_pool_resolved_by_qmint
WITH
    toFixedString('', 32) AS empty32,
    (
        empty32,
        CAST('' AS LowCardinality(String)),
        CAST('' AS LowCardinality(String))
    ) AS empty_tuple
SELECT
    q.quote_mint,
    /* identity columns for the primary (resolved) pool */
    primary_tuple.1 AS pool,
    primary_tuple.2 AS platform,
    primary_tuple.3 AS launchpad,
    CAST(q.mig.1 = empty32 AND q.lp.1 != empty32 AS Bool) AS is_launchpad,
    CAST(q.mig.1 != empty32 AS Bool) AS is_migration,
    /* historical (optional) identity: the launchpad pool, when a different
       migration pool has taken over as primary */
    if(
        q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1,
        q.lp.1, CAST(NULL AS Nullable(FixedString(32)))
    ) AS historical_pool,
    if(
        q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1,
        q.lp.2, CAST(NULL AS Nullable(String))
    ) AS historical_platform,
    if(
        q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1,
        q.lp.3, CAST(NULL AS Nullable(String))
    ) AS historical_launchpad,
    /* === merged metrics across chosen pools === */
    countIfMergeState(p.buy_count) AS buy_count,
    countIfMergeState(p.sell_count) AS sell_count,
    sumMergeState(p.buy_base_amount) AS buy_base_amount,
    sumMergeState(p.sell_base_amount) AS sell_base_amount,
    sumMergeState(p.buy_usd_amount) AS buy_usd_amount,
    sumMergeState(p.sell_usd_amount) AS sell_usd_amount,
    argMaxMergeState(p.price_base) AS price_base,
    argMaxMergeState(p.price_usd) AS price_usd,
    argMaxMergeState(p.base_liquidity) AS base_liquidity,
    argMaxMergeState(p.quote_liquidity) AS quote_liquidity,
    argMaxMergeState(p.base_liquidity_real) AS base_liquidity_real,
    argMaxMergeState(p.quote_liquidity_real) AS quote_liquidity_real,
    minMergeState(p.first_transaction_time) AS first_transaction_time
FROM
(
    /* Resolve the candidate tuples per quote_mint. An absent candidate shows
       up as a tuple whose pool id equals empty32. */
    SELECT
        quote_mint,
        anyMerge(migration_state) AS mig,
        anyMerge(launchpad_state) AS lp,
        argMaxMerge(best_liq_state) AS best,
        /* first present candidate in priority order: migration, launchpad, best liquidity */
        if(
            tupleElement(anyMerge(migration_state), 1) != empty32,
            anyMerge(migration_state),
            if(
                tupleElement(anyMerge(launchpad_state), 1) != empty32,
                anyMerge(launchpad_state),
                argMaxMerge(best_liq_state)
            )
        ) AS primary_tuple
    FROM indexer.best_pools_by_qmint_agg
    GROUP BY quote_mint
) AS q
/* expand to the set of chosen pools: primary + (optional) historical lp;
   empty tuples are filtered out before the join */
ARRAY JOIN arrayFilter(x -> x.1 != empty32,
    [ primary_tuple,
      if(q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1, q.lp, empty_tuple)
    ]) AS chosen_tuple
ANY LEFT JOIN indexer.pool_aggregates_optimized AS p
    ON p.pool = chosen_tuple.1
GROUP BY
    q.quote_mint,
    primary_tuple.1, primary_tuple.2, primary_tuple.3,
    CAST(q.mig.1 = empty32 AND q.lp.1 != empty32 AS Bool),
    CAST(q.mig.1 != empty32 AS Bool),
    if(q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1, q.lp.1, CAST(NULL AS Nullable(FixedString(32)))),
    if(q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1, q.lp.2, CAST(NULL AS Nullable(String))),
    if(q.mig.1 != empty32 AND q.lp.1 != empty32 AND q.lp.1 != q.mig.1, q.lp.3, CAST(NULL AS Nullable(String)))
SETTINGS
    max_memory_usage = '120G',
    max_bytes_in_join = '120G',
    join_algorithm = 'partial_merge',
    max_threads = 32;
/* ---------- Row counts for a quick sanity check ---------- */
-- Each statement returns a literal label followed by the table's total row count.
SELECT
    'pool_aggregates_optimized rows:',
    count()
FROM indexer.pool_aggregates_optimized;

SELECT
    'best_pools_by_qmint_agg rows:',
    count()
FROM indexer.best_pools_by_qmint_agg;

SELECT
    'best_pool_resolved_by_qmint rows:',
    count()
FROM indexer.best_pool_resolved_by_qmint;
/* ---------- Migration bookkeeping ---------- */
-- Record this migration by name in system_migrations.
INSERT INTO system_migrations (name) VALUES ('004_good_pools_and_metrics');
-- from 005_split_per_date.sql | |
------------------------------ | |
-- 1. Target table that the materialized view k_mv_parsed_transactions_by_quote writes into.
--    Same column set as the SELECT lists used by the view and the backfill statements.
CREATE TABLE IF NOT EXISTS indexer.k_parsed_transactions_by_quote
(
    -- transaction identity
    signature               String,
    slot                    UInt64,
    timestamp               DateTime64(9),
    method                  LowCardinality(String),
    platform                LowCardinality(String),

    -- accounts involved (32-byte fixed-width keys, except pool which is a plain String here)
    wallet                  FixedString(32),
    quote_mint              FixedString(32),
    base_mint               FixedString(32),
    pool                    String,

    -- amounts and prices
    base_amount             Float64,
    quote_amount            Float64,
    usd_amount              Float64,
    price_base              Float64,
    price_usd               Float64,
    quote_decimals          UInt8,
    base_decimals           UInt8,

    -- balances before / after
    pre_quote_balance       Float64,
    post_quote_balance      Float64,
    pre_base_balance        Float64,
    post_base_balance       Float64,

    -- liquidity
    base_liquidity          Float64,
    quote_liquidity         Float64,
    base_liquidity_real     Float64,
    quote_liquidity_real    Float64,

    -- position of the instruction
    transaction_index       UInt32,
    instruction_index       UInt32,
    inner_instruction_index UInt32,

    -- fees
    tx_fees                 Float64,
    platform_fees           Float64,
    processor_tips          Float64,
    platform_type           LowCardinality(String)
)
ENGINE = MergeTree()
-- Sorting key leads with (quote_mint, wallet, method) and then the
-- slot / transaction / instruction position columns.
ORDER BY (
    quote_mint,
    wallet,
    method,
    slot,
    transaction_index,
    instruction_index,
    inner_instruction_index
);
-- 2. Materialized view that copies rows inserted into parsed_transactions_optimized
--    straight into k_parsed_transactions_by_quote (no filtering, no aggregation).
CREATE MATERIALIZED VIEW IF NOT EXISTS indexer.k_mv_parsed_transactions_by_quote
TO indexer.k_parsed_transactions_by_quote
AS
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized;
-- 3. Backfill, one statement per calendar month (toYYYYMM of timestamp).
-- Slice: 2024-09.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202409';
-- Backfill slice: 2024-10.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202410';
-- Backfill slice: 2024-11.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202411';
-- Backfill slice: 2024-12.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202412';
-- Backfill slice: 2025-01.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202501';
-- Backfill slice: 2025-02.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202502';
-- Backfill slice: 2025-03.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202503';
-- Backfill slice: 2025-04.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202504';
-- Backfill slice: 2025-05.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202505';
-- Backfill slice: 2025-06.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202506';
-- Backfill slice: 2025-07.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202507';
-- Backfill slice: 2025-08.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202508';
-- Backfill slice: 2025-09.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202509';
-- Backfill slice: 2025-10.
-- FIX: this statement previously filtered on '202509' again, which re-inserted
-- every September 2025 row a second time (the preceding statement already
-- covers 202509). The month is advanced to continue the per-month sequence.
-- NOTE(review): the materialized view is created before these backfills run, so
-- rows arriving in the source table after the view exists may also be picked up
-- by the backfill of the then-current month - confirm whether a timestamp
-- cut-off is needed for that slice.
INSERT INTO indexer.k_parsed_transactions_by_quote
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.parsed_transactions_optimized
WHERE toYYYYMM(timestamp) = '202510';
-- 4. Inspect index usage for a typical lookup on the new table:
--    a fixed set of wallets, one quote_mint, buy/sell methods only.
--    Wallet strings are base58-decoded and cast to FixedString(32) to match the column type.
EXPLAIN indexes = 1
SELECT
    signature, slot, timestamp, method, platform,
    wallet, quote_mint, base_mint, pool,
    base_amount, quote_amount, usd_amount, price_base, price_usd,
    quote_decimals, base_decimals,
    pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance,
    base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real,
    transaction_index, instruction_index, inner_instruction_index,
    tx_fees, platform_fees, processor_tips, platform_type
FROM indexer.k_parsed_transactions_by_quote
WHERE wallet IN (
        SELECT toFixedString(base58Decode(w), 32)
        FROM (
            SELECT arrayJoin([
                '4GQeEya6ZTwvXre4Br6ZfDyfe2WQMkcDz2QbkJZazVqS',
                'cook3vQojECt9LgMUse6Y3PLDzzUmdVGt1Uz8sohGai',
                '86ca1fHVACfW57vQaGwvx2841WnpphuT5YF3WoHxuouo',
                'H3jfDkDCeBgzM5wm7WCwtjdMC48poTAKXbEeWdvKLnVL',
                '3tc4BVAdzjr1JpeZu6NAjLHyp4kK3iic7TexMBYGJ4Xk',
                'RFSqPtn1JfavGiUD4HJsZyYXvZsycxf31hnYfbyG6iB',
                '8deJ9xeUvXSJwicYptA9mHsU2rN2pDx37KWzkDkEXhU6',
                'DfMxre4cKmvogbLrPigxmibVTTQDuzjdXojWzjCXXhzj',
                /* Disabled by the original author: the entry ends with "pump" and,
                   per their note, base58Decode may throw on it. Verify before re-enabling. */
                -- '34qXg7fx9AJDmBA1fRs8xMsfNvWQbabkdkEWfGSRpump',
                '8zFZHuSRuDpuAR7J6FzwyF3vKNx4CVW3DFHJerQhc7Zd',
                'FpR5RsXauqdsGhgdW9C6zmBHMkcT9HJvXJQDhSUa4kYe',
                '3kebnKw7cPdSkLRfiMEALyZJGZ4wdiSRvmoN4rD1yPzV',
                '4Be9CvxqHW6BYiRAxW9Q3xu1ycTMWaL5z8NX4HR3ha7t',
                '9jyqFiLnruggwNn4EQwBNFXwpbLM9hrA4hV59ytyAVVz',
                '3rSZJHysEk2ueFVovRLtZ8LGnQBMZGg96H2Q4jErspAF',
                'CRVidEDtEUTYZisCxBZkpELzhQc9eauMLR3FWg74tReL',
                '73LnJ7G9ffBDjEBGgJDdgvLUhD5APLonKrNiHsKDCw5B',
                '91HANKimRsWTXMkfcDipLaDVmeTkj6m2eNSVobpkdUNz',
                'GQWLRHtR18vy8myoHkgc9SMcSzwUdBjJ816vehSBwcis',
                '62FZUSWPMX9pofoV1uWHMdzFJRjwMa1LHgh2zhdEB7Zj',
                'AVAZvHLR2PcWpDf8BXY4rVxNHYRBytycHkcB5z5QNXYm',
                '73KcgcTVyLYhnVX3PJbit8e4mC7jU8QotNGBFGix14jn',
                'BDC19MFbzDpoF37QCxe4JR6i7UmXNMSiEqrxuQgJoWte',
                '4vgDxDJubQxmHsaNxgsTESer9Qgk1bGKuE1dN4fXzsoj',
                '26kZ9rg8Y5pd4j1tdT4cbT8BQRu5uDbXkaVs3L5QasHy',
                '5TMR5GFPTzkNhprnty2ocXEFLoLur1uchg4NkEr5WEf4',
                'G2VzymsKt3zNAn4CKBndYcS67w6Kny5sDEp7Y2W1aTf6',
                'GfXQesPe3Zuwg8JhAt6Cg8euJDTVx751enp9EQQmhzPH',
                'dVs7zZksjFuq73xbtUC62brFXYYuxCuPSG4wZeGiHck',
                'DGPYpCdiVg2shab2TnNiZ2RnsjBQSmhgN71hJyWC5cYn',
                'GTvBQnRvAPweU2qmYg8MDLND2PAAyYFKe35aKQGMRDaL',
                'EcJWNtETrzdbj8s2dXpaE4Tu4r7fxALD6TNw9H8S6ksz',
                '7nt58K2KF8HwBiCcRprCYP7civVifXP59fE4x229gAuv',
                '2QwCvtKyVew25mvDDJDwgopMhwRmNQqqGDsjCBVACo8A',
                '4aDdi3EiDPMbeZ3e5BvbFMt4vfJaoahaHxZuwKQRtFc1',
                'Hg5SEgwdHw8FUoKMPDZbREii3qEXwn6EvvsUdBPM2rmi',
                '7Dt5oUpxHWuKH8bCTXDLz2j3JyxA7jEmtzqCG6pnh96X',
                'AbcX4XBm7DJ3i9p29i6sU8WLmiW4FWY5tiwB9D6UBbcE',
                'HrTZPWV4ZPebBiwyzoTBajCD49kQqVwf4dwsLuYG8CXX',
                '4hSXPtxZgXFpo6Vxq9yqxNjcBoqWN3VoaPJWonUtupzD',
                'BrNoqdHUCcv9yTncnZeSjSov8kqhpmzv1nAiPbq1M95H',
                '28ipXVfkdmu1PDowCHbcSfzkpH9edZmSiVoDhY5xGVfR',
                'HsfSzb7BBv4oKNFamgoyBEQ4v6BHUhEREEmBnWzZJXLj',
                'ECCKBDWX3MkEcf3bULbLBb9FvrEQLsmPMFTKFpvjzqgP',
                'EDmbDc7sY87dKszqyZ3rHczWbKcCyUvJQSJpE3Cg4RcZ',
                'DSVc1Rd69sLcXBoZAsvkzK4jGFyJKP77K3J2CaFVNAFP',
                'HEibUTHW7JHt7VgKYtWpQCSX7a6YHBmRQrixZs9i4wGY',
                'CAUbSmiNuj16phNiskMdwWZEAUXCfXaUSamDFyf7pAa6',
                '6RoLbZJWJHpTk4sdPsWzocEHiRtzPS36WcBjnMXuQrfU',
                'JDd3hy3gQn2V982mi1zqhNqUw1GfV2UL6g76STojCJPN',
                '9UWZFoiCHeYRLmzmDJhdMrP7wgrTw7DMSpPiT2eHgJHe',
                'EXEeLF1YiZxC1tFneoQPv6rcdq5VMUbKNGy3DLPEo2oH',
                'D3Z7weHeLGWA7eg1qwVB66NCs8YLxiDTBVE6Eb1tTmwg',
                'HVajxfNTWqLGxsfJA9DFThnvddveKfJLK8re1kNpeCVv',
                '3PgV4tvihn16J6uaQsKeekChCAH7MKyxGQYb7eaRJbMu',
                'AT6FsbUy1jNR2AHE35WCVh3nSdDtYADtdhA2KitQXmL5',
                'HdxkiXqeN6qpK2YbG51W23QSWj3Yygc1eEk2zwmKJExp',
                'Fxj1j5ohVoRos1yQnvx6bsvYCt493VGpyLbMiq4eLff3',
                '7VBTpiiEjkwRbRGHJFUz6o5fWuhPFtAmy8JGhNqwHNnn',
                '6LChaYRYtEYjLEHhzo4HdEmgNwu2aia8CM8VhR9wn6n7',
                'DNfuF1L62WWyW3pNakVkyGGFzVVhj4Yr52jSmdTyeBHm',
                '636N7frU8bUwYfyUAtvMQQsXhTFRuSWjxnEZihr5axGV',
                'DpNVrtA3ERfKzX4F8Pi2CVykdJJjoNxyY5QgoytAwD26',
                'f1BuDDhr9myYFm8X49sQ5RharwsoNPvXDtB18CMA1fu',
                'HABhDh9zrzf8mA4SBo1yro8M6AirH2hZdLNPpuvMH6iA',
                '9ecRdqNCBxwReiY2pxtnLeayeBzE1CrSSmKKJJGdmsaJ',
                'F72vY99ihQsYwqEDCfz7igKXA5me6vN2zqVsVUTpw6qL',
                'FAwHi11KyJVh2pR2b2vNFEbbunTVkMFpdNBuD2dh9NRf',
                'HmBmSYwYEgEZuBUYuDs9xofyqBAkw4ywugB1d7R7sTGh',
                '7iabBMwmSvS4CFPcjW2XYZY53bUCHzXjCFEFhxeYP4CY',
                'AJ6MGExeK7FXmeKkKPmALjcdXVStXYokYNv9uVfDRtvo',
                'CyaE1VxvBrahnPWkqm5VsdCvyS2QmNht2UFrKJHga54o',
                'BCnqsPEtA1TkgednYEebRpkmwFRJDCjMQcKZMMtEdArc',
                '7Ny3AmDQ1cnQ29ywXdQheaoeTPvo6NGkDDtGa5bK9wRH',
                /* Disabled for the same reason: ends with "pump". */
                -- '36Qg5TussomUkM7g9FUR8KNcUNwDPbL8jdurKvw3pump',
                'suqh5sHtr8HyJ7q8scBimULPkPpA557prMG47xCHQfK',
                '4zq1iLpmepj2Rj7W6A3XQMRQA1HyjYqVpZiBzM6aPyH7',
                '4gbKBZQ8aaXfmuwq2bbN6cFJEhLm1uxHv2ravrkYTgBU',
                'o5e4MABwiMuhnwXWUPgwnhvGPK91i2ytnWZvAz2begi',
                '7SDs3PjT2mswKQ7Zo4FTucn9gJdtuW4jaacPA65BseHS',
                '77kq9xNZg4Vb94UJMpeHsx7pKXUBU3Wpqsm5UXhjA9dd',
                '831qmkeGhfL8YpcXuhrug6nHj1YdK3aXMDQUCo85Auh1'
            ]) AS w
        )
    )
    AND quote_mint = toFixedString(base58Decode('MEFNBXixkEbait3xn9bkm8WsJzXtVsaJEn4c8Sam21u'), 32)
    AND method IN ('buy', 'sell')
-- Newest first: the whole position tuple is compared and sorted descending.
ORDER BY (slot, transaction_index, instruction_index, inner_instruction_index) DESC
LIMIT 1000 OFFSET 0
SETTINGS
    optimize_read_in_order = 0,
    enable_filesystem_cache = 0;
-- to undo | |
/* | |
DROP VIEW indexer.k_mv_parsed_transactions_by_quote; | |
DROP TABLE indexer.k_parsed_transactions_by_quote | |
SETTINGS max_table_size_to_drop = 0; | |
*/ | |
-- Migration bookkeeping: record this migration by name.
-- NOTE(review): the recorded name differs from the "005_split_per_date.sql"
-- file name in the section header - confirm which one the migration runner expects.
INSERT INTO system_migrations (name) VALUES ('005_wallet_quote_mint');
-- from 006_various_changes.sql | |
------------------------------- | |
/* ===============================
   MIGRATION 006 - Various changes
   =============================== */
USE indexer;

-- Session-level settings for the statements of this migration.
SET
    max_memory_usage                   = 480000000000,  -- 480 * 10^9 bytes
    max_execution_time                 = 0,
    send_timeout                       = 0,
    receive_timeout                    = 0,
    optimize_on_insert                 = 0,
    max_rows_to_read                   = 0,
    max_bytes_to_read                  = 0,
    max_threads                        = 56,
    -- GROUP BY / ORDER BY thresholds
    max_bytes_before_external_group_by = '100G',
    group_by_two_level_threshold       = 500000,
    group_by_two_level_threshold_bytes = '8G',
    max_bytes_before_external_sort     = '100G',
    allow_nondeterministic_mutations   = 1,
    max_table_size_to_drop             = 0;
-- Remove the objects this migration recreates further down, so it can be re-run:
-- the materialized view first, then its target table, then the projection.
DROP TABLE IF EXISTS indexer.mv_trade_volume_hourly_by_mint_mv;

DROP TABLE IF EXISTS indexer.mv_trade_volume_hourly_by_mint;

ALTER TABLE indexer.parsed_new_token_transactions_optimized
    DROP PROJECTION IF EXISTS uri_search_projection;
/* ---------- Hourly trading volume per mint: target table ---------- */
-- Stores partial aggregate states keyed by (mint, hour); read them back with
-- the matching -Merge functions (sumMerge / countMerge).
CREATE TABLE indexer.mv_trade_volume_hourly_by_mint
(
    `mint`         FixedString(32),
    `hour`         DateTime('UTC'),

    -- volume states
    `total_volume` AggregateFunction(sum, Float64),
    `buy_volume`   AggregateFunction(sum, Float64),
    `sell_volume`  AggregateFunction(sum, Float64),

    -- transaction-count states
    `total_txns`   AggregateFunction(count),
    `buy_txns`     AggregateFunction(count),
    `sell_txns`    AggregateFunction(count),

    -- all columns re-sorted by hour only
    PROJECTION hour_projection
    (
        SELECT *
        ORDER BY hour
    ),

    -- NOTE(review): same ordering as the table's own ORDER BY (mint, hour) -
    -- confirm this projection is actually needed.
    PROJECTION mint_hour_projection
    (
        SELECT *
        ORDER BY mint, hour
    )
)
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (mint, hour)
SETTINGS
    index_granularity = 8192,
    deduplicate_merge_projection_mode = 'rebuild';
-- Materialized view feeding mv_trade_volume_hourly_by_mint: for every insert into
-- parsed_transactions_optimized it aggregates swap rows (is_swap = 1) per
-- quote_mint and hour, splitting volume and counts by method ('buy' / 'sell').
CREATE MATERIALIZED VIEW indexer.mv_trade_volume_hourly_by_mint_mv
TO indexer.mv_trade_volume_hourly_by_mint
AS
SELECT
    quote_mint                              AS mint,
    toStartOfHour(timestamp)                AS hour,
    -- volumes (USD)
    sumState(usd_amount)                    AS total_volume,
    sumStateIf(usd_amount, method = 'buy')  AS buy_volume,
    sumStateIf(usd_amount, method = 'sell') AS sell_volume,
    -- transaction counts
    countState()                            AS total_txns,
    countStateIf(method = 'buy')            AS buy_txns,
    countStateIf(method = 'sell')           AS sell_txns
FROM indexer.parsed_transactions_optimized
WHERE is_swap = 1
GROUP BY
    mint,
    hour;
/* ---------- URI search projection ---------- */
-- Define a projection holding all columns sorted by (uri, timestamp) ...
ALTER TABLE indexer.parsed_new_token_transactions_optimized
    ADD PROJECTION uri_search_projection
    (
        SELECT *
        ORDER BY uri, timestamp
    );

-- ... and build it for the parts that already exist.
ALTER TABLE indexer.parsed_new_token_transactions_optimized
    MATERIALIZE PROJECTION uri_search_projection;
-- Backfill the hourly volume table from the existing rows, using the same
-- aggregation as the materialized view mv_trade_volume_hourly_by_mint_mv.
-- NOTE(review): the view is created earlier in this migration, so rows inserted
-- into the source table between the view's creation and this statement may be
-- counted twice - confirm ingestion is paused or add a timestamp cut-off.
INSERT INTO indexer.mv_trade_volume_hourly_by_mint
SELECT
    quote_mint                              AS mint,
    toStartOfHour(timestamp)                AS hour,
    sumState(usd_amount)                    AS total_volume,
    sumStateIf(usd_amount, method = 'buy')  AS buy_volume,
    sumStateIf(usd_amount, method = 'sell') AS sell_volume,
    countState()                            AS total_txns,
    countStateIf(method = 'buy')            AS buy_txns,
    countStateIf(method = 'sell')           AS sell_txns
FROM indexer.parsed_transactions_optimized
WHERE is_swap = 1
GROUP BY
    mint,
    hour;
/* ---------- Migration bookkeeping ---------- */
-- Record this migration by name in system_migrations.
INSERT INTO system_migrations (name) VALUES ('006_various_changes');
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment