Skip to content
22 changes: 13 additions & 9 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,10 @@ where
}
let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
// When the bundles must already be present (block proposals), collect *every* missing
// `(origin, height)` rather than bailing on the first, so the caller can be told the
// full set of cross-chain updates to fetch in a single round-trip.
let mut missing_bundles = Vec::new();
for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
tracing::trace!(
"Removing [{}] from inbox for {origin}",
Expand All @@ -679,22 +683,22 @@ where
.remove_bundle(bundle)
.await
.map_err(|error| (chain_id, origin, error))?;
if must_be_present {
ensure!(
was_present,
ChainError::MissingCrossChainUpdate {
chain_id,
origin,
height: bundle.height,
}
);
if must_be_present && !was_present {
missing_bundles.push((origin, bundle.height));

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What could happen here is that a previous message from the same origin is "negative" in the inbox (due to a confirmed block consuming it). Currently, this older message is not reported as missing. (It would use more network data but could arguably make the work of [future] VM-free clients easier.)

Conversely, we are not trying to normalize the reported missing messages to include only the maximum height from each origin. (It would be a small optimization in line with the rest of the code)

}
}
inbox.observe_size_metric();
if inbox.added_bundles.count() == 0 {
self.nonempty_inboxes.get_mut().remove(&origin);
}
}
ensure!(
missing_bundles.is_empty(),
ChainError::MissingCrossChainUpdates {
chain_id,
bundles: missing_bundles,
}
);
Ok(())
}

Expand Down
15 changes: 9 additions & 6 deletions linera-chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,16 @@ pub enum ChainError {
#[error("The chain being queried is not active {0}")]
InactiveChain(ChainId),
#[error(
"Cannot vote for block proposal of chain {chain_id} because a message \
from chain {origin} at height {height} has not been received yet"
"Cannot vote for block proposal of chain {chain_id} because {} cross-chain message \
bundle(s) have not been received yet",
bundles.len()
)]
MissingCrossChainUpdate {
MissingCrossChainUpdates {
chain_id: ChainId,
origin: ChainId,
height: BlockHeight,
/// The missing incoming message bundles, as `(origin chain, height)` pairs that must
/// all be received before this block can be validated. The validator reports every
/// missing bundle at once so the client can fetch them in a single round.
bundles: Vec<(ChainId, BlockHeight)>,
},
#[error(
"Message in block proposed to {chain_id} does not match the previously received messages from \
Expand Down Expand Up @@ -216,7 +219,7 @@ impl ChainError {
| ChainError::RoundDoesNotTimeOut
| ChainError::NotTimedOutYet(_)
| ChainError::CheckpointPreconditionFailed(_)
| ChainError::MissingCrossChainUpdate { .. } => false,
| ChainError::MissingCrossChainUpdates { .. } => false,
ChainError::ViewError(_)
| ChainError::UnexpectedMessage { .. }
| ChainError::InboxGapDetected { .. }
Expand Down
44 changes: 28 additions & 16 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2210,21 +2210,35 @@ impl<Env: Environment> Client<Env> {
continue;
}
}
while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
if let ChainError::MissingCrossChainUpdate {
chain_id,
origin,
height,
} = &**chain_err
// The local node reports every missing sender bundle in a single
// `MissingCrossChainUpdates`, so we download them all in one pass and retry once.
if let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
if let ChainError::MissingCrossChainUpdates { chain_id, bundles } = &**chain_err
{
self.download_sender_block_with_sending_ancestors(
*chain_id,
*origin,
*height,
remote_node,
)
.await?;
// Retry
let chain_id = *chain_id;
// `download_sender_block_with_sending_ancestors` walks each origin's
// message-bearing blocks back from the given height, so the highest missing
// height per origin subsumes the lower ones. Deduplicate to that (also
// ending the borrow of `err` so we can reassign it below), then download the
// independent origins concurrently, bounded by `max_joined_tasks`.
let mut origin_heights: BTreeMap<ChainId, BlockHeight> = BTreeMap::new();
for (origin, height) in bundles {
let entry = origin_heights.entry(*origin).or_insert(*height);
*entry = (*entry).max(*height);
}
stream::iter(origin_heights.into_iter().map(|(origin, height)| {
self.download_sender_block_with_sending_ancestors(
chain_id,
origin,
height,
remote_node,
)
}))
.buffer_unordered(self.options.max_joined_tasks)
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<(), _>>()?;
if let Err(new_err) = self
.local_node
.handle_block_proposal(proposal.clone())
Expand All @@ -2234,8 +2248,6 @@ impl<Env: Environment> Client<Env> {
} else {
continue 'proposal_loop;
}
} else {
break;
}
}

