Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ Client implementation and command-line tool for the Linera blockchain
Default value: `3600000`
* `--wait-for-outgoing-messages` — Whether to wait until a quorum of validators has confirmed that all sent cross-chain messages have been delivered
* `--allow-fast-blocks` — Whether to allow creating blocks in the fast round. Fast blocks have lower latency but must be used carefully so that there are never any conflicting fast block proposals
* `--disable-multi-leader-jitter` — Disable the multi-leader jitter delay. By default, when proposing in a multi-leader round with index `>= 1`, the client waits a deterministic delay derived from the owner and round before re-proposing. This spreads out concurrent proposals from honest clients; the owner with the lowest `hash(owner, round)` still proposes immediately
* `--long-lived-services` — (EXPERIMENTAL) Whether application services can persist in some cases between queries
* `--blanket-message-policy <BLANKET_MESSAGE_POLICY>` — The policy for handling incoming messages

Expand Down
142 changes: 133 additions & 9 deletions linera-base/src/ownership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::{
crypto::{BcsHashable, CryptoHash},
data_types::{Round, TimeDelta},
doc_scalar,
identifiers::AccountOwner,
Expand All @@ -39,9 +40,10 @@ pub struct TimeoutConfig {
/// The duration of the fast round.
#[debug(skip_if = Option::is_none)]
pub fast_round_duration: Option<TimeDelta>,
/// The duration of the first single-leader and all multi-leader rounds.
/// The duration of the first multi-leader and single-leader rounds.
pub base_timeout: TimeDelta,
/// The duration by which the timeout increases after each single-leader round.
/// The duration by which the timeout increases after each multi-leader or
/// single-leader round.
pub timeout_increment: TimeDelta,
/// The age of an incoming tracked or protected message after which the validators start
/// transitioning the chain to fallback mode.
Expand Down Expand Up @@ -173,11 +175,7 @@ impl ChainOwnership {
}
match round {
Round::Fast => tc.fast_round_duration,
Round::MultiLeader(r) if r.saturating_add(1) == self.multi_leader_rounds => {
Some(tc.base_timeout)
}
Round::MultiLeader(_) => None,
Round::SingleLeader(r) | Round::Validator(r) => {
Round::MultiLeader(r) | Round::SingleLeader(r) | Round::Validator(r) => {
let increment = tc.timeout_increment.saturating_mul(u64::from(r));
Some(tc.base_timeout.saturating_add(increment))
}
Expand Down Expand Up @@ -229,8 +227,73 @@ impl ChainOwnership {
pub fn is_super_owner_no_regular_owners(&self, owner: &AccountOwner) -> bool {
self.owners.is_empty() && self.super_owners.contains(owner)
}

/// Returns whether `owner` has the lowest `hash(owner, round)` among the eligible
/// multi-leader proposers, and should therefore propose immediately rather than wait
/// out a jitter delay. Returns `false` for `open_multi_leader_rounds`, where the set
/// of proposers is unbounded.
///
/// This is a gentle-clients convention; it is not enforced by the protocol.
fn is_preferred_multi_leader_proposer(&self, owner: &AccountOwner, round_index: u32) -> bool {
if self.open_multi_leader_rounds || !self.can_propose_in_multi_leader_round(owner) {
return false;
}
let our_priority = multi_leader_priority(owner, round_index);
self.all_owners().all(|other| {
other == owner || multi_leader_priority(other, round_index) >= our_priority
})
}

/// Returns the deterministic delay this owner should wait before proposing in `round`,
/// to spread out concurrent proposals from honest clients. The preferred owner returns
/// `TimeDelta::ZERO`; others return `hash(owner, round) mod round_duration`. Returns
/// `None` outside of multi-leader rounds, in the first multi-leader round (where
/// honest clients all attempt to propose immediately), and `Some(ZERO)` if the round
/// has no configured timeout.
pub fn multi_leader_proposal_delay(
&self,
owner: &AccountOwner,
round: Round,
) -> Option<TimeDelta> {
let Round::MultiLeader(round_index) = round else {
return None;
};
if round_index == 0 {
return None;
}
let round_duration = self.round_timeout(round).unwrap_or(TimeDelta::ZERO);
if round_duration == TimeDelta::ZERO
|| self.is_preferred_multi_leader_proposer(owner, round_index)
{
return Some(TimeDelta::ZERO);
}
let priority = multi_leader_priority(owner, round_index);
let prefix = <[u8; 8]>::try_from(&priority.as_bytes().as_slice()[..8])
.expect("hash is at least 8 bytes long");
let hash_u64 = u64::from_le_bytes(prefix);
Some(TimeDelta::from_micros(
hash_u64 % round_duration.as_micros(),
))
}
}

/// Returns the deterministic priority of `owner` in the multi-leader round with the
/// given index. The owner with the lowest priority is preferred to propose first.
fn multi_leader_priority(owner: &AccountOwner, round_index: u32) -> CryptoHash {
CryptoHash::new(&MultiLeaderPriorityInput {
round: round_index,
owner: *owner,
})
}

#[derive(Serialize, Deserialize)]
struct MultiLeaderPriorityInput {
round: u32,
owner: AccountOwner,
}

impl BcsHashable<'_> for MultiLeaderPriorityInput {}

/// Errors that can happen when attempting to manage a chain (close it, change ownership, or
/// change application permissions).
#[derive(Clone, Copy, Debug, Error, WitStore, WitType)]
Expand Down Expand Up @@ -279,11 +342,14 @@ mod tests {
ownership.round_timeout(Round::Fast),
Some(TimeDelta::from_secs(5))
);
assert_eq!(ownership.round_timeout(Round::MultiLeader(8)), None);
assert_eq!(
ownership.round_timeout(Round::MultiLeader(9)),
ownership.round_timeout(Round::MultiLeader(0)),
Some(TimeDelta::from_secs(10))
);
assert_eq!(
ownership.round_timeout(Round::MultiLeader(8)),
Some(TimeDelta::from_secs(18))
);
assert_eq!(
ownership.round_timeout(Round::SingleLeader(0)),
Some(TimeDelta::from_secs(10))
Expand All @@ -297,6 +363,64 @@ mod tests {
Some(TimeDelta::from_secs(18))
);
}

#[test]
fn test_multi_leader_proposal_delay() {
let owner_a = AccountOwner::from(Ed25519SecretKey::generate().public());
let owner_b = AccountOwner::from(Ed25519SecretKey::generate().public());
let owner_c = AccountOwner::from(Ed25519SecretKey::generate().public());
let mut ownership = ChainOwnership::multiple(
[(owner_a, 100), (owner_b, 100), (owner_c, 100)],
10,
TimeoutConfig {
fast_round_duration: None,
base_timeout: TimeDelta::from_secs(10),
timeout_increment: TimeDelta::ZERO,
fallback_duration: TimeDelta::MAX,
},
);

// No jitter in MultiLeader(0): all clients race; lowest-hash recovery kicks in
// only from MultiLeader(1) onwards.
for owner in [owner_a, owner_b, owner_c] {
assert_eq!(
ownership.multi_leader_proposal_delay(&owner, Round::MultiLeader(0)),
None
);
}

// Outside multi-leader rounds, no delay is computed.
assert_eq!(
ownership.multi_leader_proposal_delay(&owner_a, Round::SingleLeader(1)),
None
);

// In MultiLeader(1) exactly one owner is preferred (delay = 0); the others
// get a deterministic, bounded delay.
let delays = [owner_a, owner_b, owner_c].map(|owner| {
ownership
.multi_leader_proposal_delay(&owner, Round::MultiLeader(1))
.expect("delay should be defined in a multi-leader round")
});
let zero_count = delays.iter().filter(|d| **d == TimeDelta::ZERO).count();
assert_eq!(
zero_count, 1,
"exactly one owner should be the preferred proposer"
);
for delay in delays {
assert!(delay < TimeDelta::from_secs(10));
}

// Open multi-leader rounds have no fixed proposer set; nobody is preferred,
// so every owner waits its own deterministic jitter.
ownership.open_multi_leader_rounds = true;
for owner in [owner_a, owner_b, owner_c] {
let delay = ownership
.multi_leader_proposal_delay(&owner, Round::MultiLeader(1))
.expect("delay should be defined in a multi-leader round");
assert!(delay > TimeDelta::ZERO && delay < TimeDelta::from_secs(10));
}
}
}

doc_scalar!(ChainOwnership, "Represents the owner(s) of a chain");
9 changes: 9 additions & 0 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ pub struct Options {
#[arg(long)]
pub allow_fast_blocks: bool,

/// Disable the multi-leader jitter delay. By default, when proposing in a multi-leader
/// round with index `>= 1`, the client waits a deterministic delay derived from the
/// owner and round before re-proposing. This spreads out concurrent proposals from
/// honest clients; the owner with the lowest `hash(owner, round)` still proposes
/// immediately.
#[arg(long)]
pub disable_multi_leader_jitter: bool,

/// (EXPERIMENTAL) Whether application services can persist in some cases between queries.
#[arg(long)]
pub long_lived_services: bool,
Expand Down Expand Up @@ -369,6 +377,7 @@ impl Options {
max_concurrent_batch_downloads: self.max_concurrent_batch_downloads,
max_joined_tasks: self.max_joined_tasks,
allow_fast_blocks: self.allow_fast_blocks,
multi_leader_jitter: !self.disable_multi_leader_jitter,
notification_circuit_breaker_initial_probe_interval: self
.notification_circuit_breaker_initial_probe_interval,
notification_circuit_breaker_max_probe_interval: self
Expand Down
103 changes: 94 additions & 9 deletions linera-core/src/client/chain_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use linera_base::{
crypto::{signer, CryptoHash, Signer, ValidatorPublicKey},
data_types::{
Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
ChainDescription, Epoch, MessagePolicy, Round, Timestamp,
ChainDescription, Epoch, MessagePolicy, Round, TimeDelta, Timestamp,
},
ensure,
identifiers::{
Expand Down Expand Up @@ -120,6 +120,11 @@ pub struct Options {
/// Whether to allow creating blocks in the fast round. Fast blocks have lower latency but
/// must be used carefully so that there are never any conflicting fast block proposals.
pub allow_fast_blocks: bool,
/// Whether to apply the multi-leader jitter delay before proposing in a multi-leader
/// round with index `>= 1`, to spread out concurrent proposals across honest clients.
/// The owner with the lowest `hash(owner, round)` still proposes immediately. The
/// jitter only takes effect when the round has a configured timeout.
pub multi_leader_jitter: bool,
/// Initial probe interval for the notification circuit breaker. When a validator's
/// notification stream exhausts retries, the circuit breaker waits this long before
/// probing again. Doubles on each failed probe.
Expand Down Expand Up @@ -163,6 +168,7 @@ impl Options {
max_concurrent_batch_downloads: DEFAULT_MAX_CONCURRENT_BATCH_DOWNLOADS,
max_joined_tasks: 100,
allow_fast_blocks: false,
multi_leader_jitter: false,
notification_circuit_breaker_initial_probe_interval: Duration::from_secs(300),
notification_circuit_breaker_max_probe_interval: Duration::from_secs(3600),
max_event_stream_queries: DEFAULT_MAX_EVENT_STREAM_QUERIES,
Expand Down Expand Up @@ -1370,7 +1376,7 @@ impl<Env: Environment> ChainClient<Env> {
.await?
{
ClientOutcome::Committed(Some(certificate)) => {
return Ok(ClientOutcome::Conflict(Box::new(certificate)))
return Ok(self.classify_committed(certificate, &operations));
}
ClientOutcome::WaitForTimeout(timeout) => {
return Ok(ClientOutcome::WaitForTimeout(timeout))
Expand All @@ -1384,27 +1390,25 @@ impl<Env: Environment> ChainClient<Env> {
// Collect pending messages and epoch changes after acquiring the lock to avoid
// race conditions where messages valid for one block height are proposed at a
// different height.
let transactions = self.prepend_epochs_messages_and_events(operations).await?;
let transactions = self
.prepend_epochs_messages_and_events(operations.clone())
.await?;

if transactions.is_empty() {
return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
)));
}

let block = self
.new_pending_block(transactions, blobs, &mut proposal_guard)
self.new_pending_block(transactions, blobs, &mut proposal_guard)
.await?;

match self
.process_pending_block_without_prepare(&mut proposal_guard)
.await?
{
ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
Ok(ClientOutcome::Committed(certificate))
}
ClientOutcome::Committed(Some(certificate)) => {
Ok(ClientOutcome::Conflict(Box::new(certificate)))
Ok(self.classify_committed(certificate, &operations))
}
// Unreachable: We just set the pending proposal in the guard.
ClientOutcome::Committed(None) => {
Expand All @@ -1415,6 +1419,45 @@ impl<Env: Environment> ChainClient<Env> {
}
}

/// Returns `Committed` if the committed block reflects our request — same
/// authenticated owner, and our `operations`. All other transactions must be ones that
/// are added automatically, to process messages, streams or new epochs.
fn classify_committed(
&self,
certificate: ConfirmedBlockCertificate,
operations: &[Operation],

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@afck This is great. It seems like we're progressing towards a model where we have a number of parallel queues (say operations, bundles, events) feeding new transactions inside blocks and we could start tracking the fate of user operations with callbacks (and support cancelation while in the queue).

It seems very reasonable to index operations by (AuthenticatedOwner, Bytes) by the way (and maybe other data that I'm forgetting that can plausibly affect execution). We could decide that applications are responsible for

) -> ClientOutcome<ConfirmedBlockCertificate> {
let block = certificate.block();
if self.preferred_owner.is_none()
|| block.header.authenticated_owner != self.preferred_owner
{
return ClientOutcome::Conflict(Box::new(certificate));
}
let mut operations_iter = operations.iter().peekable();
for tx in &block.body.transactions {
let is_expected = match tx {
Transaction::ReceiveMessages(_) => true,
Transaction::ExecuteOperation(op) if Some(&op) == operations_iter.peek() => {
operations_iter.next();
true
}
Transaction::ExecuteOperation(Operation::System(op)) => matches!(
**op,
SystemOperation::ProcessNewEpoch(_) | SystemOperation::UpdateStream { .. }
),
Transaction::ExecuteOperation(Operation::User { .. }) => false,
};
if !is_expected {
return ClientOutcome::Conflict(Box::new(certificate));
}
}
if operations_iter.next().is_some() {
ClientOutcome::Conflict(Box::new(certificate))
} else {
ClientOutcome::Committed(certificate)
}
}

/// Creates a vector of transactions which, in addition to the provided operations,
/// also contains epoch changes, receiving message bundles and event stream updates
/// (if there are any to be processed).
Expand Down Expand Up @@ -2169,6 +2212,13 @@ impl<Env: Environment> ChainClient<Env> {
.map(|v| (AccountOwner::from(v.account_public_key), v.votes))
.collect();
if manager.should_propose(identity, round, seed, &current_committee) {
if let Some(wait_until) = self.multi_leader_jitter_target(info, identity, round) {
return Ok(Either::Right(RoundTimeout {
timestamp: wait_until,
current_round: round,
next_block_height: info.next_block_height,
}));
}
return Ok(Either::Left(round));
}
if let Some(timeout) = info.round_timeout() {
Expand All @@ -2179,6 +2229,41 @@ impl<Env: Environment> ChainClient<Env> {
))
}

/// Returns the timestamp at which `owner` should propose in `round`, to spread out
/// concurrent proposals from honest clients in a multi-leader round. Returns `None` if
/// the owner should propose immediately (either because the round is not a multi-leader
/// round, the owner is the preferred proposer, or the jitter target is already in the past).
///
/// The delay is deterministic per `(owner, round)` and is anchored at the round's start
/// time when known, so that retrying after an interrupting notification does not extend
/// the wait further.
fn multi_leader_jitter_target(
&self,
info: &ChainInfo,
owner: &AccountOwner,
round: Round,
) -> Option<Timestamp> {
if !self.options.multi_leader_jitter {
return None;
}
let ownership = &info.manager.ownership;
let delay = ownership.multi_leader_proposal_delay(owner, round)?;
if delay == TimeDelta::ZERO {
return None;
}
let now = self.storage_client().clock().current_time();
let round_start = if round == info.manager.current_round {
match (info.manager.round_timeout, ownership.round_timeout(round)) {
(Some(end), Some(duration)) => end.saturating_sub(duration),
_ => now,
}
} else {
now
};
let propose_at = round_start.saturating_add(delay);
(propose_at > now).then_some(propose_at)
Comment thread
deuszx marked this conversation as resolved.
}

/// Clears the information on any operation that previously failed.
#[cfg(with_testing)]
#[instrument(level = "trace")]
Expand Down
Loading
Loading