Created
March 28, 2022 22:46
-
-
Save Rizary/90c1aad97b2d72cc22644e795b4a9786 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pub async fn run_core( | |
connection_pool: ConnectionPool, | |
config: &ZkSyncConfig, | |
eth_gateway: EthereumGateway, | |
) -> anyhow::Result<Vec<JoinHandle<()>>> { | |
let (proposed_blocks_sender, proposed_blocks_receiver) = | |
mpsc::channel(DEFAULT_CHANNEL_CAPACITY); | |
let (state_keeper_req_sender, state_keeper_req_receiver) = | |
mpsc::channel(DEFAULT_CHANNEL_CAPACITY); | |
let (eth_watch_req_sender, eth_watch_req_receiver) = mpsc::channel(DEFAULT_CHANNEL_CAPACITY); | |
let (mempool_tx_request_sender, mempool_tx_request_receiver) = | |
mpsc::channel(DEFAULT_CHANNEL_CAPACITY); | |
let (mempool_block_request_sender, mempool_block_request_receiver) = | |
mpsc::channel(DEFAULT_CHANNEL_CAPACITY); | |
let (processed_tx_events_sender, processed_tx_events_receiver) = | |
mpsc::channel(DEFAULT_CHANNEL_CAPACITY); | |
// Start Ethereum Watcher. | |
let eth_watch_task = start_eth_watch( | |
eth_watch_req_sender.clone(), | |
eth_watch_req_receiver, | |
eth_gateway.clone(), | |
&config.contracts, | |
&config.eth_watch, | |
mempool_tx_request_sender.clone(), | |
); | |
// Insert pending withdrawals into database (if required) | |
let mut storage_processor = connection_pool.access_storage().await?; | |
// Start state keeper and root hash calculator. | |
let state_keeper_init = ZkSyncStateInitParams::restore_from_db( | |
&mut storage_processor, | |
config.chain.state_keeper.fee_account_addr, | |
&config.chain.state_keeper.block_chunk_sizes, | |
) | |
.await; | |
let (mut state_keeper, root_hash_calculator) = ZkSyncStateKeeper::new( | |
state_keeper_init, | |
config.chain.state_keeper.fee_account_addr, | |
state_keeper_req_receiver, | |
proposed_blocks_sender, | |
config.chain.state_keeper.block_chunk_sizes.clone(), | |
config.chain.state_keeper.miniblock_iterations as usize, | |
config.chain.state_keeper.fast_block_miniblock_iterations as usize, | |
processed_tx_events_sender, | |
); | |
let root_hash_queue = state_keeper.root_hash_queue(); | |
// Execute reverted blocks before start | |
state_keeper.execute_reverted_blocks().await; | |
let state_keeper_task = start_state_keeper(state_keeper); | |
let root_hash_calculator_task = start_root_hash_calculator(root_hash_calculator); | |
// Start committer. | |
let committer_task = run_committer( | |
proposed_blocks_receiver, | |
mempool_block_request_sender.clone(), | |
connection_pool.clone(), | |
config.chain.clone(), | |
); | |
// Start mempool. | |
// Andika: This is where mempool task is created to maintain the pool. | |
let mempool_task = run_mempool_tasks( | |
connection_pool.clone(), | |
mempool_tx_request_receiver, | |
mempool_block_request_receiver, | |
4, | |
DEFAULT_CHANNEL_CAPACITY, | |
config.chain.state_keeper.block_chunk_sizes.clone(), | |
); | |
// Start token handler. | |
let token_handler_task = run_token_handler( | |
connection_pool.clone(), | |
eth_gateway.clone(), | |
&config.token_handler, | |
eth_watch_req_sender.clone(), | |
); | |
// Start token handler. | |
let register_factory_task = run_register_factory_handler( | |
connection_pool.clone(), | |
eth_watch_req_sender.clone(), | |
config.token_handler.clone(), | |
); | |
let tx_event_emitter_task = tx_event_emitter::run_tx_event_emitter_task( | |
connection_pool.clone(), | |
processed_tx_events_receiver, | |
); | |
// Start block proposer. | |
// Andika: This is where the block is proposed to be create | |
// Block proposer is main driver of the application, it polls transactions from mempool | |
//and sends them to `StateKeeper` | |
let proposer_task = run_block_proposer_task( | |
config, | |
mempool_block_request_sender.clone(), | |
state_keeper_req_sender.clone(), | |
root_hash_queue, | |
); | |
// Start private API. | |
let private_api_task = | |
start_private_core_api(mempool_tx_request_sender, config.api.private.clone()); | |
let task_futures = vec![ | |
eth_watch_task, | |
state_keeper_task, | |
root_hash_calculator_task, | |
committer_task, | |
mempool_task, | |
proposer_task, | |
token_handler_task, | |
register_factory_task, | |
tx_event_emitter_task, | |
private_api_task, | |
]; | |
Ok(task_futures) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[must_use] | |
pub fn run_mempool_tasks( | |
db_pool: ConnectionPool, | |
tx_requests: mpsc::Receiver<MempoolTransactionRequest>, | |
block_requests: mpsc::Receiver<MempoolBlocksRequest>, | |
number_of_mempool_transaction_handlers: u8, | |
channel_capacity: usize, | |
block_chunk_sizes: Vec<usize>, | |
) -> JoinHandle<()> { | |
tokio::spawn(async move { | |
// Andika: This is where the first mempool state is created | |
let mempool_state = Arc::new(RwLock::new(MempoolState::restore_from_db(&db_pool).await)); | |
let max_block_size_chunks = *block_chunk_sizes | |
.iter() | |
.max() | |
.expect("failed to find max block chunks size"); | |
let mut tasks = vec![]; | |
// Andika: This is where the balancer and handlers are created to | |
// manage memory pool transaction | |
let (balancer, handlers) = Balancer::new( | |
MempoolTransactionsHandlerBuilder { | |
db_pool: db_pool.clone(), | |
mempool_state: mempool_state.clone(), | |
max_block_size_chunks, | |
}, | |
tx_requests, | |
number_of_mempool_transaction_handlers, | |
channel_capacity, | |
); | |
// Andika: every items in the handlers are then run | |
// using tokio's spawn function | |
for item in handlers.into_iter() { | |
tasks.push(tokio::spawn(item.run())); | |
} | |
// Andika: the same goes to the balancer | |
tasks.push(tokio::spawn(balancer.run())); | |
// Andika: Memory pool block handlers is created based | |
// on the current mempool_state | |
let blocks_handler = MempoolBlocksHandler { | |
mempool_state, | |
requests: block_requests, | |
max_block_size_chunks, | |
}; | |
// Andika: Block handler are then run using tokio spawn function. | |
tasks.push(tokio::spawn(blocks_handler.run())); | |
wait_for_tasks(tasks).await | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment