diff --git a/core/blockchain.go b/core/blockchain.go index d6edd9013307..1b24dd9d1504 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2111,82 +2111,122 @@ type ExecuteConfig struct { EnableWitnessStats bool } -// ProcessBlock executes and validates the given block. If there was no error -// it writes the block and associated state to database. -func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash, block *types.Block, config ExecuteConfig) (result *blockProcessingResult, blockEndErr error) { - var ( - err error - startTime = time.Now() - statedb *state.StateDB - interrupt atomic.Bool - sdb state.Database - ) - defer interrupt.Store(true) // terminate the prefetch at the end +// useBALExecution reports whether the block will be executed through the +// BAL-driven parallel processor. +func (bc *BlockChain) useBALExecution(block *types.Block, wantWitness bool) bool { + return supportsParallelExecution(block.AccessList(), bc.chainConfig, block.Number(), block.Time(), wantWitness, bc.cfg.VmConfig.Tracer != nil) +} +// setupExecutionState builds the state instance that block execution reads from +// and writes to. +// +// - BAL-driven parallel execution (Amsterdam blocks carrying an access list): +// a single reader(the underlying state reader wrapped with a shared cache +// and an access-list-hint prefetcher) feeds both the canonical state and +// every per-transaction state built on top of it. +// +// - Sequential execution with prefetching: the main processor and a +// speculative whole-block prefetcher share one cached reader. +// +// - No prefetching: a plain reader, with a no-op cleanup. +func (bc *BlockChain) setupExecutionState(parentRoot common.Hash, block *types.Block, config ExecuteConfig, interrupt *atomic.Bool) (*state.StateDB, func(*blockProcessingResult), error) { + noop := func(*blockProcessingResult) {} + + var sdb state.Database if bc.chainConfig.IsUBT(block.Number(), block.Time()) { sdb = state.NewUBTDatabase(bc.triedb, bc.codedb) } else { sdb = state.NewMPTDatabase(bc.triedb, bc.codedb).WithSnapshot(bc.snaps) } - // If prefetching is enabled, run that against the current state to pre-cache - // transactions and probabilistically some of the account/storage trie nodes. - // - // Note: the main processor and prefetcher share the same reader with a local - // cache for mitigating the overhead of state access. type prewarmReader interface { // ReadersWithCacheStats creates a pair of state readers that share the // same underlying state reader and internal state cache, while maintaining // separate statistics respectively. ReadersWithCacheStats(stateRoot common.Hash) (state.Reader, state.Reader, error) } - warmer, ok := sdb.(prewarmReader) + wantWitness := config.StatelessSelfValidation || config.MakeWitness - if bc.cfg.NoPrefetch || !ok { - statedb, err = state.New(parentRoot, sdb) + switch warmer, ok := sdb.(prewarmReader); { + case bc.useBALExecution(block, wantWitness): + base, err := sdb.Reader(parentRoot) if err != nil { - return nil, err + return nil, nil, err } - } else { - // If prefetching is enabled, run that against the current state to pre-cache - // transactions and probabilistically some of the account/storage trie nodes. - // - // Note: the main processor and prefetcher share the same reader with a local - // cache for mitigating the overhead of state access. + reader, stop := state.NewBlockExecutionReader(base, prefetchHint(block.AccessList()), runtime.NumCPU()) + statedb, err := state.NewWithReader(parentRoot, sdb, reader) + if err != nil { + stop() + return nil, nil, err + } + return statedb, func(*blockProcessingResult) { stop() }, nil + + case bc.cfg.NoPrefetch || !ok: + statedb, err := state.New(parentRoot, sdb) + if err != nil { + return nil, nil, err + } + return statedb, noop, nil + + default: + // The main processor and the speculative prefetcher share the same reader + // with a local cache for mitigating the overhead of state access. prefetch, process, err := warmer.ReadersWithCacheStats(parentRoot) if err != nil { - return nil, err + return nil, nil, err } throwaway, err := state.NewWithReader(parentRoot, sdb, prefetch) if err != nil { - return nil, err + return nil, nil, err } - statedb, err = state.NewWithReader(parentRoot, sdb, process) + statedb, err := state.NewWithReader(parentRoot, sdb, process) if err != nil { - return nil, err + return nil, nil, err } - // Upload the statistics of reader at the end - defer func() { - if result != nil { - if stater, ok := prefetch.(state.ReaderStater); ok { - result.stats.StatePrefetchCacheStats = stater.GetStats() - } - if stater, ok := process.(state.ReaderStater); ok { - result.stats.StateReadCacheStats = stater.GetStats() - } - } - }() - go func(start time.Time, throwaway *state.StateDB, block *types.Block) { + go func(start time.Time) { // Disable tracing for prefetcher executions. vmCfg := bc.cfg.VmConfig vmCfg.Tracer = nil - bc.prefetcher.Prefetch(block, throwaway, bc.jumpDestCache, vmCfg, &interrupt) + bc.prefetcher.Prefetch(block, throwaway, bc.jumpDestCache, vmCfg, interrupt) blockPrefetchExecuteTimer.Update(time.Since(start)) if interrupt.Load() { blockPrefetchInterruptMeter.Mark(1) } - }(time.Now(), throwaway, block) + }(time.Now()) + + return statedb, func(result *blockProcessingResult) { + // Upload the statistics of reader at the end. + if result == nil { + return + } + if stater, ok := prefetch.(state.ReaderStater); ok { + result.stats.StatePrefetchCacheStats = stater.GetStats() + } + if stater, ok := process.(state.ReaderStater); ok { + result.stats.StateReadCacheStats = stater.GetStats() + } + }, nil } +} + +// ProcessBlock executes and validates the given block. If there was no error +// it writes the block and associated state to database. +func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash, block *types.Block, config ExecuteConfig) (result *blockProcessingResult, blockEndErr error) { + var ( + err error + startTime = time.Now() + statedb *state.StateDB + interrupt atomic.Bool + ) + defer interrupt.Store(true) // terminate the prefetch at the end + + // Set up the state reader feeding execution, along with a cleanup to run once + // processing is complete (stop the prefetcher, upload reader statistics). + statedb, cleanup, err := bc.setupExecutionState(parentRoot, block, config, &interrupt) + if err != nil { + return nil, err + } + defer func() { cleanup(result) }() // If we are past Byzantium, enable prefetching to pull in trie node paths // while processing transactions. Before Byzantium the prefetcher is mostly @@ -2201,9 +2241,14 @@ func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash, if err != nil { return nil, err } + defer witness.ReportMetrics(block.NumberU64()) + } + // In BAL-driven parallel execution, state hashing is performed independently + // with the transaction execution, disable the trie prefetcher explicitly. + if !bc.useBALExecution(block, witness != nil) { + statedb.StartPrefetcher("chain", witness) + defer statedb.StopPrefetcher() } - statedb.StartPrefetcher("chain", witness) - defer statedb.StopPrefetcher() } // Instrument the blockchain tracing @@ -2328,10 +2373,6 @@ func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash, stats.DatabaseCommit = statedb.DatabaseCommits // Database commits are complete, we can mark them stats.BlockWrite = time.Since(wstart) - max(statedb.AccountCommits, statedb.StorageCommits) /* concurrent */ - statedb.DatabaseCommits } - // Report the collected witness statistics - if witness != nil { - witness.ReportMetrics(block.NumberU64()) - } elapsed := time.Since(startTime) + 1 // prevent zero division stats.TotalTime = elapsed stats.MgasPerSecond = float64(res.GasUsed) * 1000 / float64(elapsed) diff --git a/core/gaspool.go b/core/gaspool.go index 83420c764010..dbd0c1a49b0b 100644 --- a/core/gaspool.go +++ b/core/gaspool.go @@ -109,6 +109,19 @@ func (gp *GasPool) CumulativeUsed() uint64 { return gp.cumulativeUsed } +// CumulativeRegular returns the cumulative regular-dimension gas consumed +// (EIP-8037). It is used to derive the block gas used when transactions are +// charged against independent pools during parallel execution. +func (gp *GasPool) CumulativeRegular() uint64 { + return gp.cumulativeRegular +} + +// CumulativeState returns the cumulative state-dimension gas consumed +// (EIP-8037). See CumulativeRegular for the rationale. +func (gp *GasPool) CumulativeState() uint64 { + return gp.cumulativeState +} + // Used returns the amount of consumed gas. func (gp *GasPool) Used() uint64 { // After 8037, return max(sum_regular, sum_state) diff --git a/core/state/reader_eip_7928.go b/core/state/reader_eip_7928.go index ff315ac5eb6e..8dcf063bb59a 100644 --- a/core/state/reader_eip_7928.go +++ b/core/state/reader_eip_7928.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types/bal" + "github.com/ethereum/go-ethereum/crypto" ) // The EIP27928 reader utilizes a hierarchical architecture to optimize state @@ -86,7 +87,6 @@ type prefetchStateReader struct { closeOnce sync.Once } -// nolint:unused func newPrefetchStateReader(reader StateReader, accessList map[common.Address][]common.Hash, nThreads int) *prefetchStateReader { tasks := make([]*fetchTask, 0, len(accessList)) for addr, slots := range accessList { @@ -193,13 +193,28 @@ func (r *prefetchStateReader) process(start, limit int) { } } +// NewBlockExecutionReader wraps base with a shared, concurrency-safe cache so +// that any state resolved once, whether by the background prefetcher or by +// transaction execution, is not fetched from the underlying reader again. +func NewBlockExecutionReader(base Reader, prefetch map[common.Address][]common.Hash, threads int) (Reader, func()) { + var ( + cache = newStateReaderWithCache(base) + stop = func() {} + ) + if len(prefetch) > 0 && threads > 0 { + pf := newPrefetchStateReader(cache, prefetch, threads) + stop = pf.Close + } + return newReader(base, newStateReaderWithStats(cache)), stop +} + // ReaderWithBlockLevelAccessList provides state access that reflects the // pre-transition state combined with the mutations made by transactions // prior to TxIndex. type ReaderWithBlockLevelAccessList struct { Reader - AccessList *bal.ConstructionBlockAccessList - TxIndex int + lookup *bal.Lookup + txIndex uint32 } // NewReaderWithBlockLevelAccessList constructs a reader for accessing states @@ -209,39 +224,85 @@ type ReaderWithBlockLevelAccessList struct { // - 0 for pre‑execution system contract calls. // - 1 … n for transactions (in block order). // - n + 1 for post‑execution system contract calls. -func NewReaderWithBlockLevelAccessList(base Reader, accessList *bal.ConstructionBlockAccessList, txIndex int) *ReaderWithBlockLevelAccessList { +func NewReaderWithBlockLevelAccessList(base Reader, lookup *bal.Lookup, txIndex int) *ReaderWithBlockLevelAccessList { return &ReaderWithBlockLevelAccessList{ - Reader: base, - AccessList: accessList, - TxIndex: txIndex, + Reader: base, + lookup: lookup, + txIndex: uint32(txIndex), } } // Account implements Reader, returning the account with the specific address. +// +// The returned account reflects the pre-transition state overlaid with all +// mutations made by call frames prior to the reader's TxIndex. func (r *ReaderWithBlockLevelAccessList) Account(addr common.Address) (*types.StateAccount, error) { - panic("implement me") + base, err := r.Reader.Account(addr) + if err != nil { + return nil, err + } + balance, nonce, code, hasBalance, hasNonce, hasCode := r.lookup.AccountChanges(addr, r.txIndex) + + // No mutation precedes the current call frame, return the base account as is. + if !hasBalance && !hasNonce && !hasCode { + return base, nil + } + // Overlay the mutations on top of a copy of the base account. The base + // account must not be mutated in place: with a shared cache in front of the + // underlying reader, the same instance is handed to concurrent readers. + account := types.NewEmptyStateAccount() + if base != nil { + account = base.Copy() + } + if hasBalance { + account.Balance = balance.Clone() + } + if hasNonce { + account.Nonce = nonce + } + if hasCode { + if len(code) == 0 { + account.CodeHash = types.EmptyCodeHash.Bytes() + } else { + account.CodeHash = crypto.Keccak256(code) + } + } + return account, nil } // Storage implements Reader, returning the storage slot with the specific // address and slot key. func (r *ReaderWithBlockLevelAccessList) Storage(addr common.Address, slot common.Hash) (common.Hash, error) { - panic("implement me") + if value, ok := r.lookup.Storage(addr, slot, r.txIndex); ok { + return value, nil + } + return r.Reader.Storage(addr, slot) } // Has implements Reader, returning the flag indicating whether the contract // code with specified address and hash exists or not. func (r *ReaderWithBlockLevelAccessList) Has(addr common.Address, codeHash common.Hash) bool { - panic("implement me") + if _, ok := r.lookup.Code(addr, r.txIndex); ok { + return true + } + return r.Reader.Has(addr, codeHash) } // Code implements Reader, returning the contract code with specified address -// and hash. -func (r *ReaderWithBlockLevelAccessList) Code(addr common.Address, codeHash common.Hash) ([]byte, error) { - panic("implement me") +// and hash. Code created earlier in the block (and therefore absent from the +// pre-transition state) is served directly from the access list. +func (r *ReaderWithBlockLevelAccessList) Code(addr common.Address, codeHash common.Hash) []byte { + if code, ok := r.lookup.Code(addr, r.txIndex); ok { + return code + } + return r.Reader.Code(addr, codeHash) } // CodeSize implements Reader, returning the contract code size with specified // address and hash. -func (r *ReaderWithBlockLevelAccessList) CodeSize(addr common.Address, codeHash common.Hash) (int, error) { - panic("implement me") +func (r *ReaderWithBlockLevelAccessList) CodeSize(addr common.Address, codeHash common.Hash) int { + if code, ok := r.lookup.Code(addr, r.txIndex); ok { + return len(code) + } + return r.Reader.CodeSize(addr, codeHash) } diff --git a/core/state/state_object.go b/core/state/state_object.go index ce456e7668f6..876d113ba79a 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -345,6 +345,8 @@ func (s *stateObject) updateTrie() (Trie, error) { // into a shortnode. This requires `B` to be resolved from disk. // Whereas if the created node is handled first, then the collapse is avoided, and `B` is not resolved. var ( + keys = make([][]byte, 0, len(s.uncommittedStorage)) + values = make([][]byte, 0, len(s.uncommittedStorage)) deletions []common.Hash used = make([]common.Hash, 0, len(s.uncommittedStorage)) ) @@ -360,17 +362,23 @@ func (s *stateObject) updateTrie() (Trie, error) { continue } if (value != common.Hash{}) { - if err := tr.UpdateStorage(s.address, key[:], common.TrimLeftZeroes(value[:])); err != nil { - s.db.setError(err) - return nil, err - } - s.db.StorageUpdated.Add(1) + keys = append(keys, key.Bytes()) + values = append(values, common.TrimLeftZeroes(value.Bytes())) } else { deletions = append(deletions, key) } // Cache the items for preloading used = append(used, key) // Copy needed for closure } + // Apply the updates in batch mode, allowing the trie nodes on the paths + // to be resolved concurrently. + if len(keys) > 0 { + if err := tr.UpdateStorageBatch(s.address, keys, values); err != nil { + s.db.setError(err) + return nil, err + } + s.db.StorageUpdated.Add(int64(len(keys))) + } for _, key := range deletions { if err := tr.DeleteStorage(s.address, key[:]); err != nil { s.db.setError(err) diff --git a/core/state/statedb.go b/core/state/statedb.go index 09a896a89ae2..d2f70118e076 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -571,17 +571,6 @@ func (s *StateDB) GetTransientState(addr common.Address, key common.Hash) common // Setting, updating & deleting state object methods. // -// updateStateObject writes the given object to the trie. -func (s *StateDB) updateStateObject(obj *stateObject) { - // Encode the account and update the account trie - if err := s.trie.UpdateAccount(obj.Address(), &obj.data, len(obj.code)); err != nil { - s.setError(fmt.Errorf("updateStateObject (%x) error: %v", obj.Address(), err)) - } - if obj.dirtyCode { - s.trie.UpdateContractCode(obj.Address(), common.BytesToHash(obj.CodeHash()), obj.code) - } -} - // deleteStateObject removes the given object from the state trie. func (s *StateDB) deleteStateObject(addr common.Address) { if err := s.trie.DeleteAccount(addr); err != nil { @@ -1072,8 +1061,12 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // into a shortnode. This requires `B` to be resolved from disk. // Whereas if the created node is handled first, then the collapse is avoided, and `B` is not resolved. var ( - usedAddrs []common.Address - deletedAddrs []common.Address + usedAddrs = make([]common.Address, 0, len(s.mutations)) + updatedAddrs = make([]common.Address, 0, len(s.mutations)) + accounts = make([]*types.StateAccount, 0, len(s.mutations)) + codeLens = make([]int, 0, len(s.mutations)) + dirtyCodeObjs []*stateObject + deletedAddrs []common.Address ) for addr, op := range s.mutations { if op.applied { @@ -1085,17 +1078,32 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { deletedAddrs = append(deletedAddrs, addr) } else { obj := s.stateObjects[addr] - s.updateStateObject(obj) - s.AccountUpdated += 1 + updatedAddrs = append(updatedAddrs, addr) + accounts = append(accounts, &obj.data) + codeLens = append(codeLens, len(obj.code)) - // Count code writes post-Finalise so reverted CREATEs are excluded. if obj.dirtyCode { - s.CodeUpdated += 1 - s.CodeUpdateBytes += len(obj.code) + dirtyCodeObjs = append(dirtyCodeObjs, obj) } } usedAddrs = append(usedAddrs, addr) // Copy needed for closure } + // Apply the account updates in batch mode, allowing the trie nodes on the + // paths to be resolved concurrently. + if len(updatedAddrs) > 0 { + if err := s.trie.UpdateAccountBatch(updatedAddrs, accounts, codeLens); err != nil { + s.setError(fmt.Errorf("account update error: %v", err)) + } + s.AccountUpdated += len(updatedAddrs) + } + // Apply the code updates after the account updates. + for _, obj := range dirtyCodeObjs { + s.trie.UpdateContractCode(obj.Address(), common.BytesToHash(obj.CodeHash()), obj.code) + + // Count code writes post-Finalise so reverted CREATEs are excluded. + s.CodeUpdated += 1 + s.CodeUpdateBytes += len(obj.code) + } for _, deletedAddr := range deletedAddrs { s.deleteStateObject(deletedAddr) s.AccountDeleted += 1 diff --git a/core/state_processor.go b/core/state_processor.go index 5b81abef6f78..7444edbd117f 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -64,6 +64,9 @@ func (p *StateProcessor) chainConfig() *params.ChainConfig { // returns the amount of gas that was used in the process. If any of the // transactions failed to execute due to insufficient gas it will return an error. func (p *StateProcessor) Process(ctx context.Context, block *types.Block, statedb *state.StateDB, jumpDestCache vm.JumpDestCache, cfg vm.Config) (*ProcessResult, error) { + if supportsParallelExecution(block.AccessList(), p.chainConfig(), block.Number(), block.Time(), statedb.Witness() != nil, cfg.Tracer != nil) { + return p.processParallel(ctx, block, statedb, jumpDestCache, cfg) + } var ( config = p.chainConfig() receipts = make(types.Receipts, 0, len(block.Transactions())) diff --git a/core/state_processor_parallel.go b/core/state_processor_parallel.go new file mode 100644 index 000000000000..96e999987f6b --- /dev/null +++ b/core/state_processor_parallel.go @@ -0,0 +1,376 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package core + +import ( + "context" + "fmt" + "math/big" + "runtime" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/tracing" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/bal" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/params" + "golang.org/x/sync/errgroup" +) + +// Per-phase timers for BAL-driven parallel block execution. +var ( + parallelSystemExecTimer = metrics.NewRegisteredResettingTimer("chain/execution/parallel/system", nil) + parallelTxExecTimer = metrics.NewRegisteredResettingTimer("chain/execution/parallel/transactions", nil) + parallelStateHashTimer = metrics.NewRegisteredResettingTimer("chain/execution/parallel/statehash", nil) + parallelTotalTimer = metrics.NewRegisteredResettingTimer("chain/execution/parallel/total", nil) + parallelAccountCacheHitMeter = metrics.NewRegisteredMeter("chain/execution/parallel/reads/account/cache/hit", nil) + parallelAccountCacheMissMeter = metrics.NewRegisteredMeter("chain/execution/parallel/reads/account/cache/miss", nil) + parallelStorageCacheHitMeter = metrics.NewRegisteredMeter("chain/execution/parallel/reads/storage/cache/hit", nil) + parallelStorageCacheMissMeter = metrics.NewRegisteredMeter("chain/execution/parallel/reads/storage/cache/miss", nil) +) + +// supportsParallelExecution reports whether the block can be executed using the +// BAL-driven parallel processor. +func supportsParallelExecution(accessList *bal.BlockAccessList, config *params.ChainConfig, number *big.Int, time uint64, wantWitness bool, wantTrace bool) bool { + // Disable the parallel execution if either the Amsterdam hasn't been + // activated, or the accessList is not accessible. + if accessList == nil || !config.IsAmsterdam(number, time) { + return false + } + // No tracer is attached (tracing requires the strict sequential + // ordering of state operations that parallel execution does not + // preserve). + if wantTrace { + return false + } + // No witness is being collected (witness building must observe + // every state access alongside the proof). + if wantWitness { + return false + } + return true +} + +// txExecResult holds the per-transaction outcome of parallel execution. +type txExecResult struct { + receipt *types.Receipt + accessList *bal.ConstructionBlockAccessList + + // regular and state are the EIP-8037 per-transaction + // gas contributions to the two block-inclusion dimensions. + regular uint64 + state uint64 +} + +// processParallel executes the block's transactions concurrently using the +// block-level access list. +func (p *StateProcessor) processParallel(ctx context.Context, block *types.Block, statedb *state.StateDB, jumpDestCache vm.JumpDestCache, cfg vm.Config) (*ProcessResult, error) { + var ( + config = p.chainConfig() + header = block.Header() + txs = block.Transactions() + start = time.Now() + + signer = types.MakeSigner(config, header.Number, header.Time) + context = NewEVMBlockContext(header, p.chain, nil) + postIndex = uint32(len(txs) + 1) + db = statedb.Database() + + accessList = block.AccessList() + lookup = accessList.Lookup() + + // blockAccessList is the access list rebuilt from the actual execution. + blockAccessList = bal.NewConstructionBlockAccessList() + ) + + // Resolve the parent state root, the point all execution reads from. + parent := p.chain.GetHeader(block.ParentHash(), block.NumberU64()-1) + if parent == nil { + return nil, fmt.Errorf("parent header %x not found", block.ParentHash()) + } + parentRoot := parent.Root + + // The base reader: the underlying state reader wrapped with a shared + // cache and an access-list-hint prefetcher. This reader is shared by + // all tx-executors. + base := statedb.Reader() + + var ( + // Stats + systemExec time.Duration + txExec time.Duration + stateHash time.Duration + ) + // Post-execution state root, computed concurrently with execution. + var wg errgroup.Group + wg.Go(func() error { + start := time.Now() + applyBlockAccessList(statedb, accessList) + statedb.IntermediateRoot(config.IsEIP158(header.Number)) + stateHash = time.Since(start) + return statedb.Error() + }) + // Ensure the root goroutine has stopped mutating the canonical state before + // returning on any path, including the error paths below. Wait is idempotent, + // so the explicit join on the happy path remains valid. + defer func() { _ = wg.Wait() }() + + // Pre-execution system calls, replayed against an ephemeral access-list + // state at block-access index 0, to contribute their entries to the rebuilt + // access list. + // + // TODO(rjl493456442) both the pre/post execution can be performed alongside + // the transaction execution. Measure the overhead before making the changes. + preStart := time.Now() + preState, err := newAccessListState(db, parentRoot, base, lookup, 0) + if err != nil { + return nil, err + } + preEVM := vm.NewEVM(context, preState, config, cfg) + if jumpDestCache != nil { + preEVM.SetJumpDestCache(jumpDestCache) + } + blockAccessList.Merge(PreExecution(ctx, block.BeaconRoot(), block.ParentHash(), config, preEVM, header.Number, header.Time)) + preEVM.Release() + systemExec += time.Since(preStart) + + // Execute the transactions concurrently. Each transaction runs against its + // own ephemeral state instance, whose reads are served from the block-level + // access list overlaid on the parent state. + txStart := time.Now() + results, err := p.executeTransactionsParallel(block, parentRoot, db, base, lookup, context, signer, jumpDestCache, cfg) + if err != nil { + return nil, err + } + txExec = time.Since(txStart) + + // Gather the per-transaction results in block order and charge their gas into + // a single block-level gas pool, exactly as sequential execution does. + var ( + receipts = make(types.Receipts, 0, len(txs)) + allLogs []*types.Log + gp = NewGasPool(block.GasLimit()) + logIndex uint + ) + for i := range txs { + receipt := results[i].receipt + if err := gp.ChargeGasAmsterdam(results[i].regular, results[i].state, receipt.GasUsed); err != nil { + return nil, fmt.Errorf("could not apply tx %d [%v]: %w", i, txs[i].Hash().Hex(), err) + } + // Correct the receipt object with block-level fields + receipt.CumulativeGasUsed = gp.CumulativeUsed() + for _, lg := range receipt.Logs { + lg.Index = logIndex + logIndex++ + } + receipts = append(receipts, receipt) + allLogs = append(allLogs, receipt.Logs...) + blockAccessList.Merge(results[i].accessList) + } + // Post-execution system calls against an ephemeral access-list state at + // index n+1. + postStart := time.Now() + postState, err := newAccessListState(db, parentRoot, base, lookup, int(postIndex)) + if err != nil { + return nil, err + } + postEVM := vm.NewEVM(context, postState, config, cfg) + if jumpDestCache != nil { + postEVM.SetJumpDestCache(jumpDestCache) + } + requests, postBAL, err := PostExecution(ctx, config, header.Number, header.Time, allLogs, postEVM, postIndex) + postEVM.Release() + if err != nil { + return nil, err + } + blockAccessList.Merge(postBAL) + p.chain.Engine().Finalize(p.chain, header, postState, block.Body(), postIndex, blockAccessList) + systemExec += time.Since(postStart) + + // Join the concurrent root computation. + if err := wg.Wait(); err != nil { + return nil, err + } + parallelSystemExecTimer.Update(systemExec) + parallelTxExecTimer.Update(txExec) + parallelStateHashTimer.Update(stateHash) + parallelTotalTimer.UpdateSince(start) + log.Info("Parallel block execution", "number", header.Number, "txs", len(txs), "system", common.PrettyDuration(systemExec), "txexec", common.PrettyDuration(txExec), "statehash", common.PrettyDuration(stateHash), "elapsed", common.PrettyDuration(time.Since(start))) + + return &ProcessResult{ + Receipts: receipts, + Requests: requests, + Logs: allLogs, + GasUsed: gp.Used(), + Bal: blockAccessList, + }, nil +} + +// newAccessListState constructs an ephemeral state, reading through base, whose +// view reflects the mutations recorded in the access list for all block-access +// indices below index. +func newAccessListState(db state.Database, parentRoot common.Hash, base state.Reader, lookup *bal.Lookup, index int) (*state.StateDB, error) { + return state.NewWithReader(parentRoot, db, state.NewReaderWithBlockLevelAccessList(base, lookup, index)) +} + +// executeTransactionsParallel applies all transactions to independent, +// access-list-backed state instances using a pool of workers, and returns +// the per-transaction results in block order. +func (p *StateProcessor) executeTransactionsParallel(block *types.Block, parentRoot common.Hash, db state.Database, base state.Reader, lookup *bal.Lookup, context vm.BlockContext, signer types.Signer, jumpDestCache vm.JumpDestCache, cfg vm.Config) ([]txExecResult, error) { + var ( + config = p.chainConfig() + header = block.Header() + blockHash = block.Hash() + blockNumber = block.Number() + txs = block.Transactions() + results = make([]txExecResult, len(txs)) + ) + workers := runtime.GOMAXPROCS(0) + if workers > len(txs) { + workers = len(txs) + } + var ( + cursor atomic.Int64 + group errgroup.Group + ) + for w := 0; w < workers; w++ { + group.Go(func() error { + evm := vm.NewEVM(context, nil, config, cfg) + if jumpDestCache != nil { + evm.SetJumpDestCache(jumpDestCache) + } + defer evm.Release() + + for { + i := int(cursor.Add(1)) - 1 + if i >= len(txs) { + return nil + } + tx := txs[i] + msg, err := TransactionToMessage(tx, signer, header.BaseFee) + if err != nil { + return fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) + } + + // Construct the dedicated pre-tx state with the BAL overlay wrapped. + reader := state.NewReaderWithBlockLevelAccessList(base, lookup, i+1) + sdb, err := state.NewWithReader(parentRoot, db, reader) + if err != nil { + return err + } + sdb.SetTxContext(tx.Hash(), i, uint32(i+1)) + evm.SetStateDB(sdb) + + // A transaction-local gas pool, sized to the transaction's own gas + // limit: enough to let the state transition run to completion. + gp := NewGasPool(msg.GasLimit) + receipt, accessList, err := ApplyTransactionWithEVM(msg, gp, sdb, blockNumber, blockHash, context.Time, tx, evm) + if err != nil { + return fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) + } + results[i] = txExecResult{ + receipt: receipt, + accessList: accessList, + regular: gp.CumulativeRegular(), + state: gp.CumulativeState(), + } + } + }) + } + if err := group.Wait(); err != nil { + return nil, err + } + reportParallelReadStats(block, base) + return results, nil +} + +// reportParallelReadStats reports the state read statistics. TODO(rjl) integrate +// it into blockchain stats. +func reportParallelReadStats(block *types.Block, reader state.Reader) { + stater, ok := reader.(state.ReaderStater) + if !ok { + return + } + var ( + stats = stater.GetStats().StateStats + accountHit = stats.AccountCacheHit + accountMiss = stats.AccountCacheMiss + storageHit = stats.StorageCacheHit + storageMiss = stats.StorageCacheMiss + ) + parallelAccountCacheHitMeter.Mark(accountHit) + parallelAccountCacheMissMeter.Mark(accountMiss) + parallelStorageCacheHitMeter.Mark(storageHit) + parallelStorageCacheMissMeter.Mark(storageMiss) + + log.Debug("Parallel execution read statistics", "number", block.Number(), + "account.hit", accountHit, "account.miss", accountMiss, + "account.hitrate", stats.AccountCacheHitRate(), + "storage.hit", storageHit, "storage.miss", storageMiss, + "storage.hitrate", stats.StorageCacheHitRate()) +} + +// applyBlockAccessList writes the final (highest block-access index) value of +// every mutated account field and storage slot recorded in the access list into +// the supplied state. +func applyBlockAccessList(statedb *state.StateDB, list *bal.BlockAccessList) { + for i := range *list { + acc := &(*list)[i] + addr := acc.Address + if n := len(acc.BalanceChanges); n > 0 { + statedb.SetBalance(addr, acc.BalanceChanges[n-1].PostBalance.Clone(), tracing.BalanceChangeUnspecified) + } + if n := len(acc.NonceChanges); n > 0 { + statedb.SetNonce(addr, acc.NonceChanges[n-1].PostNonce, tracing.NonceChangeUnspecified) + } + if n := len(acc.CodeChanges); n > 0 { + statedb.SetCode(addr, acc.CodeChanges[n-1].NewCode, tracing.CodeChangeUnspecified) + } + for j := range acc.StorageChanges { + sc := &acc.StorageChanges[j] + if n := len(sc.SlotChanges); n > 0 { + statedb.SetState(addr, sc.Slot.Bytes32(), sc.SlotChanges[n-1].PostValue.Bytes32()) + } + } + } +} + +// prefetchHint derives, for every account referenced by the block, the set of +// storage slots that will be read from the underlying pre-state during +// execution (both read-only slots and the slots that get written, since a write +// is generally preceded by a read of the committed value). It is used to warm +// the shared reader's cache ahead of execution. +func prefetchHint(list *bal.BlockAccessList) map[common.Address][]common.Hash { + hint := make(map[common.Address][]common.Hash, len(*list)) + for i := range *list { + acc := &(*list)[i] + slots := make([]common.Hash, 0, len(acc.StorageReads)+len(acc.StorageChanges)) + for _, slot := range acc.StorageReads { + slots = append(slots, slot.Bytes32()) + } + for j := range acc.StorageChanges { + slots = append(slots, acc.StorageChanges[j].Slot.Bytes32()) + } + hint[acc.Address] = slots + } + return hint +} diff --git a/core/types/bal/bal_lookup.go b/core/types/bal/bal_lookup.go new file mode 100644 index 000000000000..77b9014a33bd --- /dev/null +++ b/core/types/bal/bal_lookup.go @@ -0,0 +1,127 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package bal + +import ( + "sort" + + "github.com/ethereum/go-ethereum/common" + "github.com/holiman/uint256" +) + +// accountLookup references an account's per-index mutations. The slices are the +// ones from the encoded access list, which the spec requires to be sorted +// ascending (and unique) by block-access index, so they can be binary-searched +// directly without copying. +type accountLookup struct { + balances []encodingBalanceChange + nonces []encodingAccountNonce + codes []encodingCodeChange + storage map[common.Hash][]encodingStorageWrite +} + +// Lookup is a read-optimized, index-addressable view over a block access list. +type Lookup struct { + accounts map[common.Address]*accountLookup +} + +// Lookup builds a Lookup over the access list. The returned view aliases the +// receiver's slices, so the access list must not be mutated while it is in use. +func (e *BlockAccessList) Lookup() *Lookup { + l := &Lookup{ + accounts: make(map[common.Address]*accountLookup, len(*e)), + } + for i := range *e { + acc := &(*e)[i] + al := &accountLookup{ + balances: acc.BalanceChanges, + nonces: acc.NonceChanges, + codes: acc.CodeChanges, + storage: make(map[common.Hash][]encodingStorageWrite, len(acc.StorageChanges)), + } + for j := range acc.StorageChanges { + sc := &acc.StorageChanges[j] + al.storage[sc.Slot.Bytes32()] = sc.SlotChanges + } + l.accounts[acc.Address] = al + } + return l +} + +// searchLatest returns the entry with the highest block-access index strictly +// below limit, relying on entries being sorted ascending by that index. +func searchLatest[E any](entries []E, limit uint32, index func(E) uint32) (E, bool) { + i := sort.Search(len(entries), func(i int) bool { + return index(entries[i]) >= limit + }) + // All entries satisfy the condition (index >= limit) + if i == 0 { + var zero E + return zero, false + } + return entries[i-1], true +} + +// AccountChanges returns the account field values observed at block-access index +// limit (i.e. the latest mutation recorded strictly before limit). Each boolean +// reports whether the corresponding field was mutated before limit. +func (l *Lookup) AccountChanges(addr common.Address, limit uint32) (balance *uint256.Int, nonce uint64, code []byte, hasBalance, hasNonce, hasCode bool) { + acc, ok := l.accounts[addr] + if !ok { + return nil, 0, nil, false, false, false + } + if e, ok := searchLatest(acc.balances, limit, func(e encodingBalanceChange) uint32 { return e.BlockAccessIndex }); ok { + balance, hasBalance = e.PostBalance, true + } + if e, ok := searchLatest(acc.nonces, limit, func(e encodingAccountNonce) uint32 { return e.BlockAccessIndex }); ok { + nonce, hasNonce = e.PostNonce, true + } + if e, ok := searchLatest(acc.codes, limit, func(e encodingCodeChange) uint32 { return e.BlockAccessIndex }); ok { + code, hasCode = e.NewCode, true + } + return balance, nonce, code, hasBalance, hasNonce, hasCode +} + +// Code returns the contract code observed at block-access index limit, and +// whether the code was set before limit. +func (l *Lookup) Code(addr common.Address, limit uint32) ([]byte, bool) { + acc, ok := l.accounts[addr] + if !ok { + return nil, false + } + if e, ok := searchLatest(acc.codes, limit, func(e encodingCodeChange) uint32 { return e.BlockAccessIndex }); ok { + return e.NewCode, true + } + return nil, false +} + +// Storage returns the value of the storage slot observed at block-access index +// limit, and whether the slot was written before limit. +func (l *Lookup) Storage(addr common.Address, slot common.Hash, limit uint32) (common.Hash, bool) { + acc, ok := l.accounts[addr] + if !ok { + return common.Hash{}, false + } + writes, ok := acc.storage[slot] + if !ok { + return common.Hash{}, false + } + if e, ok := searchLatest(writes, limit, func(e encodingStorageWrite) uint32 { return e.BlockAccessIndex }); ok { + return e.PostValue.Bytes32(), true + } + return common.Hash{}, false +} diff --git a/core/vm/evm.go b/core/vm/evm.go index 15609a0205d6..935c06de9fe3 100644 --- a/core/vm/evm.go +++ b/core/vm/evm.go @@ -213,6 +213,11 @@ func (evm *EVM) SetJumpDestCache(jumpDests JumpDestCache) { evm.jumpDests = jumpDests } +// SetStateDB configures the state for interaction. +func (evm *EVM) SetStateDB(statedb *state.StateDB) { + evm.StateDB = statedb +} + // SetTxContext resets the EVM with a new transaction context. // This is not threadsafe and should only be done very cautiously. func (evm *EVM) SetTxContext(txCtx TxContext) {