Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
68f005e
Implement the new scheme for executing Wasm instances.
MathieuDutSik Apr 15, 2026
f74fea6
Unify the execution platform between web and non-web.
MathieuDutSik May 5, 2026
f9947e1
Reformatting.
MathieuDutSik May 5, 2026
54e34fd
Remove the performance test.
MathieuDutSik May 5, 2026
ecc0914
Revert the changes with LoadContract / LoadService.
MathieuDutSik May 5, 2026
83f639d
Some update so that wasmtime pass nicely.
MathieuDutSik May 5, 2026
d475468
Rename the snapshot and related to something VM agnostic.
MathieuDutSik May 5, 2026
11f9fc7
Address clippy issues.
MathieuDutSik May 5, 2026
af3dcb5
Add the serialization of the snapshots.
MathieuDutSik May 5, 2026
f822a42
Rewrite a little the type access, "Value" is ambiguous.
MathieuDutSik May 5, 2026
b0341f6
Add some forgotten new files.
MathieuDutSik May 5, 2026
29f92fe
First step of the changes.
MathieuDutSik May 5, 2026
dc92ed8
Some clippy corrections.
MathieuDutSik May 5, 2026
7885be7
Next step.
MathieuDutSik May 5, 2026
e7581b6
Add the current_snapshots entry.
MathieuDutSik May 5, 2026
394134c
One step.
MathieuDutSik May 5, 2026
d66a4b3
Some update.
MathieuDutSik May 5, 2026
58bbdb2
Next step.
MathieuDutSik May 5, 2026
6bcf4ae
Introduce the next execution scheme.
MathieuDutSik May 5, 2026
8c93543
Change the execution path.
MathieuDutSik May 5, 2026
c3e5490
Add some execution scheme.
MathieuDutSik May 6, 2026
6552057
Some simplification of the comparison code.
MathieuDutSik May 6, 2026
a1c92d6
Make the online test pass.
MathieuDutSik May 6, 2026
f09e068
Reformat.
MathieuDutSik May 6, 2026
5dd5eb8
Some upgrade from clippy corrections.
MathieuDutSik May 6, 2026
dc1cbcc
Implement memory expansion.
MathieuDutSik May 6, 2026
bd90219
Improve the support for the snapshots.
MathieuDutSik May 6, 2026
a34422f
Make the snapshot operations return error instead of using "expect(..…
MathieuDutSik May 6, 2026
e66c241
Switch to a BTreeMap for the snapshot entries.
MathieuDutSik May 6, 2026
b6304b9
Remove the FlashLoanInitial.
MathieuDutSik May 7, 2026
8ba7bb3
Avoid the possible overflow.
MathieuDutSik May 7, 2026
e38ec39
Extend the scope of the test.
MathieuDutSik May 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ crowd-funding = { path = "./examples/crowd-funding" }
ethereum-tracker = { path = "./examples/ethereum-tracker" }
event-emitter = { path = "./linera-sdk/tests/fixtures/event-emitter" }
event-subscriber = { path = "./linera-sdk/tests/fixtures/event-subscriber" }
flash-loan = { path = "./linera-sdk/tests/fixtures/flash-loan" }
fungible = { path = "./examples/fungible" }
hex-game = { path = "./examples/hex-game" }
matching-engine = { path = "./examples/matching-engine" }
Expand Down
2 changes: 2 additions & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

177 changes: 169 additions & 8 deletions linera-chain/src/block_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@
use std::collections::{BTreeMap, BTreeSet};

use custom_debug_derive::Debug;
use futures::channel::mpsc;
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency;
use linera_base::{
data_types::{Amount, Blob, BlockHeight, Event, OracleResponse, Timestamp},
ensure,
identifiers::{AccountOwner, BlobId, ChainId, StreamId},
identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, StreamId},
};
use linera_execution::{
execution_state_actor::ExecutionStateActor, ExecutionRuntimeContext, ExecutionStateView,
Message, MessageContext, MessageKind, OperationContext, OutgoingMessage, ResourceController,
ResourceTracker, SystemExecutionStateView, TransactionOutcome, TransactionTracker,
execution_state_actor::{
ExecutionRequest, ExecutionStateActor, RuntimeChannels, RuntimeCommand,
},
ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, MessageKind,
OperationContext, OutgoingMessage, ResourceController, ResourceTracker,
SystemExecutionStateView, TransactionOutcome, TransactionTracker,
};
use linera_views::context::Context;
use tracing::instrument;
Expand Down Expand Up @@ -58,10 +62,28 @@ pub struct BlockExecutionTracker<'resources, 'blobs> {

// Blobs published in the block.
published_blobs: BTreeMap<BlobId, &'blobs Blob>,

/// Command channel sender to the block-level contract runtime thread, or `None`
/// when the snapshot-based per-action path is used (e.g. on web).
pub(crate) command_tx: Option<std::sync::mpsc::Sender<RuntimeCommand>>,
/// Receiver for state requests from the block-level contract runtime thread, or
/// `None` when the snapshot-based per-action path is used.
pub(crate) execution_state_receiver: Option<mpsc::UnboundedReceiver<ExecutionRequest>>,

/// Per-application contract instance snapshots accumulated across actions in
/// the block. Empty when the threaded path is in use; populated by the
/// snapshot-based per-action path. Absence of a key means the contract has not
/// been loaded yet in this block (next worker for it will call `Contract::load`).
#[debug(skip_if = BTreeMap::is_empty)]
pub(crate) block_snapshots: BTreeMap<ApplicationId, Vec<u8>>,
}

impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
/// Creates a new BlockExecutionTracker.
///
/// `runtime_channels` is `Some` for the threaded shared-memory path (a
/// long-lived block-level worker is running) and `None` for the snapshot-based
/// per-action path (each action spawns a fresh worker).
pub fn new(
resource_controller: &'resources mut ResourceController<
Option<AccountOwner>,
Expand All @@ -71,11 +93,20 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
local_time: Timestamp,
replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
proposal: &ProposedBlock,
runtime_channels: Option<(
std::sync::mpsc::Sender<RuntimeCommand>,
mpsc::UnboundedReceiver<ExecutionRequest>,
)>,
) -> Result<Self, ChainError> {
resource_controller
.track_block_size(EMPTY_BLOCK_SIZE)
.with_execution_context(ChainExecutionContext::Block)?;

let (command_tx, execution_state_receiver) = match runtime_channels {
Some((tx, rx)) => (Some(tx), Some(rx)),
None => (None, None),
};

Ok(Self {
chain_id: proposal.chain_id,
block_height: proposal.height,
Expand All @@ -93,9 +124,26 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
operation_results: Vec::new(),
transaction_index: 0,
published_blobs,
command_tx,
execution_state_receiver,
block_snapshots: BTreeMap::new(),
})
}

/// Takes ownership of the runtime channels if present, leaving `None` in the
/// tracker. Used before finalization to release the tracker's borrow on
/// `resource_controller`.
pub(crate) fn take_channels(
&mut self,
) -> Option<(
std::sync::mpsc::Sender<RuntimeCommand>,
mpsc::UnboundedReceiver<ExecutionRequest>,
)> {
let command_tx = self.command_tx.take()?;
let execution_state_receiver = self.execution_state_receiver.take()?;
Some((command_tx, execution_state_receiver))
}

/// Executes a transaction in the context of the block.
#[instrument(skip_all, fields(
chain_id = %self.chain_id,
Expand Down Expand Up @@ -147,8 +195,29 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
authenticated_owner: self.authenticated_owner,
timestamp: self.timestamp,
};
let mut actor =
ExecutionStateActor::new(chain, &mut txn_tracker, self.resource_controller);
let mut actor = match (
self.command_tx.as_ref(),
self.execution_state_receiver.as_mut(),
) {
(Some(command_tx), Some(execution_state_receiver)) => {
let runtime_channels = RuntimeChannels {
command_tx,
execution_state_receiver,
};
ExecutionStateActor::with_runtime(
chain,
&mut txn_tracker,
self.resource_controller,
runtime_channels,
)
}
_ => ExecutionStateActor::with_block_snapshots(
chain,
&mut txn_tracker,
self.resource_controller,
&mut self.block_snapshots,
),
};
Box::pin(actor.execute_operation(context, operation.clone()))
.await
.with_execution_context(chain_execution_context)?;
Expand Down Expand Up @@ -220,8 +289,29 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
// Once a chain is closed, accepting incoming messages is not allowed.
ensure!(!chain.system.closed.get(), ChainError::ClosedChain);

let mut actor =
ExecutionStateActor::new(chain, txn_tracker, self.resource_controller);
let mut actor = match (
self.command_tx.as_ref(),
self.execution_state_receiver.as_mut(),
) {
(Some(command_tx), Some(execution_state_receiver)) => {
let runtime_channels = RuntimeChannels {
command_tx,
execution_state_receiver,
};
ExecutionStateActor::with_runtime(
chain,
txn_tracker,
self.resource_controller,
runtime_channels,
)
}
_ => ExecutionStateActor::with_block_snapshots(
chain,
txn_tracker,
self.resource_controller,
&mut self.block_snapshots,
),
};
Box::pin(actor.execute_message(
context,
posted_message.message.clone(),
Expand Down Expand Up @@ -435,6 +525,77 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
self.operation_results.truncate(*operation_results_len);
}

/// Sends a command to the runtime thread and handles state requests until
/// `ActionComplete` is received. No-op when there is no runtime thread (web).
async fn send_runtime_command<C>(
&mut self,
command: RuntimeCommand,
chain: &mut ExecutionStateView<C>,
) -> Result<(), ChainError>
where
C: Context + Clone + 'static,
C::Extra: ExecutionRuntimeContext,
{
use futures::StreamExt as _;

let Some(command_tx) = self.command_tx.as_ref() else {
return Ok(());
};
let Some(execution_state_receiver) = self.execution_state_receiver.as_mut() else {
return Ok(());
};

command_tx.send(command).map_err(|_| {
ChainError::InternalError("Runtime thread stopped unexpectedly".to_string())
})?;

let mut txn_tracker = TransactionTracker::default();
let mut actor = ExecutionStateActor::new(chain, &mut txn_tracker, self.resource_controller);
while let Some(request) = execution_state_receiver.next().await {
if let ExecutionRequest::ActionComplete { .. } = request {
break;
}
actor
.handle_request(request)
.await
.map_err(|error| ChainError::InternalError(error.to_string()))?;
}

Ok(())
}

/// Snapshots the Wasm state of all loaded contract instances on the runtime thread.
///
/// Captures memory and globals of all loaded Wasm instances so they can be
/// restored later with `restore_runtime_snapshots`.
pub async fn snapshot_runtime_instances<C>(
&mut self,
chain: &mut ExecutionStateView<C>,
) -> Result<(), ChainError>
where
C: Context + Clone + 'static,
C::Extra: ExecutionRuntimeContext,
{
self.send_runtime_command(RuntimeCommand::SnapshotAllInstances, chain)
.await
}

/// Restores all loaded contract instances from their Wasm snapshots on the runtime thread.
///
/// Undoes any Wasm-level state changes (memory, globals) that occurred since
/// the last `snapshot_runtime_instances` call.
pub async fn restore_runtime_snapshots<C>(
&mut self,
chain: &mut ExecutionStateView<C>,
) -> Result<(), ChainError>
where
C: Context + Clone + 'static,
C::Extra: ExecutionRuntimeContext,
{
self.send_runtime_command(RuntimeCommand::RestoreAllInstances, chain)
.await
}

/// Finalizes the execution and returns the collected results.
///
/// This method should be called after all transactions have been processed.
Expand Down
Loading
Loading