Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion linera-chain/src/block_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use linera_execution::{
FLAG_FREE_REJECT,
};
use linera_views::context::Context;
use tracing::instrument;
use tracing::{debug, instrument};

#[cfg(with_metrics)]
use crate::chain::metrics;
Expand Down Expand Up @@ -245,6 +245,11 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
posted_message: Box::new(posted_message.clone()),
}
);
debug!(
chain_id = %self.chain_id,
origin = %incoming_bundle.origin,
"Rejecting incoming message"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now printed whenever we execute a block containing a rejected message.

The other log, in data_types, has similar wording but is where we make the decision to reject a message.

There is at least one more place where we make that decision, I think, which should probably also be logged: When executing a message fails during staging.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you removed this one, but I think there are still other places where we decide to choose Action::Reject, and we should either log at all of those places or at none of them.

);
let mut actor =
ExecutionStateActor::new(chain, txn_tracker, self.resource_controller);
if posted_message.is_tracked() {
Expand Down
6 changes: 5 additions & 1 deletion linera-chain/src/data_types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use linera_base::{
};
use linera_execution::{committee::Committee, Message, MessageKind, Operation, OutgoingMessage};
use serde::{Deserialize, Serialize};
use tracing::instrument;
use tracing::{info, instrument};

use crate::{
block::{Block, ValidatedBlock},
Expand Down Expand Up @@ -308,6 +308,10 @@ impl IncomingBundle {
if self.bundle.is_skippable() {
return None;
} else if !self.bundle.is_protected() {
info!(
origin = %self.origin,
"Rejecting incoming message bundle due to the message policy"
);
self.action = MessageAction::Reject;
}
}
Expand Down
39 changes: 36 additions & 3 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ mod metrics {

use linera_base::prometheus_util::{
exponential_bucket_interval, exponential_bucket_latencies, register_histogram,
register_histogram_vec,
register_histogram_vec, register_int_counter, register_int_counter_vec,
};
use prometheus::{Histogram, HistogramVec};
use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec};

pub static CREATE_NETWORK_ACTIONS_LATENCY: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram(
Expand All @@ -83,6 +83,21 @@ mod metrics {
exponential_bucket_interval(1.0, 10_000.0),
)
});

/// Counter for the total number of block proposals received by the worker.
pub static BLOCK_PROPOSALS_RECEIVED_TOTAL: LazyLock<IntCounter> = LazyLock::new(|| {
    let name = "block_proposals_received_total";
    let help = "Total number of block proposals received by the worker";
    register_int_counter(name, help)
});

/// Counter for the total number of block proposals rejected by the worker,
/// partitioned by the `error_type` label.
pub static BLOCK_PROPOSALS_REJECTED_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
    let name = "block_proposals_rejected_total";
    let help = "Total number of block proposals rejected by the worker, labelled by error type";
    let labels = ["error_type"];
    register_int_counter_vec(name, help, &labels)
});
}

/// The state of the chain worker.
Expand Down Expand Up @@ -2109,10 +2124,20 @@ where
&mut self,
proposal: BlockProposal,
) -> (Result<ChainInfoResponse, WorkerError>, NetworkActions) {
#[cfg(with_metrics)]
metrics::BLOCK_PROPOSALS_RECEIVED_TOTAL.inc();
let chain_id = proposal.content.block.chain_id;
let height = proposal.content.block.height;
let old_round = self.chain.manager.current_round();
match self.try_handle_block_proposal(proposal).await {
Ok((response, actions)) => (Ok(response), actions),
Err(err) => {
let error_type = err.error_type();
#[cfg(with_metrics)]
metrics::BLOCK_PROPOSALS_REJECTED_TOTAL
.with_label_values(&[error_type.as_str()])
.inc();
debug!(%chain_id, %height, %error_type, "Block proposal rejected");
// Even on error, the manager's `current_round` may have advanced
// (the `HasIncompatibleConfirmedVote` recovery path calls
// `update_signed_proposal`). Surface the resulting `NewRound`
Expand Down Expand Up @@ -2383,7 +2408,8 @@ where
metrics::NUM_INBOXES
.with_label_values(&[])
.observe(origins_and_inboxes.len() as f64);
let action = if *self.chain.execution_state.system.closed.get() {
let is_closed = *self.chain.execution_state.system.closed.get();
let action = if is_closed {
MessageAction::Reject
} else {
MessageAction::Accept
Expand All @@ -2397,6 +2423,13 @@ where
});
}
}
if is_closed && !bundles.is_empty() {
info!(
chain_id = %self.chain.chain_id(),
count = bundles.len(),
"Auto-rejecting all incoming message bundles because the chain is closed"
);
}
info.requested_pending_message_bundles = bundles;
}
let hashes = self
Expand Down
54 changes: 53 additions & 1 deletion linera-core/src/client/chain_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl<Env: Environment> Clone for ChainClient<Env> {
}

/// Error type for [`ChainClient`].
#[derive(Debug, Error)]
#[derive(Debug, Error, strum::IntoStaticStr)]
#[allow(missing_docs)]
pub enum Error {
#[error("Local node operation failed: {0}")]
Expand Down Expand Up @@ -369,6 +369,20 @@ impl Error {
pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
Self::Signer(Box::new(err))
}

/// Returns the qualified error variant name for the `error_type` metric label,
/// delegating to the wrapped error's `error_type()` so the underlying worker or
/// chain error name is surfaced rather than just the outer variant.
pub fn error_type(&self) -> String {
match self {
Error::LocalNodeError(local_node_error) => local_node_error.error_type(),
Error::ChainError(chain_error) => chain_error.error_type(),
other => {
let variant: &'static str = other.into();
format!("ChainClientError::{variant}")
}
}
}
}

impl<Env: Environment> ChainClient<Env> {
Expand Down Expand Up @@ -1458,6 +1472,23 @@ impl<Env: Environment> ChainClient<Env> {
#[cfg(with_metrics)]
let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();

let result = self.try_execute_block(operations, blobs).await;
if let Err(error) = &result {
let error_type = error.error_type();
#[cfg(with_metrics)]
super::metrics::BLOCK_STAGING_FAILURES_TOTAL
.with_label_values(&[error_type.as_str()])
.inc();
info!(chain_id = %self.chain_id, %error_type, "Block staging failed");
}
result
}

async fn try_execute_block(
&self,
operations: Vec<Operation>,
blobs: Vec<Blob>,
) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
let mutex = self.proposal_mutex();
let lock_start = linera_base::time::Instant::now();
let mut proposal_guard = mutex.lock_owned().await;
Expand Down Expand Up @@ -3539,3 +3570,24 @@ impl<Env: Environment> ChainClient<Env> {
.unwrap();
}
}

#[cfg(test)]
mod tests {
    use super::{Error, LocalNodeError};

    /// A wrapped local-node error is labelled with the inner error's name.
    #[test]
    fn error_type_delegates_to_local_node_error() {
        let wrapped = Error::LocalNodeError(LocalNodeError::InvalidChainInfoResponse);
        assert_eq!(wrapped.error_type(), "LocalNodeError::InvalidChainInfoResponse");
    }

    /// A variant that wraps nothing is labelled `ChainClientError::<Variant>`.
    #[test]
    fn error_type_falls_back_to_chain_client_variant() {
        let plain = Error::WalletSynchronizationError;
        assert_eq!(plain.error_type(), "ChainClientError::WalletSynchronizationError");
    }
}
14 changes: 12 additions & 2 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ mod validator_trackers;
mod metrics {
use std::sync::LazyLock;

use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
use prometheus::HistogramVec;
use linera_base::prometheus_util::{
exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
};
use prometheus::{HistogramVec, IntCounterVec};

pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
LazyLock::new(|| {
Expand Down Expand Up @@ -125,6 +127,14 @@ mod metrics {
exponential_bucket_latencies(10_000.0),
)
});

/// Counter for the total number of client block staging (`execute_block`) failures,
/// partitioned by the `error_type` label.
pub static BLOCK_STAGING_FAILURES_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
    let name = "block_staging_failures_total";
    let help =
        "Total number of client block staging (execute_block) failures, labelled by error type";
    let labels = ["error_type"];
    register_int_counter_vec(name, help, &labels)
});
}

/// Default number of certificates to download in a single batch.
Expand Down
37 changes: 36 additions & 1 deletion linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ where
}

/// Error type for the operations on a local node.
#[derive(Debug, Error)]
#[derive(Debug, Error, strum::IntoStaticStr)]
#[allow(missing_docs)]
pub enum LocalNodeError {
#[error(transparent)]
Expand All @@ -73,6 +73,20 @@ pub enum LocalNodeError {
EventsNotFound(Vec<EventId>),
}

impl LocalNodeError {
    /// Produces the value used for the `error_type` metric label.
    ///
    /// A wrapped worker error reports [`WorkerError::error_type`]; every other
    /// variant is reported as `LocalNodeError::<Variant>`.
    pub fn error_type(&self) -> String {
        // Defer to the worker error so the label names the underlying failure.
        if let LocalNodeError::WorkerError(inner) = self {
            return inner.error_type();
        }
        let name: &'static str = self.into();
        format!("LocalNodeError::{name}")
    }
}

impl From<WorkerError> for LocalNodeError {
fn from(error: WorkerError) -> Self {
match error {
Expand Down Expand Up @@ -480,3 +494,24 @@ where
Ok(self.node.state.get_manager_seed(chain_id).await?)
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A wrapped worker error is labelled with the worker error's own name.
    #[test]
    fn error_type_delegates_to_worker_error() {
        let wrapped = LocalNodeError::WorkerError(WorkerError::InvalidOwner);
        assert_eq!(wrapped.error_type(), "WorkerError::InvalidOwner");
    }

    /// A variant that wraps nothing is labelled `LocalNodeError::<Variant>`.
    #[test]
    fn error_type_falls_back_to_local_node_variant() {
        let plain = LocalNodeError::InvalidChainInfoResponse;
        assert_eq!(plain.error_type(), "LocalNodeError::InvalidChainInfoResponse");
    }
}
Loading