Created
July 6, 2022 14:36
-
-
Save buffalu/46e0aa9315546f6c22a43b5b784fe90a to your computer and use it in GitHub Desktop.
Bigtable Copy Script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{ | |
cmp::max, | |
collections::HashMap, | |
sync::{ | |
atomic::{AtomicBool, Ordering}, | |
Arc, Mutex, | |
}, | |
thread::{self, sleep}, | |
time::{Duration, Instant}, | |
}; | |
use clap::Parser; | |
use log::{debug, info, warn}; | |
use solana_sdk::clock::Slot; | |
use solana_transaction_status::{ | |
ConfirmedBlock, TransactionWithStatusMeta, VersionedConfirmedBlock, | |
}; | |
use tokio::task::JoinHandle; | |
// CLI arguments for the bigtable copy script: stripe-copies confirmed blocks
// in the slot range [lowest_slot, highest_slot) from a source bigtable
// instance to a destination bigtable instance.
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
    /// Path to the credentials file for the read-only source bigtable
    #[clap(long)]
    bigtable_source_creds_path: String,
    /// Path to the credentials file for the writeable destination bigtable
    #[clap(long)]
    bigtable_dest_creds_path: String,
    /// Inclusive lower bound of the slot range to copy
    #[clap(long)]
    lowest_slot: u64,
    /// Exclusive upper bound of the slot range to copy (range is lowest..highest)
    #[clap(long)]
    highest_slot: u64,
    /// Dry-run without uploading anything to the destination bigtable
    #[clap(long)]
    dry_run: bool,
}
fn main() { | |
env_logger::init(); | |
let args: Args = Args::parse(); | |
let num_tasks = 128; | |
let lowest_slot: Slot = args.lowest_slot; | |
let highest_slot: Slot = args.highest_slot; | |
let chunk_size = 10; | |
// split each thread into a "task_unit" of N slots to stripe-copy the bigtable | |
let task_unit = max( | |
highest_slot | |
.checked_sub(lowest_slot) | |
.unwrap() | |
.checked_div(num_tasks) | |
.unwrap(), | |
1, | |
); | |
let log_duration = Duration::from_secs(1); | |
let total_blocks_copied = Arc::new(Mutex::new(0_usize)); | |
let exit = Arc::new(AtomicBool::new(false)); | |
let thread = { | |
let total_blocks_copied = total_blocks_copied.clone(); | |
let exit = exit.clone(); | |
thread::spawn(move || { | |
let test_start = Instant::now(); | |
let mut last_update_time = Instant::now(); | |
let mut last_update_count = 0; | |
loop { | |
if exit.load(Ordering::Relaxed) { | |
info!("done printing stats, exiting"); | |
break; | |
} | |
let elapsed = last_update_time.elapsed(); | |
if elapsed > log_duration { | |
let blocks_copied = *total_blocks_copied.lock().unwrap(); | |
let blocks_received = blocks_copied.checked_sub(last_update_count).unwrap(); | |
let recent_block_rate = blocks_received as f64 / elapsed.as_secs_f64(); | |
let total_block_rate = | |
blocks_copied as f64 / test_start.elapsed().as_secs_f64(); | |
info!( | |
"tasks: {}, chunk_size: {}, recent_block_rate: {:.2}, total_blocks_copied: {}, total_elapsed: {:.2}, total blocks/s: {:.2}", | |
num_tasks, | |
chunk_size, | |
recent_block_rate, | |
blocks_copied, | |
test_start.elapsed().as_secs_f64(), | |
total_block_rate | |
); | |
last_update_time = Instant::now(); | |
last_update_count = blocks_copied; | |
} | |
sleep(Duration::from_millis(100)); | |
} | |
}) | |
}; | |
let runtime = tokio::runtime::Runtime::new().unwrap(); | |
let slots: Vec<_> = (lowest_slot..highest_slot).collect(); | |
runtime.block_on(async { | |
let tasks: Vec<JoinHandle<()>> = slots | |
.chunks(task_unit as usize) | |
.enumerate() | |
.map(|(idx, slot_range)| { | |
let slot_range = slot_range.to_vec(); | |
let total_blocks_copied = total_blocks_copied.clone(); | |
let source_bigtable_creds = args.bigtable_source_creds_path.clone(); | |
let dest_bigtable_creds = args.bigtable_dest_creds_path.clone(); | |
let dry_run = args.dry_run.clone(); | |
runtime.spawn(async move { | |
// read-only source bigtable | |
let source_bigtable = solana_storage_bigtable::LedgerStorage::new( | |
true, | |
None, | |
Some(source_bigtable_creds), | |
) | |
.await | |
.expect("connected to source bigtable"); | |
// writeable destination bigtable | |
let dest_bigtable = solana_storage_bigtable::LedgerStorage::new( | |
false, | |
None, | |
Some(dest_bigtable_creds), | |
) | |
.await | |
.expect("connected to source bigtable"); | |
for chunk in slot_range.chunks(chunk_size) { | |
debug!("fetching chunks t_id: {} chunks: {:?}", idx, chunk); | |
let source_slots_blocks: HashMap<Slot, ConfirmedBlock> = source_bigtable | |
.get_confirmed_blocks_with_data(chunk) | |
.await | |
.expect("got blocks") | |
.collect(); | |
let dest_slots_blocks: HashMap<Slot, ConfirmedBlock> = dest_bigtable | |
.get_confirmed_blocks_with_data(chunk) | |
.await | |
.expect("got blocks") | |
.collect(); | |
debug!("source: {:?}", source_slots_blocks); | |
debug!("dest: {:?}", dest_slots_blocks); | |
let slots_blocks_to_upload: Vec<(Slot, VersionedConfirmedBlock)> = | |
source_slots_blocks | |
.into_iter() | |
.filter_map(|(source_slot, source_block)| { | |
let all_source_txs_have_meta = | |
source_block.transactions.iter().all(|tx| { | |
matches!(tx, TransactionWithStatusMeta::Complete(_)) | |
}); | |
if all_source_txs_have_meta { | |
match dest_slots_blocks.get(&source_slot) { | |
None => Some(( | |
source_slot, | |
confirmed_block_to_versioned(source_block)?, | |
)), | |
Some(dest_block) => { | |
let all_dest_txs_have_meta = | |
dest_block.transactions.iter().all(|tx| { | |
matches!( | |
tx, | |
TransactionWithStatusMeta::Complete(_) | |
) | |
}); | |
if !all_dest_txs_have_meta { | |
Some(( | |
source_slot, | |
confirmed_block_to_versioned(source_block)?, | |
)) | |
} else { | |
None | |
} | |
} | |
} | |
} else { | |
debug!( | |
"source bigtable slot {} missing metadata", | |
source_slot | |
); | |
None | |
} | |
}) | |
.collect(); | |
if slots_blocks_to_upload.is_empty() { | |
continue; | |
} | |
let slots_missing: Vec<Slot> = | |
slots_blocks_to_upload.iter().map(|(s, _)| *s).collect(); | |
info!("slots missing meta: {:?}", slots_missing); | |
if !dry_run { | |
let mut slots_results: Vec<_> = slots_blocks_to_upload | |
.into_iter() | |
.map(|(s, b)| { | |
let dest_bigtable = dest_bigtable.clone(); | |
tokio::spawn(async move { | |
dest_bigtable | |
.upload_confirmed_block(s.clone(), b.clone()) | |
.await | |
}) | |
}) | |
.collect(); | |
let results = futures::future::join_all(slots_results).await; | |
} | |
// add to stats | |
{ | |
let mut total_blocks_copied = total_blocks_copied.lock().unwrap(); | |
*total_blocks_copied = total_blocks_copied | |
.checked_add(slots_missing.len()) | |
.unwrap(); | |
} | |
} | |
}) | |
}) | |
.collect(); | |
let mut results = Vec::new(); | |
for t in tasks { | |
let r = t.await.expect("results fetched"); | |
results.push(r); | |
} | |
}); | |
exit.store(true, Ordering::Relaxed); | |
thread.join().unwrap(); | |
} | |
fn confirmed_block_to_versioned(block: ConfirmedBlock) -> Option<VersionedConfirmedBlock> { | |
let num_txs = block.transactions.len(); | |
let txs: Vec<_> = block | |
.transactions | |
.into_iter() | |
.filter_map(|tx| match tx { | |
TransactionWithStatusMeta::MissingMetadata(_) => None, | |
TransactionWithStatusMeta::Complete(tx) => Some(tx), | |
}) | |
.collect(); | |
if txs.len() != num_txs { | |
warn!("yo something fucked"); | |
return None; | |
} | |
Some(VersionedConfirmedBlock { | |
previous_blockhash: block.previous_blockhash, | |
blockhash: block.blockhash, | |
parent_slot: block.parent_slot, | |
transactions: txs, | |
rewards: block.rewards, | |
block_time: block.block_time, | |
block_height: block.block_height, | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment