Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 91 additions & 50 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2111,82 +2111,122 @@ type ExecuteConfig struct {
EnableWitnessStats bool
}

// ProcessBlock executes and validates the given block. If there was no error
// it writes the block and associated state to database.
func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash, block *types.Block, config ExecuteConfig) (result *blockProcessingResult, blockEndErr error) {
var (
err error
startTime = time.Now()
statedb *state.StateDB
interrupt atomic.Bool
sdb state.Database
)
defer interrupt.Store(true) // terminate the prefetch at the end
// useBALExecution reports whether the block will be executed through the
// BAL-driven parallel processor.
func (bc *BlockChain) useBALExecution(block *types.Block, wantWitness bool) bool {
return supportsParallelExecution(block.AccessList(), bc.chainConfig, block.Number(), block.Time(), wantWitness, bc.cfg.VmConfig.Tracer != nil)
}

// setupExecutionState builds the state instance that block execution reads from
// and writes to.
//
// - BAL-driven parallel execution (Amsterdam blocks carrying an access list):
// a single reader(the underlying state reader wrapped with a shared cache
// and an access-list-hint prefetcher) feeds both the canonical state and
// every per-transaction state built on top of it.
//
// - Sequential execution with prefetching: the main processor and a
// speculative whole-block prefetcher share one cached reader.
//
// - No prefetching: a plain reader, with a no-op cleanup.
func (bc *BlockChain) setupExecutionState(parentRoot common.Hash, block *types.Block, config ExecuteConfig, interrupt *atomic.Bool) (*state.StateDB, func(*blockProcessingResult), error) {
noop := func(*blockProcessingResult) {}

var sdb state.Database
if bc.chainConfig.IsUBT(block.Number(), block.Time()) {
sdb = state.NewUBTDatabase(bc.triedb, bc.codedb)
} else {
sdb = state.NewMPTDatabase(bc.triedb, bc.codedb).WithSnapshot(bc.snaps)
}
// If prefetching is enabled, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
//
// Note: the main processor and prefetcher share the same reader with a local
// cache for mitigating the overhead of state access.
type prewarmReader interface {
// ReadersWithCacheStats creates a pair of state readers that share the
// same underlying state reader and internal state cache, while maintaining
// separate statistics respectively.
ReadersWithCacheStats(stateRoot common.Hash) (state.Reader, state.Reader, error)
}
warmer, ok := sdb.(prewarmReader)
wantWitness := config.StatelessSelfValidation || config.MakeWitness

if bc.cfg.NoPrefetch || !ok {
statedb, err = state.New(parentRoot, sdb)
switch warmer, ok := sdb.(prewarmReader); {
case bc.useBALExecution(block, wantWitness):
base, err := sdb.Reader(parentRoot)
if err != nil {
return nil, err
return nil, nil, err
}
} else {
// If prefetching is enabled, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
//
// Note: the main processor and prefetcher share the same reader with a local
// cache for mitigating the overhead of state access.
reader, stop := state.NewBlockExecutionReader(base, prefetchHint(block.AccessList()), runtime.NumCPU())
statedb, err := state.NewWithReader(parentRoot, sdb, reader)
if err != nil {
stop()
return nil, nil, err
}
return statedb, func(*blockProcessingResult) { stop() }, nil

case bc.cfg.NoPrefetch || !ok:
statedb, err := state.New(parentRoot, sdb)
if err != nil {
return nil, nil, err
}
return statedb, noop, nil

default:
// The main processor and the speculative prefetcher share the same reader
// with a local cache for mitigating the overhead of state access.
prefetch, process, err := warmer.ReadersWithCacheStats(parentRoot)
if err != nil {
return nil, err
return nil, nil, err
}
throwaway, err := state.NewWithReader(parentRoot, sdb, prefetch)
if err != nil {
return nil, err
return nil, nil, err
}
statedb, err = state.NewWithReader(parentRoot, sdb, process)
statedb, err := state.NewWithReader(parentRoot, sdb, process)
if err != nil {
return nil, err
return nil, nil, err
}
// Upload the statistics of reader at the end
defer func() {
if result != nil {
if stater, ok := prefetch.(state.ReaderStater); ok {
result.stats.StatePrefetchCacheStats = stater.GetStats()
}
if stater, ok := process.(state.ReaderStater); ok {
result.stats.StateReadCacheStats = stater.GetStats()
}
}
}()
go func(start time.Time, throwaway *state.StateDB, block *types.Block) {
go func(start time.Time) {
// Disable tracing for prefetcher executions.
vmCfg := bc.cfg.VmConfig
vmCfg.Tracer = nil
bc.prefetcher.Prefetch(block, throwaway, bc.jumpDestCache, vmCfg, &interrupt)
bc.prefetcher.Prefetch(block, throwaway, bc.jumpDestCache, vmCfg, interrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
if interrupt.Load() {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now(), throwaway, block)
}(time.Now())

return statedb, func(result *blockProcessingResult) {
// Upload the statistics of reader at the end.
if result == nil {
return
}
if stater, ok := prefetch.(state.ReaderStater); ok {
result.stats.StatePrefetchCacheStats = stater.GetStats()
}
if stater, ok := process.(state.ReaderStater); ok {
result.stats.StateReadCacheStats = stater.GetStats()
}
}, nil
}
}

// ProcessBlock executes and validates the given block. If there was no error
// it writes the block and associated state to database.
func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash, block *types.Block, config ExecuteConfig) (result *blockProcessingResult, blockEndErr error) {
var (
err error
startTime = time.Now()
statedb *state.StateDB
interrupt atomic.Bool
)
defer interrupt.Store(true) // terminate the prefetch at the end

// Set up the state reader feeding execution, along with a cleanup to run once
// processing is complete (stop the prefetcher, upload reader statistics).
statedb, cleanup, err := bc.setupExecutionState(parentRoot, block, config, &interrupt)
if err != nil {
return nil, err
}
defer func() { cleanup(result) }()

// If we are past Byzantium, enable prefetching to pull in trie node paths
// while processing transactions. Before Byzantium the prefetcher is mostly
Expand All @@ -2201,9 +2241,14 @@ func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash,
if err != nil {
return nil, err
}
defer witness.ReportMetrics(block.NumberU64())
}
// In BAL-driven parallel execution, state hashing is performed independently
// with the transaction execution, disable the trie prefetcher explicitly.
if !bc.useBALExecution(block, witness != nil) {
statedb.StartPrefetcher("chain", witness)
defer statedb.StopPrefetcher()
}
statedb.StartPrefetcher("chain", witness)
defer statedb.StopPrefetcher()
}

// Instrument the blockchain tracing
Expand Down Expand Up @@ -2328,10 +2373,6 @@ func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash,
stats.DatabaseCommit = statedb.DatabaseCommits // Database commits are complete, we can mark them
stats.BlockWrite = time.Since(wstart) - max(statedb.AccountCommits, statedb.StorageCommits) /* concurrent */ - statedb.DatabaseCommits
}
// Report the collected witness statistics
if witness != nil {
witness.ReportMetrics(block.NumberU64())
}
elapsed := time.Since(startTime) + 1 // prevent zero division
stats.TotalTime = elapsed
stats.MgasPerSecond = float64(res.GasUsed) * 1000 / float64(elapsed)
Expand Down
13 changes: 13 additions & 0 deletions core/gaspool.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ func (gp *GasPool) CumulativeUsed() uint64 {
return gp.cumulativeUsed
}

// CumulativeRegular returns the cumulative regular-dimension gas consumed
// (EIP-8037). It is used to derive the block gas used when transactions are
// charged against independent pools during parallel execution.
func (gp *GasPool) CumulativeRegular() uint64 {
return gp.cumulativeRegular
}

// CumulativeState returns the cumulative state-dimension gas consumed
// (EIP-8037). See CumulativeRegular for the rationale.
func (gp *GasPool) CumulativeState() uint64 {
return gp.cumulativeState
}

// Used returns the amount of consumed gas.
func (gp *GasPool) Used() uint64 {
// After 8037, return max(sum_regular, sum_state)
Expand Down
91 changes: 76 additions & 15 deletions core/state/reader_eip_7928.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/bal"
"github.com/ethereum/go-ethereum/crypto"
)

// The EIP27928 reader utilizes a hierarchical architecture to optimize state
Expand Down Expand Up @@ -86,7 +87,6 @@ type prefetchStateReader struct {
closeOnce sync.Once
}

// nolint:unused
func newPrefetchStateReader(reader StateReader, accessList map[common.Address][]common.Hash, nThreads int) *prefetchStateReader {
tasks := make([]*fetchTask, 0, len(accessList))
for addr, slots := range accessList {
Expand Down Expand Up @@ -193,13 +193,28 @@ func (r *prefetchStateReader) process(start, limit int) {
}
}

// NewBlockExecutionReader wraps base with a shared, concurrency-safe cache so
// that any state resolved once, whether by the background prefetcher or by
// transaction execution, is not fetched from the underlying reader again.
func NewBlockExecutionReader(base Reader, prefetch map[common.Address][]common.Hash, threads int) (Reader, func()) {
var (
cache = newStateReaderWithCache(base)
stop = func() {}
)
if len(prefetch) > 0 && threads > 0 {
pf := newPrefetchStateReader(cache, prefetch, threads)
stop = pf.Close
}
return newReader(base, newStateReaderWithStats(cache)), stop
}

// ReaderWithBlockLevelAccessList provides state access that reflects the
// pre-transition state combined with the mutations made by transactions
// prior to TxIndex.
type ReaderWithBlockLevelAccessList struct {
Reader
AccessList *bal.ConstructionBlockAccessList
TxIndex int
lookup *bal.Lookup
txIndex uint32
}

// NewReaderWithBlockLevelAccessList constructs a reader for accessing states
Expand All @@ -209,39 +224,85 @@ type ReaderWithBlockLevelAccessList struct {
// - 0 for pre‑execution system contract calls.
// - 1 … n for transactions (in block order).
// - n + 1 for post‑execution system contract calls.
func NewReaderWithBlockLevelAccessList(base Reader, accessList *bal.ConstructionBlockAccessList, txIndex int) *ReaderWithBlockLevelAccessList {
func NewReaderWithBlockLevelAccessList(base Reader, lookup *bal.Lookup, txIndex int) *ReaderWithBlockLevelAccessList {
return &ReaderWithBlockLevelAccessList{
Reader: base,
AccessList: accessList,
TxIndex: txIndex,
Reader: base,
lookup: lookup,
txIndex: uint32(txIndex),
}
}

// Account implements Reader, returning the account with the specific address.
//
// The returned account reflects the pre-transition state overlaid with all
// mutations made by call frames prior to the reader's TxIndex.
func (r *ReaderWithBlockLevelAccessList) Account(addr common.Address) (*types.StateAccount, error) {
panic("implement me")
base, err := r.Reader.Account(addr)
if err != nil {
return nil, err
}
balance, nonce, code, hasBalance, hasNonce, hasCode := r.lookup.AccountChanges(addr, r.txIndex)

// No mutation precedes the current call frame, return the base account as is.
if !hasBalance && !hasNonce && !hasCode {
return base, nil
}
// Overlay the mutations on top of a copy of the base account. The base
// account must not be mutated in place: with a shared cache in front of the
// underlying reader, the same instance is handed to concurrent readers.
account := types.NewEmptyStateAccount()
if base != nil {
account = base.Copy()
}
if hasBalance {
account.Balance = balance.Clone()
}
if hasNonce {
account.Nonce = nonce
}
if hasCode {
if len(code) == 0 {
account.CodeHash = types.EmptyCodeHash.Bytes()
} else {
account.CodeHash = crypto.Keccak256(code)
}
}
return account, nil
}

// Storage implements Reader, returning the storage slot with the specific
// address and slot key.
func (r *ReaderWithBlockLevelAccessList) Storage(addr common.Address, slot common.Hash) (common.Hash, error) {
panic("implement me")
if value, ok := r.lookup.Storage(addr, slot, r.txIndex); ok {
return value, nil
}
return r.Reader.Storage(addr, slot)
}

// Has implements Reader, returning the flag indicating whether the contract
// code with specified address and hash exists or not.
func (r *ReaderWithBlockLevelAccessList) Has(addr common.Address, codeHash common.Hash) bool {
panic("implement me")
if _, ok := r.lookup.Code(addr, r.txIndex); ok {
return true
}
return r.Reader.Has(addr, codeHash)
}

// Code implements Reader, returning the contract code with specified address
// and hash.
func (r *ReaderWithBlockLevelAccessList) Code(addr common.Address, codeHash common.Hash) ([]byte, error) {
panic("implement me")
// and hash. Code created earlier in the block (and therefore absent from the
// pre-transition state) is served directly from the access list.
func (r *ReaderWithBlockLevelAccessList) Code(addr common.Address, codeHash common.Hash) []byte {
if code, ok := r.lookup.Code(addr, r.txIndex); ok {
return code
}
return r.Reader.Code(addr, codeHash)
}

// CodeSize implements Reader, returning the contract code size with specified
// address and hash.
func (r *ReaderWithBlockLevelAccessList) CodeSize(addr common.Address, codeHash common.Hash) (int, error) {
panic("implement me")
func (r *ReaderWithBlockLevelAccessList) CodeSize(addr common.Address, codeHash common.Hash) int {
if code, ok := r.lookup.Code(addr, r.txIndex); ok {
return len(code)
}
return r.Reader.CodeSize(addr, codeHash)
}
18 changes: 13 additions & 5 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ func (s *stateObject) updateTrie() (Trie, error) {
// into a shortnode. This requires `B` to be resolved from disk.
// Whereas if the created node is handled first, then the collapse is avoided, and `B` is not resolved.
var (
keys = make([][]byte, 0, len(s.uncommittedStorage))
values = make([][]byte, 0, len(s.uncommittedStorage))
deletions []common.Hash
used = make([]common.Hash, 0, len(s.uncommittedStorage))
)
Expand All @@ -360,17 +362,23 @@ func (s *stateObject) updateTrie() (Trie, error) {
continue
}
if (value != common.Hash{}) {
if err := tr.UpdateStorage(s.address, key[:], common.TrimLeftZeroes(value[:])); err != nil {
s.db.setError(err)
return nil, err
}
s.db.StorageUpdated.Add(1)
keys = append(keys, key.Bytes())
values = append(values, common.TrimLeftZeroes(value.Bytes()))
} else {
deletions = append(deletions, key)
}
// Cache the items for preloading
used = append(used, key) // Copy needed for closure
}
// Apply the updates in batch mode, allowing the trie nodes on the paths
// to be resolved concurrently.
if len(keys) > 0 {
if err := tr.UpdateStorageBatch(s.address, keys, values); err != nil {
s.db.setError(err)
return nil, err
}
s.db.StorageUpdated.Add(int64(len(keys)))
}
for _, key := range deletions {
if err := tr.DeleteStorage(s.address, key[:]); err != nil {
s.db.setError(err)
Expand Down
Loading
Loading