Skip to content

Instantly share code, notes, and snippets.

@0x009922
Created March 11, 2025 00:11
Show Gist options
  • Save 0x009922/57e53a56be69abfebae66a45d3e4394a to your computer and use it in GitHub Desktop.
Save 0x009922/57e53a56be69abfebae66a45d3e4394a to your computer and use it in GitHub Desktop.
Iroha - sync issue reproduction
use std::{
iter::once,
num::NonZero,
ops::{ControlFlow, RangeInclusive},
sync::Arc,
time::Duration,
};
use assert_matches::assert_matches;
use eyre::{bail, eyre, Result};
use futures_util::{stream::FuturesUnordered, StreamExt};
use iroha::{
client::Client,
data_model::isi::{Register, Unregister},
};
use iroha_config_base::toml::WriteExt;
use iroha_data_model::{
isi::{InstructionBox, Log, SetParameter},
parameter::{BlockParameter, Parameter},
Level,
};
use iroha_test_network::*;
use nonzero_ext::nonzero;
use rand::{prelude::IteratorRandom, seq::SliceRandom, thread_rng};
use tokio::{
spawn,
task::spawn_blocking,
time::{sleep, timeout},
};
struct NetworkHeight {
peers: Vec<Option<u64>>,
}
impl NetworkHeight {
fn new(n_peers: usize) -> Self {
Self {
peers: vec![None; n_peers],
}
}
fn update(&mut self, peer: usize, height: u64) {
self.peers[peer] = Some(height);
}
fn height_range(&self) -> Option<RangeInclusive<u64>> {
match self
.peers
.iter()
.try_fold(None::<(u64, u64)>, |acc, maybe_height| {
if let Some(value) = maybe_height {
let next = match acc {
None => Some((*value, *value)),
Some((min, max)) => Some((min.min(*value), max.max(*value))),
};
ControlFlow::Continue(next)
} else {
ControlFlow::Break(())
}
}) {
ControlFlow::Continue(Some((min, max))) => Some(min..=max),
ControlFlow::Continue(None) => unreachable!(),
ControlFlow::Break(_) => None,
}
}
}
fn dummy_tx(client: &Client) -> Result<()> {
client
.submit(InstructionBox::Log(Log::new(
Level::TRACE,
"dummy".to_owned(),
)))
.map(|_| ())
}
async fn watch_block_gaps(network: Arc<Network>) -> Result<()> {
let (height_tx, mut height_rx) = tokio::sync::mpsc::channel(32);
let peers = network.peers();
assert!(peers.len() > 0);
for (i, peer) in peers.iter().enumerate() {
let tx = height_tx.clone();
let peer = peer.clone();
let _ = spawn(async move {
let mut events = peer.events();
while let Ok(event) = events.recv().await {
if let PeerLifecycleEvent::BlockApplied { height } = event {
if let Err(_) = tx.send((i, height)).await {
break;
}
}
}
});
}
let mut state = NetworkHeight::new(network.peers().len());
while let Some((i, height)) = height_rx.recv().await {
state.update(i, height);
if let Some(range) = state.height_range() {
let min = range.start();
let max = range.end();
if max - min > 4 {
return Err(eyre!("detected a large gap between peers ({max} vs {min})"));
}
}
}
unreachable!()
}
#[tokio::test]
async fn peers_with_full_queue_dont_fall_behind() -> Result<()> {
const PEERS: usize = 4;
const QUEUE_CAP: usize = 5;
const TX_GOSSIP_SIZE: usize = 5;
const TX_GOSSIP_PERIOD: Duration = Duration::from_secs(5);
const BLOCK_GOSSIP_SIZE: usize = 10;
const BLOCK_GOSSIP_PERIOD: Duration = Duration::from_secs(10);
const TXS_PER_BLOCK: NonZero<u64> = nonzero!(1_u64);
const LOAD_TXS_BATCH: usize = 2;
const LOAD_DELAY: Duration = Duration::from_millis(250);
const OBSERVATION_PERIOD: Duration = Duration::from_secs(30);
// TODO: assert that load tps per peer is greater or equal than the gossip tps
let network = NetworkBuilder::new()
.with_peers(PEERS)
.with_default_pipeline_time()
.with_config(|c| {
c.write(["queue", "capacity"], QUEUE_CAP)
.write(["network", "transaction_gossip_size"], TX_GOSSIP_SIZE)
.write(
["network", "transaction_gossip_period_ms"],
TX_GOSSIP_PERIOD.as_millis() as usize,
)
.write(["network", "block_gossip_size"], BLOCK_GOSSIP_SIZE)
.write(
["network", "block_gossip_period_ms"],
BLOCK_GOSSIP_PERIOD.as_millis() as usize,
)
.write(["logger", "level"], "info,iroha_core=trace,iroha_p2p=trace");
})
.with_genesis_instruction(SetParameter::new(Parameter::Block(
BlockParameter::MaxTransactions(TXS_PER_BLOCK),
)))
.start()
.await?;
let clients = network
.peers()
.iter()
.map(|peer| peer.client())
.collect::<Vec<_>>();
let _load_handle = spawn(async move {
let clients_cycle = clients.iter().cycle();
for client in clients_cycle {
let client = client.clone();
spawn_blocking(move || {
for _ in 0..LOAD_TXS_BATCH {
let _ = dummy_tx(&client);
}
});
sleep(LOAD_DELAY).await;
}
});
match timeout(OBSERVATION_PERIOD, watch_block_gaps(Arc::new(network))).await {
Err(_) => Ok(()),
Ok(Err(err)) => bail!(err),
Ok(Ok(())) => unreachable!("watcher is indefinite"),
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment