diff --git a/CLI.md b/CLI.md index 7efc3be8f97b..37571eb45ef7 100644 --- a/CLI.md +++ b/CLI.md @@ -199,7 +199,7 @@ Client implementation and command-line tool for the Linera blockchain * `--quorum-grace-period ` — An additional delay, after reaching a quorum, to wait for additional validator signatures, as a fraction of time taken to reach quorum Default value: `0.2` -* `--blob-download-timeout-ms ` — The delay when downloading a blob, after which we try a second validator, in milliseconds +* `--blob-download-timeout-ms ` — The maximum time without progress (stream opening or a new blob arriving) when downloading blobs from a validator, after which we try the next validator, in milliseconds Default value: `1000` * `--cert-batch-download-timeout-ms ` — The delay when downloading a batch of certificates, after which we try a second validator, in milliseconds diff --git a/linera-client/src/client_options.rs b/linera-client/src/client_options.rs index 30f930e7ddf5..1aac4b131594 100644 --- a/linera-client/src/client_options.rs +++ b/linera-client/src/client_options.rs @@ -207,7 +207,9 @@ pub struct Options { #[arg(long, default_value_t = DEFAULT_QUORUM_GRACE_PERIOD)] pub quorum_grace_period: f64, - /// The delay when downloading a blob, after which we try a second validator, in milliseconds. + /// The maximum time without progress (stream opening or a new blob arriving) when + /// downloading blobs from a validator, after which we try the next validator, in + /// milliseconds. #[arg( long = "blob-download-timeout-ms", default_value = "1000", diff --git a/linera-core/src/client/chain_client/mod.rs b/linera-core/src/client/chain_client/mod.rs index dd3282f6d76f..9c492bb2f384 100644 --- a/linera-core/src/client/chain_client/mod.rs +++ b/linera-core/src/client/chain_client/mod.rs @@ -104,7 +104,8 @@ pub struct Options { /// An additional delay, after reaching a quorum, to wait for additional validator signatures, /// as a fraction of time taken to reach quorum. 
pub quorum_grace_period: f64, - /// The delay when downloading a blob, after which we try a second validator. + /// The maximum time without progress (stream opening or a new blob arriving) when + /// downloading blobs from a validator, after which we try the next validator. pub blob_download_timeout: Duration, /// The delay when downloading a batch of certificates, after which we try a second validator. pub certificate_batch_download_timeout: Duration, diff --git a/linera-core/src/client/requests_scheduler/scheduler.rs b/linera-core/src/client/requests_scheduler/scheduler.rs index ea10c27d2105..158917269f3c 100644 --- a/linera-core/src/client/requests_scheduler/scheduler.rs +++ b/linera-core/src/client/requests_scheduler/scheduler.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - collections::BTreeMap, + collections::{BTreeMap, BTreeSet, HashMap}, future::Future, sync::{ atomic::{AtomicU32, Ordering}, @@ -11,7 +11,7 @@ use std::{ }; use custom_debug_derive::Debug; -use futures::stream::{FuturesUnordered, StreamExt}; +use futures::stream::StreamExt; use linera_base::{ crypto::ValidatorPublicKey, data_types::{Blob, BlobContent, BlockHeight}, @@ -19,7 +19,10 @@ use linera_base::{ time::{Duration, Instant}, }; use linera_chain::types::ConfirmedBlockCertificate; -use rand::distributions::{Distribution, WeightedIndex}; +use rand::{ + distributions::{Distribution, WeightedIndex}, + seq::SliceRandom, +}; use tracing::{instrument, warn}; use super::{ @@ -298,42 +301,15 @@ impl RequestsScheduler { .await } - #[instrument(level = "trace", skip_all)] - async fn download_blob( - &self, - peers: &[RemoteNode], - blob_id: BlobId, - timeout: Duration, - ) -> Result, NodeError> { - let key = RequestKey::Blob(blob_id); - communicate_concurrently( - peers, - async move |peer| { - self.with_peer(key, peer, move |peer| async move { - peer.download_blob(blob_id).await - }) - .await - }, - |errors| { - for (validator, error) in &errors { - warn!( - %validator, - 
%blob_id, - %error, - "failed to download blob from validator", - ); - } - errors.last().cloned().unwrap() - }, - timeout, - ) - .await - .map_err(|(_validator, error)| error) - } - - /// Downloads the blobs with the given IDs. This is done in one concurrent task per blob. - /// Uses intelligent peer selection based on scores and load balancing. - /// Returns `None` if it couldn't find all blobs. + /// Downloads the blobs with the given IDs using the streaming `download_blobs` RPC. + /// + /// Tries peers in shuffled order: each peer is asked for the IDs that are still + /// missing after previous peers' attempts. The remote stream yields blobs in + /// requested order; partial progress is preserved across peers when a stream + /// errors mid-way (e.g. the validator is missing one of the IDs). + /// + /// Returns `Ok(None)` if some blob is still missing after all peers were tried, except that + /// `Err` (the last error seen) is returned when an attempt failed and no blob was retrieved. #[instrument(level = "trace", skip_all)] pub async fn download_blobs( &self, @@ -341,16 +317,148 @@ impl RequestsScheduler { blob_ids: &[BlobId], timeout: Duration, ) -> Result>, NodeError> { - let mut stream = blob_ids + if blob_ids.is_empty() { + return Ok(Some(Vec::new())); + } + + // A repeated blob ID in a peer's stream is treated as misbehavior; deduplicate upfront. 
+ let blob_ids: Vec = blob_ids .iter() - .map(|blob_id| self.download_blob(peers, *blob_id, timeout)) - .collect::>(); + .copied() + .collect::>() + .into_iter() + .collect(); + + let mut shuffled_peers = peers.to_vec(); + shuffled_peers.shuffle(&mut rand::thread_rng()); + + let mut collected: HashMap = HashMap::with_capacity(blob_ids.len()); + let mut last_error: Option = None; + + for peer in shuffled_peers { + let pending: Vec = blob_ids + .iter() + .copied() + .filter(|id| !collected.contains_key(id)) + .collect(); + if pending.is_empty() { + break; + } + + self.add_peer(peer.clone()).await; + let address = peer.address(); + let (yielded, attempt_error) = + self.stream_blobs_from_peer(peer, &pending, timeout).await; + for blob in yielded { + collected.insert(blob.id(), blob); + } + if let Some(error) = attempt_error { + tracing::debug!( + %address, + pending = pending.len(), + %error, + "streaming download_blobs attempt failed", + ); + last_error = Some(error); + } + } - let mut blobs = Vec::new(); - while let Some(maybe_blob) = stream.next().await { - blobs.push(maybe_blob?); + if collected.len() == blob_ids.len() { + let blobs = blob_ids + .iter() + .map(|id| collected.remove(id).expect("checked above")) + .collect(); + return Ok(Some(blobs)); } - Ok(blobs.into_iter().collect::>>()) + if collected.is_empty() { + if let Some(error) = last_error { + return Err(error); + } + } + Ok(None) + } + + /// Streams blobs from a single peer, accepting any blob whose ID is in the + /// pending set. The server is allowed to skip blobs it does not have, so + /// the caller cannot assume a strict 1:1 correspondence with the request. + /// Returns the blobs the peer yielded and the error, if any, that ended the + /// stream; an unexpected blob ends it early without producing an error. + /// + /// `timeout` bounds the wait for the stream to open and for each subsequent + /// item — it detects a stalled peer without cancelling a transfer that is + /// still making progress. 
Updates the peer's EMA metrics; an attempt counts + /// as failed if it errored, stalled, or the peer sent an unexpected blob. + async fn stream_blobs_from_peer( + &self, + peer: RemoteNode, + pending: &[BlobId], + timeout: Duration, + ) -> (Vec, Option) { + let start_time = Instant::now(); + let public_key = peer.public_key; + let address = peer.address(); + let mut yielded = Vec::::with_capacity(pending.len()); + let mut pending_set = pending.iter().copied().collect::>(); + let mut peer_misbehaved = false; + let attempt = async { + let stalled = || NodeError::ClientIoError { + error: format!("download_blobs made no progress for {timeout:?}"), + }; + let mut stream = + linera_base::time::timer::timeout(timeout, peer.download_blobs(pending.to_vec())) + .await + .map_err(|_| stalled())??; + loop { + let item = linera_base::time::timer::timeout(timeout, stream.next()) + .await + .map_err(|_| stalled())?; + let Some(item) = item else { break }; + let content = item?; + let blob = Blob::new(content); + let blob_id = blob.id(); + if !pending_set.remove(&blob_id) { + tracing::debug!( + %address, + %blob_id, + "validator sent an unexpected or duplicate blob; aborting stream", + ); + peer_misbehaved = true; + break; + } + yielded.push(blob); + } + Ok::<(), NodeError>(()) + }; + + let result = attempt.await; + + // Update per-peer EMA metrics; mirror what `track_request` does, but allow + // returning partial results separately from the error. 
+ let response_time_ms = start_time.elapsed().as_millis() as u64; + let is_success = result.is_ok() && !peer_misbehaved; + { + let mut nodes_guard = self.nodes.write().await; + if let Some(info) = nodes_guard.get_mut(&public_key) { + info.update_metrics(is_success, response_time_ms); + } + } + #[cfg(with_metrics)] + { + let validator_name = public_key.to_string(); + metrics::VALIDATOR_RESPONSE_TIME + .with_label_values(&[&validator_name]) + .observe(response_time_ms as f64); + metrics::VALIDATOR_REQUEST_TOTAL + .with_label_values(&[&validator_name]) + .inc(); + if is_success { + metrics::VALIDATOR_REQUEST_SUCCESS + .with_label_values(&[&validator_name]) + .inc(); + } + } + + (yielded, result.err()) } pub async fn download_certificates(