Expand Down
24 changes: 9 additions & 15 deletions linera-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,13 @@ pub enum NodeError {

// This error must be normalized during conversions.
#[error(
"Cannot vote for block proposal of chain {chain_id} because a message \
from chain {origin} at height {height} has not been received yet"
"Validator is missing {} cross-chain message bundle(s) to validate the block for \
chain {chain_id}",
bundles.len()
)]
MissingCrossChainUpdate {
MissingCrossChainUpdates {
chain_id: ChainId,
origin: ChainId,
height: BlockHeight,
bundles: Vec<(ChainId, BlockHeight)>,
},

#[error("Blobs not found: {0:?}")]
Expand Down Expand Up @@ -382,7 +382,7 @@ impl NodeError {
NodeError::BlobsNotFound(_)
| NodeError::BlocksNotFound(_)
| NodeError::EventsNotFound(_)
| NodeError::MissingCrossChainUpdate { .. }
| NodeError::MissingCrossChainUpdates { .. }
| NodeError::WrongRound(_)
| NodeError::UnexpectedBlockHeight { .. }
| NodeError::InactiveChain(_)
Expand Down Expand Up @@ -473,15 +473,9 @@ impl From<CryptoError> for NodeError {
impl From<ChainError> for NodeError {
fn from(error: ChainError) -> Self {
match error {
ChainError::MissingCrossChainUpdate {
chain_id,
origin,
height,
} => Self::MissingCrossChainUpdate {
chain_id,
origin,
height,
},
ChainError::MissingCrossChainUpdates { chain_id, bundles } => {
Self::MissingCrossChainUpdates { chain_id, bundles }
}
ChainError::InactiveChain(chain_id) => Self::InactiveChain(chain_id),
ChainError::ExecutionError(execution_error, context) => match *execution_error {
ExecutionError::BlobsNotFound(blob_ids) => Self::BlobsNotFound(blob_ids),
Expand Down
155 changes: 155 additions & 0 deletions linera-core/src/unit_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,161 @@ where
Ok(())
}

#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "storage-service", test_case(ServiceStorageBuilder::new(); "storage_service"))]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
#[cfg_attr(feature = "scylladb", test_case(ScyllaDbStorageBuilder::default(); "scylla_db"))]
#[test_log::test(tokio::test)]
async fn test_proposal_batches_missing_cross_chain_update_catch_up<B>(
storage_builder: B,
) -> anyhow::Result<()>
where
B: StorageBuilder,
{
// A block that consumes incoming bundles from several sender chains is proposed to a
// validator that is behind on *all* of those senders. The validator must report every
// missing sender at once (as a single `MissingCrossChainUpdates`) and the client must catch
// them up in a single batch, instead of the one-rejection-and-round-trip-per-sender loop
// that serialized into multi-minute stalls on busy hub chains. This exercises the
// client-side batching/termination logic in `updater.rs`.
let signer = InMemorySigner::new(None);
// Zero default-faulty validators so the only unavailable validators are the ones this test
// deliberately takes offline below; the quorum is still three out of four.
let mut builder = TestBuilder::new(storage_builder, 4, 0, signer).await?;

// Three independent sender chains and one recipient ("hub") chain. The committee has four
// validators with a quorum of three.
let sender1 = builder.add_root_chain(1, Amount::from_tokens(2)).await?;
let sender2 = builder.add_root_chain(2, Amount::from_tokens(2)).await?;
let sender3 = builder.add_root_chain(3, Amount::from_tokens(2)).await?;
let recipient = builder.add_root_chain(4, Amount::ZERO).await?;
let recipient_id = recipient.chain_id();

// Phase A: validator 3 is unavailable while the senders transfer to the recipient. In the
// single-process harness a validator only delivers a cross-chain message to the recipient's
// inbox when it handles the sender's certificate, so validator 3 ends up missing both the
// sender blocks and the resulting bundles in the recipient's inbox. The transfers still
// reach the quorum {0, 1, 2}.
builder.set_fault_type([3], FaultType::OfflineWithInfo);
for sender in [&sender1, &sender2, &sender3] {
sender
.transfer_to_account(
AccountOwner::CHAIN,
Amount::ONE,
Account::chain(recipient_id),
)
.await
.unwrap_ok_committed();
}

// The recipient learns about all three incoming bundles (and preprocesses the sender blocks
// into its local storage) from the honest quorum.
recipient.synchronize_from_validators().await?;

// Phase B: validator 3 comes back, but validator 2 — one of the validators that *does* have
// the sender blocks — goes offline. The recipient's block now needs validator 3's vote to
// reach a quorum {0, 1, 3}, so the updater cannot route around validator 3: it must catch it
// up on every missing sender before validator 3 can accept the proposal.
builder.set_fault_type([3], FaultType::Honest);
builder.set_fault_type([2], FaultType::Offline);

// Consume all three bundles in a single block. Validator 3 rejects the proposal with an
// aggregated `MissingCrossChainUpdates` listing all three senders; the client syncs them in
// one batch and the block is confirmed.
recipient.process_inbox().await?;

assert_eq!(
recipient.local_balance().await.unwrap(),
Amount::from_tokens(3),
);
// A single block consumed all three bundles (height 1, not 3).
assert_eq!(
recipient.chain_info().await?.next_block_height,
BlockHeight::from(1),
);
assert!(recipient.pending_proposal().await.is_none());

// The previously-lagging validator 3 was caught up and voted: the block reached the quorum
// {0, 1, 3} (validator 2 is offline and is skipped by the check).
builder
.check_that_validators_have_certificate(recipient_id, BlockHeight::ZERO, 3)
.await
.unwrap();

Ok(())
}

#[test_case(MemoryStorageBuilder::default(); "memory")]
#[test_log::test(tokio::test)]
async fn test_proposal_catch_up_with_sender_gap<B>(storage_builder: B) -> anyhow::Result<()>
where
B: StorageBuilder,
{
// A lagging validator must be caught up on a sender that has a *gap* from the recipient's
// perspective: sender block 0 messages the recipient, block 1 does not, block 2 does. The
// recipient consumes those two bundles in two separate blocks, so the second proposal only
// references sender block 2 — whose bundle can only be scheduled once block 0 is executed.
let signer = InMemorySigner::new(None);
let mut builder = TestBuilder::new(storage_builder, 4, 0, signer).await?;
let sender = builder.add_root_chain(1, Amount::from_tokens(10)).await?;
let recipient = builder.add_root_chain(2, Amount::ZERO).await?;
let sender_id = sender.chain_id();
let recipient_id = recipient.chain_id();

// Validator 3 is offline while the sender builds a gapped chain and the recipient consumes
// the first bundle.
builder.set_fault_type([3], FaultType::OfflineWithInfo);

// Sender block 0 -> recipient.
sender
.transfer_to_account(
AccountOwner::CHAIN,
Amount::ONE,
Account::chain(recipient_id),
)
.await
.unwrap_ok_committed();
recipient.synchronize_from_validators().await?;
recipient.process_inbox().await?; // recipient block 0 consumes sender block 0's bundle

// Sender block 1 -> itself (no message to the recipient: the gap), block 2 -> recipient.
sender
.transfer_to_account(AccountOwner::CHAIN, Amount::ONE, Account::chain(sender_id))
.await
.unwrap_ok_committed();
sender
.transfer_to_account(
AccountOwner::CHAIN,
Amount::from_tokens(3),
Account::chain(recipient_id),
)
.await
.unwrap_ok_committed();
recipient.synchronize_from_validators().await?;

// Validator 3 comes back; validator 2 goes offline so the recipient's next block needs
// validator 3's vote — the updater must catch it up on the sender across the gap.
builder.set_fault_type([3], FaultType::Honest);
builder.set_fault_type([2], FaultType::Offline);

recipient.process_inbox().await?; // recipient block 1 consumes sender block 2's bundle

assert_eq!(
recipient.local_balance().await.unwrap(),
Amount::from_tokens(4),
);
assert_eq!(
recipient.chain_info().await?.next_block_height,
BlockHeight::from(2),
);
builder
.check_that_validators_have_certificate(recipient_id, BlockHeight::from(1), 3)
.await
.unwrap();

Ok(())
}

#[test_case(MemoryStorageBuilder::default(); "memory")]
#[cfg_attr(feature = "storage-service", test_case(ServiceStorageBuilder::new(); "storage_service"))]
#[cfg_attr(feature = "rocksdb", test_case(RocksDbStorageBuilder::new().await; "rocks_db"))]
Expand Down
Loading
Loading