Skip to content

Instantly share code, notes, and snippets.

@Rizary
Created March 28, 2022 22:46
Show Gist options
  • Save Rizary/90c1aad97b2d72cc22644e795b4a9786 to your computer and use it in GitHub Desktop.
Save Rizary/90c1aad97b2d72cc22644e795b4a9786 to your computer and use it in GitHub Desktop.
/// Wires up and starts every actor of the core server.
///
/// The function creates the channels that connect the actors, starts each actor in turn and
/// returns the join handles of all started tasks so the caller can supervise them.
///
/// # Errors
///
/// Returns an error if a storage connection cannot be obtained from `connection_pool`.
pub async fn run_core(
    connection_pool: ConnectionPool,
    config: &ZkSyncConfig,
    eth_gateway: EthereumGateway,
) -> anyhow::Result<Vec<JoinHandle<()>>> {
    // Channels connecting the actors; every channel uses the same default capacity.
    let (block_tx, block_rx) = mpsc::channel(DEFAULT_CHANNEL_CAPACITY);
    let (keeper_req_tx, keeper_req_rx) = mpsc::channel(DEFAULT_CHANNEL_CAPACITY);
    let (eth_watch_tx, eth_watch_rx) = mpsc::channel(DEFAULT_CHANNEL_CAPACITY);
    let (mempool_tx_req_tx, mempool_tx_req_rx) = mpsc::channel(DEFAULT_CHANNEL_CAPACITY);
    let (mempool_block_req_tx, mempool_block_req_rx) = mpsc::channel(DEFAULT_CHANNEL_CAPACITY);
    let (tx_events_tx, tx_events_rx) = mpsc::channel(DEFAULT_CHANNEL_CAPACITY);

    // Shorthand for the state keeper section of the config, which is read several times below.
    let keeper_config = &config.chain.state_keeper;
    // Number of mempool transaction handlers passed to `run_mempool_tasks`.
    let mempool_transaction_handlers = 4;

    // Start the Ethereum watcher.
    let eth_watch_task = start_eth_watch(
        eth_watch_tx.clone(),
        eth_watch_rx,
        eth_gateway.clone(),
        &config.contracts,
        &config.eth_watch,
        mempool_tx_req_tx.clone(),
    );

    // Acquire a storage connection; it is used to restore the state keeper's initial state.
    let mut storage = connection_pool.access_storage().await?;

    // Build the state keeper together with its root hash calculator.
    let initial_state = ZkSyncStateInitParams::restore_from_db(
        &mut storage,
        keeper_config.fee_account_addr,
        &keeper_config.block_chunk_sizes,
    )
    .await;
    let (mut state_keeper, root_hash_calculator) = ZkSyncStateKeeper::new(
        initial_state,
        keeper_config.fee_account_addr,
        keeper_req_rx,
        block_tx,
        keeper_config.block_chunk_sizes.clone(),
        keeper_config.miniblock_iterations as usize,
        keeper_config.fast_block_miniblock_iterations as usize,
        tx_events_tx,
    );
    // The queue has to be taken here: `state_keeper` is moved into its task below.
    let root_hash_queue = state_keeper.root_hash_queue();

    // Reverted blocks are executed before the state keeper task is started.
    state_keeper.execute_reverted_blocks().await;
    let state_keeper_task = start_state_keeper(state_keeper);
    let root_hash_calculator_task = start_root_hash_calculator(root_hash_calculator);

    // Start the committer.
    let committer_task = run_committer(
        block_rx,
        mempool_block_req_tx.clone(),
        connection_pool.clone(),
        config.chain.clone(),
    );

    // Start the mempool; this task maintains the transaction pool.
    let mempool_task = run_mempool_tasks(
        connection_pool.clone(),
        mempool_tx_req_rx,
        mempool_block_req_rx,
        mempool_transaction_handlers,
        DEFAULT_CHANNEL_CAPACITY,
        keeper_config.block_chunk_sizes.clone(),
    );

    // Start the token handler.
    let token_handler_task = run_token_handler(
        connection_pool.clone(),
        eth_gateway.clone(),
        &config.token_handler,
        eth_watch_tx.clone(),
    );

    // Start the register factory handler.
    let register_factory_task = run_register_factory_handler(
        connection_pool.clone(),
        eth_watch_tx.clone(),
        config.token_handler.clone(),
    );

    // Start the emitter for processed transaction events.
    let tx_event_emitter_task =
        tx_event_emitter::run_tx_event_emitter_task(connection_pool.clone(), tx_events_rx);

    // Start the block proposer. It is the main driver of the application: it polls
    // transactions from the mempool and sends them to `StateKeeper`.
    let proposer_task = run_block_proposer_task(
        config,
        mempool_block_req_tx.clone(),
        keeper_req_tx.clone(),
        root_hash_queue,
    );

    // Start the private API.
    let private_api_task = start_private_core_api(mempool_tx_req_tx, config.api.private.clone());

    Ok(vec![
        eth_watch_task,
        state_keeper_task,
        root_hash_calculator_task,
        committer_task,
        mempool_task,
        proposer_task,
        token_handler_task,
        register_factory_task,
        tx_event_emitter_task,
        private_api_task,
    ])
}
/// Spawns the mempool as one background task and returns its join handle.
///
/// Inside the spawned task the mempool state is restored from the database. A balancer, its
/// transaction handlers and one block-request handler are then each spawned on tokio, and the
/// outer task awaits `wait_for_tasks` over all of them.
///
/// * `db_pool` - connection pool used to restore the state and given to the handler builder.
/// * `tx_requests` - receiver of transaction requests, consumed by the balancer.
/// * `block_requests` - receiver of block requests, consumed by the blocks handler.
/// * `number_of_mempool_transaction_handlers`, `channel_capacity` - forwarded unchanged to
///   `Balancer::new`.
/// * `block_chunk_sizes` - supported block sizes; only the largest value is used.
///
/// # Panics
///
/// The spawned task panics if `block_chunk_sizes` is empty.
#[must_use]
pub fn run_mempool_tasks(
    db_pool: ConnectionPool,
    tx_requests: mpsc::Receiver<MempoolTransactionRequest>,
    block_requests: mpsc::Receiver<MempoolBlocksRequest>,
    number_of_mempool_transaction_handlers: u8,
    channel_capacity: usize,
    block_chunk_sizes: Vec<usize>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        // The initial mempool state comes from the database and is shared by all handlers.
        let restored_state = MempoolState::restore_from_db(&db_pool).await;
        let mempool_state = Arc::new(RwLock::new(restored_state));

        let max_block_size_chunks = block_chunk_sizes
            .iter()
            .copied()
            .max()
            .expect("failed to find max block chunks size");

        // The balancer and the transaction handlers it feeds are built together.
        let handler_builder = MempoolTransactionsHandlerBuilder {
            db_pool: db_pool.clone(),
            mempool_state: Arc::clone(&mempool_state),
            max_block_size_chunks,
        };
        let (balancer, handlers) = Balancer::new(
            handler_builder,
            tx_requests,
            number_of_mempool_transaction_handlers,
            channel_capacity,
        );

        // Every transaction handler runs as its own tokio task...
        let mut tasks: Vec<_> = handlers
            .into_iter()
            .map(|handler| tokio::spawn(handler.run()))
            .collect();
        // ...and so does the balancer.
        tasks.push(tokio::spawn(balancer.run()));

        // The blocks handler works on the same shared mempool state.
        let blocks_handler = MempoolBlocksHandler {
            mempool_state,
            requests: block_requests,
            max_block_size_chunks,
        };
        tasks.push(tokio::spawn(blocks_handler.run()));

        wait_for_tasks(tasks).await
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment