Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 131 additions & 5 deletions linera-chain/src/block_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::collections::{BTreeMap, BTreeSet};

use custom_debug_derive::Debug;
use futures::channel::mpsc;
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency;
use linera_base::{
Expand All @@ -12,11 +13,16 @@ use linera_base::{
identifiers::{AccountOwner, BlobId, ChainId, StreamId},
};
use linera_execution::{
execution_state_actor::ExecutionStateActor, ExecutionRuntimeContext, ExecutionStateView,
Message, MessageContext, MessageKind, OperationContext, OutgoingMessage, ResourceController,
ResourceTracker, SystemExecutionStateView, TransactionOutcome, TransactionTracker,
execution_state_actor::{
ExecutionRequest, ExecutionStateActor, RuntimeChannels, RuntimeCommand,
},
ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, MessageKind,
OperationContext, OutgoingMessage, ResourceController, ResourceTracker,
SystemExecutionStateView, TransactionOutcome, TransactionTracker,
};
use linera_views::context::Context;
#[cfg(not(web))]
use linera_views::views::View;
use tracing::instrument;

#[cfg(with_metrics)]
Expand Down Expand Up @@ -121,16 +127,119 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
self.resource_controller_mut()
.track_block_size_of(&incoming_bundle)
.with_execution_context(chain_execution_context)?;

// On native, spawn a bundle-level contract runtime thread so that contract
// instances persist across messages within an incoming bundle.
#[cfg(not(web))]
let (mut runtime_channels, contract_runtime_task) = {
let (command_tx, command_rx) = std::sync::mpsc::channel::<RuntimeCommand>();
let (execution_state_sender, execution_state_receiver) =
futures::channel::mpsc::unbounded();

let allow_application_logs = chain
.context()
.extra()
.execution_runtime_config()
.allow_application_logs;

let runtime_resource_controller = ResourceController::new(
self.resource_controller.policy().clone(),
ResourceTracker::default(),
Amount::ZERO,
);
let rt_chain_id = self.chain_id;
let rt_height = self.block_height;
let rt_timestamp = self.timestamp;
let task = chain
.context()
.extra()
.thread_pool()
.run_send((), move |()| async move {
let runtime =
linera_execution::runtime::ContractSyncRuntime::new_for_bundle(
execution_state_sender,
rt_chain_id,
rt_height,
round,
rt_timestamp,
runtime_resource_controller,
allow_application_logs,
);
runtime.run_bundle_loop(&command_rx)
})
.await;

(Some((command_tx, execution_state_receiver)), task)
};
#[cfg(web)]
let mut runtime_channels: Option<(
std::sync::mpsc::Sender<RuntimeCommand>,
mpsc::UnboundedReceiver<ExecutionRequest>,
)> = None;

for posted_message in incoming_bundle.messages() {
Box::pin(self.execute_message_in_block(
chain,
posted_message,
incoming_bundle,
round,
&mut txn_tracker,
&mut runtime_channels,
))
.await?;
}

// Finalize the bundle-level runtime: signal it to finalize all loaded
// contract instances and shut down.
#[cfg(not(web))]
if let Some((command_tx, mut execution_state_receiver)) = runtime_channels {
use futures::StreamExt as _;

let finalize_context = linera_execution::FinalizeContext {
authenticated_owner: self.authenticated_owner,
chain_id: self.chain_id,
height: self.block_height,
round,
};
let resource_tracker = self.resource_controller.tracker;
command_tx
.send(RuntimeCommand::FinalizeAll {
context: finalize_context,
tracker: Box::new(resource_tracker),
})
.map_err(|_| {
ChainError::InternalError(
"Runtime thread stopped unexpectedly".to_string(),
)
})?;

// Handle remaining state requests (write_batch from finalize) until
// channel closes.
let mut actor =
ExecutionStateActor::new(chain, &mut txn_tracker, self.resource_controller);
while let Some(request) = execution_state_receiver.next().await {
if let ExecutionRequest::ActionComplete { .. } = request {
continue;
}
actor
.handle_request(request)
.await
.map_err(|error| ChainError::InternalError(error.to_string()))?;
}

// Wait for the runtime thread to finish and use its final tracker.
let runtime_resource_controller = contract_runtime_task
.await
.map_err(|error| {
ChainError::InternalError(format!("Runtime thread failed: {error}"))
})?
.map_err(|error| {
ChainError::InternalError(format!(
"Runtime finalization failed: {error}"
))
})?;
self.resource_controller.tracker = runtime_resource_controller.tracker;
}
}
Transaction::ExecuteOperation(operation) => {
self.resource_controller_mut()
Expand Down Expand Up @@ -195,6 +304,10 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
incoming_bundle: &IncomingBundle,
round: Option<u32>,
txn_tracker: &mut TransactionTracker,
runtime_channels: &mut Option<(
std::sync::mpsc::Sender<RuntimeCommand>,
mpsc::UnboundedReceiver<ExecutionRequest>,
)>,
) -> Result<(), ChainError>
where
C: Context + Clone + 'static,
Expand All @@ -220,8 +333,21 @@ impl<'resources, 'blobs> BlockExecutionTracker<'resources, 'blobs> {
// Once a chain is closed, accepting incoming messages is not allowed.
ensure!(!chain.system.closed.get(), ChainError::ClosedChain);

let mut actor =
ExecutionStateActor::new(chain, txn_tracker, self.resource_controller);
let mut actor = match runtime_channels.as_mut() {
Some((command_tx, execution_state_receiver)) => {
let channels = RuntimeChannels {
command_tx,
execution_state_receiver,
};
ExecutionStateActor::with_runtime(
chain,
txn_tracker,
self.resource_controller,
channels,
)
}
None => ExecutionStateActor::new(chain, txn_tracker, self.resource_controller),
};
Box::pin(actor.execute_message(
context,
posted_message.message.clone(),
Expand Down
Loading
Loading