Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ Client implementation and command-line tool for the Linera blockchain
* `--quorum-grace-period <QUORUM_GRACE_PERIOD>` — An additional delay, after reaching a quorum, to wait for additional validator signatures, as a fraction of time taken to reach quorum

Default value: `0.2`
* `--blob-download-timeout-ms <BLOB_DOWNLOAD_TIMEOUT>` — The delay when downloading a blob, after which we try a second validator, in milliseconds
* `--blob-download-timeout-ms <BLOB_DOWNLOAD_TIMEOUT>` — The maximum time without progress (stream opening or a new blob arriving) when downloading blobs from a validator, after which we try the next validator, in milliseconds

Default value: `1000`
* `--cert-batch-download-timeout-ms <CERTIFICATE_BATCH_DOWNLOAD_TIMEOUT>` — The delay when downloading a batch of certificates, after which we try a second validator, in milliseconds
Expand Down
4 changes: 3 additions & 1 deletion linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ pub struct Options {
#[arg(long, default_value_t = DEFAULT_QUORUM_GRACE_PERIOD)]
pub quorum_grace_period: f64,

/// The delay when downloading a blob, after which we try a second validator, in milliseconds.
/// The maximum time without progress (stream opening or a new blob arriving) when
/// downloading blobs from a validator, after which we try the next validator, in
/// milliseconds.
#[arg(
long = "blob-download-timeout-ms",
default_value = "1000",
Expand Down
3 changes: 2 additions & 1 deletion linera-core/src/client/chain_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ pub struct Options {
/// An additional delay, after reaching a quorum, to wait for additional validator signatures,
/// as a fraction of time taken to reach quorum.
pub quorum_grace_period: f64,
/// The delay when downloading a blob, after which we try a second validator.
/// The maximum time without progress (stream opening or a new blob arriving) when
/// downloading blobs from a validator, after which we try the next validator.
pub blob_download_timeout: Duration,
/// The delay when downloading a batch of certificates, after which we try a second validator.
pub certificate_batch_download_timeout: Duration,
Expand Down
200 changes: 154 additions & 46 deletions linera-core/src/client/requests_scheduler/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
collections::BTreeMap,
collections::{BTreeMap, BTreeSet, HashMap},
future::Future,
sync::{
atomic::{AtomicU32, Ordering},
Expand All @@ -11,15 +11,18 @@ use std::{
};

use custom_debug_derive::Debug;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::stream::StreamExt;
use linera_base::{
crypto::ValidatorPublicKey,
data_types::{Blob, BlobContent, BlockHeight},
identifiers::{BlobId, ChainId},
time::{Duration, Instant},
};
use linera_chain::types::ConfirmedBlockCertificate;
use rand::distributions::{Distribution, WeightedIndex};
use rand::{
distributions::{Distribution, WeightedIndex},
seq::SliceRandom,
};
use tracing::{instrument, warn};

use super::{
Expand Down Expand Up @@ -298,59 +301,164 @@ impl<Env: Environment> RequestsScheduler<Env> {
.await
}

/// Downloads a single blob, identified by `blob_id`, from the given `peers`.
///
/// The work is delegated to `communicate_concurrently`: each peer request is wrapped in
/// `with_peer` under a `RequestKey::Blob` key, and `timeout` is forwarded unchanged.
/// When the helper reports a list of per-validator failures, each one is logged at
/// `warn` level and the last one is selected as the representative error.
///
/// Returns whatever `Option<Blob>` the helper produced, or the selected `NodeError`
/// with the validator component of the error pair discarded.
#[instrument(level = "trace", skip_all)]
async fn download_blob(
    &self,
    peers: &[RemoteNode<Env::ValidatorNode>],
    blob_id: BlobId,
    timeout: Duration,
) -> Result<Option<Blob>, NodeError> {
    let request_key = RequestKey::Blob(blob_id);
    let outcome = communicate_concurrently(
        peers,
        async move |node| {
            self.with_peer(request_key, node, move |node| async move {
                node.download_blob(blob_id).await
            })
            .await
        },
        |failures| {
            // Report every failure before collapsing the list into a single error.
            for (validator, error) in failures.iter() {
                warn!(
                    %validator,
                    %blob_id,
                    %error,
                    "failed to download blob from validator",
                );
            }
            // NOTE(review): relies on the helper never passing an empty list — confirm.
            failures.last().cloned().unwrap()
        },
        timeout,
    )
    .await;

    match outcome {
        Ok(maybe_blob) => Ok(maybe_blob),
        // Callers only care about the error itself, not which validator produced it.
        Err((_validator, error)) => Err(error),
    }
}

/// Downloads the blobs with the given IDs. This is done in one concurrent task per blob.
/// Uses intelligent peer selection based on scores and load balancing.
/// Returns `None` if it couldn't find all blobs.
// NOTE(review): the three doc lines above describe a per-blob concurrent download and look
// like the pre-change text that the diff view shows next to the new text; the streaming
// description below is the one that matches the peer loop in the body — confirm.
/// Downloads the blobs with the given IDs using the streaming `download_blobs` RPC.
///
/// Tries peers in shuffled order: each peer is asked for the IDs that are still
/// missing after previous peers' attempts. The remote stream yields blobs in
/// requested order; partial progress is preserved across peers when a stream
/// errors mid-way (e.g. the validator is missing one of the IDs).
///
/// The requested IDs are deduplicated through a `BTreeSet` first, so on success the
/// blobs are returned in the order of that sorted, deduplicated ID list, which may
/// differ from the order and length of `blob_ids`. An empty `blob_ids` yields
/// `Ok(Some(vec![]))` without contacting any peer.
///
/// Returns `Ok(None)` if some blob could not be retrieved from any peer, unless the
/// `Err` case applies.
/// Returns `Err` (the most recent attempt error) when no blob at all was retrieved and
/// at least one peer attempt reported an error.
#[instrument(level = "trace", skip_all)]
pub async fn download_blobs(
&self,
peers: &[RemoteNode<Env::ValidatorNode>],
blob_ids: &[BlobId],
timeout: Duration,
) -> Result<Option<Vec<Blob>>, NodeError> {
// NOTE(review): the next line, together with the `.map(...)` and
// `.collect::<FuturesUnordered<_>>()` lines further down, looks like the removed
// per-blob implementation interleaved by the diff view — confirm before relying on it.
let mut stream = blob_ids
if blob_ids.is_empty() {
return Ok(Some(Vec::new()));
}

// Streaming endpoint aborts if the server echoes a blob ID twice; deduplicate upfront.
let blob_ids: Vec<BlobId> = blob_ids
.iter()
.map(|blob_id| self.download_blob(peers, *blob_id, timeout))
.collect::<FuturesUnordered<_>>();
.copied()
.collect::<BTreeSet<_>>()
.into_iter()
.collect();

// Try the peers in a random order, working on a copy so the caller's slice is untouched.
let mut shuffled_peers = peers.to_vec();
shuffled_peers.shuffle(&mut rand::thread_rng());

// Blobs gathered so far, keyed by ID, plus the most recent error from any attempt.
let mut collected: HashMap<BlobId, Blob> = HashMap::with_capacity(blob_ids.len());
let mut last_error: Option<NodeError> = None;

for peer in shuffled_peers {
// Ask this peer only for the IDs that no earlier peer has delivered.
let pending: Vec<BlobId> = blob_ids
.iter()
.copied()
.filter(|id| !collected.contains_key(id))
.collect();
if pending.is_empty() {
break;
}

// NOTE(review): presumably registers the peer so that the metrics update in
// `stream_blobs_from_peer` finds an entry for it — confirm against `add_peer`.
self.add_peer(peer.clone()).await;
let address = peer.address();
let (yielded, attempt_error) =
self.stream_blobs_from_peer(peer, &pending, timeout).await;
// Keep whatever the peer delivered, even if the attempt also ended with an error.
for blob in yielded {
collected.insert(blob.id(), blob);
}
if let Some(error) = attempt_error {
tracing::debug!(
%address,
pending = pending.len(),
%error,
"streaming download_blobs attempt failed",
);
last_error = Some(error);
}
}

// NOTE(review): the next three lines look like the removed per-blob implementation
// interleaved by the diff view — confirm.
let mut blobs = Vec::new();
while let Some(maybe_blob) = stream.next().await {
blobs.push(maybe_blob?);
// Every deduplicated ID has a blob: hand them back in the order of the deduplicated list.
if collected.len() == blob_ids.len() {
let blobs = blob_ids
.iter()
.map(|id| collected.remove(id).expect("checked above"))
.collect();
return Ok(Some(blobs));
}
// NOTE(review): the next line looks like the removed implementation's return value — confirm.
Ok(blobs.into_iter().collect::<Option<Vec<_>>>())
// Nothing was retrieved at all: surface the most recent attempt error if there is one.
if collected.is_empty() {
if let Some(error) = last_error {
return Err(error);
}
}
// Partial success, or no peer reported an error: signal "not all blobs found".
Ok(None)
}

/// Runs one streaming `download_blobs` attempt against a single peer.
///
/// The peer is asked for every ID in `pending`. Each blob that comes back must carry
/// an ID that is still outstanding; the peer may deliver fewer blobs than requested,
/// so the stream ending early is not treated as an error here. A blob whose ID was
/// not requested, or was already delivered, ends the attempt immediately.
///
/// `timeout` is applied separately to opening the stream and to waiting for each
/// item, so it limits the time without progress rather than the whole transfer.
///
/// Returns the blobs received so far together with the error, if any, that ended the
/// stream. Before returning, the peer's entry in `self.nodes` (when present) is
/// updated via `update_metrics`; the attempt is recorded as unsuccessful if it
/// errored, stalled, or the peer sent an unexpected blob.
async fn stream_blobs_from_peer(
    &self,
    peer: RemoteNode<Env::ValidatorNode>,
    pending: &[BlobId],
    timeout: Duration,
) -> (Vec<Blob>, Option<NodeError>) {
    let started_at = Instant::now();
    let public_key = peer.public_key;
    let address = peer.address();
    let mut received: Vec<Blob> = Vec::with_capacity(pending.len());
    let mut still_wanted: BTreeSet<BlobId> = pending.iter().copied().collect();
    let mut peer_misbehaved = false;

    let outcome = async {
        // Error reported whenever the peer exceeds `timeout` without making progress.
        let no_progress = || NodeError::ClientIoError {
            error: format!("download_blobs made no progress for {timeout:?}"),
        };

        // First `?` handles the timeout, second `?` handles the RPC's own error.
        let open_stream = peer.download_blobs(pending.to_vec());
        let mut stream = linera_base::time::timer::timeout(timeout, open_stream)
            .await
            .map_err(|_| no_progress())??;

        // A `None` item means the peer closed the stream normally.
        while let Some(item) = linera_base::time::timer::timeout(timeout, stream.next())
            .await
            .map_err(|_| no_progress())?
        {
            let blob = Blob::new(item?);
            let blob_id = blob.id();
            if still_wanted.remove(&blob_id) {
                received.push(blob);
                continue;
            }
            tracing::debug!(
                %address,
                %blob_id,
                "validator sent an unexpected or duplicate blob; aborting stream",
            );
            peer_misbehaved = true;
            break;
        }
        Ok::<(), NodeError>(())
    }
    .await;

    // Record the attempt in the per-peer metrics, as `track_request` does, while still
    // letting the caller receive partial results alongside the error.
    let response_time_ms = started_at.elapsed().as_millis() as u64;
    let is_success = !peer_misbehaved && outcome.is_ok();
    // The write guard is a temporary of this statement and is released when it ends.
    if let Some(info) = self.nodes.write().await.get_mut(&public_key) {
        info.update_metrics(is_success, response_time_ms);
    }
    #[cfg(with_metrics)]
    {
        let label = public_key.to_string();
        metrics::VALIDATOR_RESPONSE_TIME
            .with_label_values(&[&label])
            .observe(response_time_ms as f64);
        metrics::VALIDATOR_REQUEST_TOTAL
            .with_label_values(&[&label])
            .inc();
        if is_success {
            metrics::VALIDATOR_REQUEST_SUCCESS
                .with_label_values(&[&label])
                .inc();
        }
    }

    (received, outcome.err())
}

pub async fn download_certificates(
Expand Down
Loading