Created
March 11, 2025 00:11
-
-
Save 0x009922/57e53a56be69abfebae66a45d3e4394a to your computer and use it in GitHub Desktop.
Iroha - sync issue reproduction
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{ | |
iter::once, | |
num::NonZero, | |
ops::{ControlFlow, RangeInclusive}, | |
sync::Arc, | |
time::Duration, | |
}; | |
use assert_matches::assert_matches; | |
use eyre::{bail, eyre, Result}; | |
use futures_util::{stream::FuturesUnordered, StreamExt}; | |
use iroha::{ | |
client::Client, | |
data_model::isi::{Register, Unregister}, | |
}; | |
use iroha_config_base::toml::WriteExt; | |
use iroha_data_model::{ | |
isi::{InstructionBox, Log, SetParameter}, | |
parameter::{BlockParameter, Parameter}, | |
Level, | |
}; | |
use iroha_test_network::*; | |
use nonzero_ext::nonzero; | |
use rand::{prelude::IteratorRandom, seq::SliceRandom, thread_rng}; | |
use tokio::{ | |
spawn, | |
task::spawn_blocking, | |
time::{sleep, timeout}, | |
}; | |
struct NetworkHeight { | |
peers: Vec<Option<u64>>, | |
} | |
impl NetworkHeight { | |
fn new(n_peers: usize) -> Self { | |
Self { | |
peers: vec![None; n_peers], | |
} | |
} | |
fn update(&mut self, peer: usize, height: u64) { | |
self.peers[peer] = Some(height); | |
} | |
fn height_range(&self) -> Option<RangeInclusive<u64>> { | |
match self | |
.peers | |
.iter() | |
.try_fold(None::<(u64, u64)>, |acc, maybe_height| { | |
if let Some(value) = maybe_height { | |
let next = match acc { | |
None => Some((*value, *value)), | |
Some((min, max)) => Some((min.min(*value), max.max(*value))), | |
}; | |
ControlFlow::Continue(next) | |
} else { | |
ControlFlow::Break(()) | |
} | |
}) { | |
ControlFlow::Continue(Some((min, max))) => Some(min..=max), | |
ControlFlow::Continue(None) => unreachable!(), | |
ControlFlow::Break(_) => None, | |
} | |
} | |
} | |
fn dummy_tx(client: &Client) -> Result<()> { | |
client | |
.submit(InstructionBox::Log(Log::new( | |
Level::TRACE, | |
"dummy".to_owned(), | |
))) | |
.map(|_| ()) | |
} | |
async fn watch_block_gaps(network: Arc<Network>) -> Result<()> { | |
let (height_tx, mut height_rx) = tokio::sync::mpsc::channel(32); | |
let peers = network.peers(); | |
assert!(peers.len() > 0); | |
for (i, peer) in peers.iter().enumerate() { | |
let tx = height_tx.clone(); | |
let peer = peer.clone(); | |
let _ = spawn(async move { | |
let mut events = peer.events(); | |
while let Ok(event) = events.recv().await { | |
if let PeerLifecycleEvent::BlockApplied { height } = event { | |
if let Err(_) = tx.send((i, height)).await { | |
break; | |
} | |
} | |
} | |
}); | |
} | |
let mut state = NetworkHeight::new(network.peers().len()); | |
while let Some((i, height)) = height_rx.recv().await { | |
state.update(i, height); | |
if let Some(range) = state.height_range() { | |
let min = range.start(); | |
let max = range.end(); | |
if max - min > 4 { | |
return Err(eyre!("detected a large gap between peers ({max} vs {min})")); | |
} | |
} | |
} | |
unreachable!() | |
} | |
#[tokio::test] | |
async fn peers_with_full_queue_dont_fall_behind() -> Result<()> { | |
const PEERS: usize = 4; | |
const QUEUE_CAP: usize = 5; | |
const TX_GOSSIP_SIZE: usize = 5; | |
const TX_GOSSIP_PERIOD: Duration = Duration::from_secs(5); | |
const BLOCK_GOSSIP_SIZE: usize = 10; | |
const BLOCK_GOSSIP_PERIOD: Duration = Duration::from_secs(10); | |
const TXS_PER_BLOCK: NonZero<u64> = nonzero!(1_u64); | |
const LOAD_TXS_BATCH: usize = 2; | |
const LOAD_DELAY: Duration = Duration::from_millis(250); | |
const OBSERVATION_PERIOD: Duration = Duration::from_secs(30); | |
// TODO: assert that load tps per peer is greater or equal than the gossip tps | |
let network = NetworkBuilder::new() | |
.with_peers(PEERS) | |
.with_default_pipeline_time() | |
.with_config(|c| { | |
c.write(["queue", "capacity"], QUEUE_CAP) | |
.write(["network", "transaction_gossip_size"], TX_GOSSIP_SIZE) | |
.write( | |
["network", "transaction_gossip_period_ms"], | |
TX_GOSSIP_PERIOD.as_millis() as usize, | |
) | |
.write(["network", "block_gossip_size"], BLOCK_GOSSIP_SIZE) | |
.write( | |
["network", "block_gossip_period_ms"], | |
BLOCK_GOSSIP_PERIOD.as_millis() as usize, | |
) | |
.write(["logger", "level"], "info,iroha_core=trace,iroha_p2p=trace"); | |
}) | |
.with_genesis_instruction(SetParameter::new(Parameter::Block( | |
BlockParameter::MaxTransactions(TXS_PER_BLOCK), | |
))) | |
.start() | |
.await?; | |
let clients = network | |
.peers() | |
.iter() | |
.map(|peer| peer.client()) | |
.collect::<Vec<_>>(); | |
let _load_handle = spawn(async move { | |
let clients_cycle = clients.iter().cycle(); | |
for client in clients_cycle { | |
let client = client.clone(); | |
spawn_blocking(move || { | |
for _ in 0..LOAD_TXS_BATCH { | |
let _ = dummy_tx(&client); | |
} | |
}); | |
sleep(LOAD_DELAY).await; | |
} | |
}); | |
match timeout(OBSERVATION_PERIOD, watch_block_gaps(Arc::new(network))).await { | |
Err(_) => Ok(()), | |
Ok(Err(err)) => bail!(err), | |
Ok(Ok(())) => unreachable!("watcher is indefinite"), | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment