Skip to content
Merged
27 changes: 26 additions & 1 deletion build/devenv/evm/event_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (
"github.com/smartcontractkit/chainlink-ccv/protocol"
)

// fallbackLookbackBlocks bounds how far back the first eth_getLogs scan reaches on a
// long-lived chain, so it neither scans from genesis nor exceeds the provider's
// getLogs range limit — in blocks, not wall-clock, since the target event is always
// recent. Mirrors verifier/pkg/sourcereader's DefaultMaxBlockRange.
const fallbackLookbackBlocks uint64 = 1500

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept as an independent constant rather than importing sourcereader.DefaultMaxBlockRange to avoid a devenv>verifier package dependency


type eventKey struct {
chainSelector uint64
msgNum uint64
Expand Down Expand Up @@ -177,7 +183,19 @@ func (p *eventPoller[T]) poll() {
return
}

events, err := p.pollFn(lastScanned+1, latestBlock)
start := lastScanned + 1
if lastScanned == 0 {
if lookbackStart := fallbackStartBlock(latestBlock); lookbackStart > start {
start = lookbackStart
p.logger.Debug().
Uint64("fromBlock", start).
Uint64("toBlock", latestBlock).
Str("event", p.eventName).
Msg("Using fallback start block (bounded lookback)")
}
}

events, err := p.pollFn(start, latestBlock)
if err != nil {
p.logger.Warn().Err(err).Str("event", p.eventName).Msg("Failed to poll events")
return
Expand Down Expand Up @@ -220,6 +238,13 @@ func (p *eventPoller[T]) poll() {
p.lastScannedBlock = latestBlock
}

func fallbackStartBlock(latestBlock uint64) uint64 {
if latestBlock > fallbackLookbackBlocks {
return latestBlock - fallbackLookbackBlocks
}
return 0
}

func (p *eventPoller[T]) addToCache(key eventKey, result pollerResult[T]) {
seqKey := eventKey{chainSelector: key.chainSelector, msgNum: key.msgNum}
if _, exists := p.cachedBySeqNum[seqKey]; !exists {
Expand Down
13 changes: 13 additions & 0 deletions build/devenv/evm/event_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,16 @@ func TestEventPollerByMessageID(t *testing.T) {
}
})
}

func TestFallbackStartBlock(t *testing.T) {
t.Parallel()

// Long-lived chain: the first scan reaches back exactly the lookback window,
// independent of block time.
const latest = uint64(10_000_000)
require.Equal(t, latest-fallbackLookbackBlocks, fallbackStartBlock(latest))

// Chain younger than (or exactly at) the lookback window scans from genesis.
Comment thread
jadepark-dev marked this conversation as resolved.
require.Equal(t, uint64(0), fallbackStartBlock(fallbackLookbackBlocks))
require.Equal(t, uint64(0), fallbackStartBlock(100))
}
11 changes: 7 additions & 4 deletions build/devenv/evm/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,8 @@ func extractEthClientFromBackend(client any) (*ethclient.Client, error) {
// NewCCIP17EVM creates new smart-contracts wrappers with utility functions for CCIP17EVM implementation.
func NewCCIP17EVM(ctx context.Context, logger zerolog.Logger, e *deployment.Environment, chainSelector uint64) (*CCIP17EVM, error) {
var (
onRamp *onramp.OnRamp
offRamp *offramp.OffRamp
onRampPoller eventPoller[cciptestinterfaces.MessageSentEvent]
onRamp *onramp.OnRamp
offRamp *offramp.OffRamp
)
chainDetails, err := chainsel.GetChainDetails(chainSelector)
if err != nil {
Expand Down Expand Up @@ -192,7 +191,6 @@ func NewCCIP17EVM(ctx context.Context, logger zerolog.Logger, e *deployment.Envi
ethClient: ethClient,
onRamp: onRamp,
offRamp: offRamp,
onRampPoller: &onRampPoller,
}, nil
}

Expand All @@ -203,6 +201,11 @@ func (m *CCIP17EVM) ChainSelector() uint64 {
func (m *CCIP17EVM) getOrCreateOnRampPoller() (*eventPoller[cciptestinterfaces.MessageSentEvent], error) {
m.pollersMu.Lock()
defer m.pollersMu.Unlock()

if m.onRampPoller != nil {
return m.onRampPoller, nil
}

onRamp := m.onRamp
ethClient := m.ethClient

Expand Down
49 changes: 32 additions & 17 deletions build/devenv/tests/e2e/tcapi/basic/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,15 @@ func (tc *v3TestCase) Run(ctx context.Context) error {
}
messageID := sendMessageResult.MessageID

aggregatorClients, err := tc.lib.AllAggregators()
if err != nil {
return fmt.Errorf("failed to get aggregator clients: %w", err)
}
aggregatorClient := aggregatorClients[common.DefaultCommitteeVerifierQualifier]
if tc.aggregatorQualifier != "" && tc.aggregatorQualifier != common.DefaultCommitteeVerifierQualifier {
if client, ok := aggregatorClients[tc.aggregatorQualifier]; ok {
aggregatorClient = client
var aggregatorClient *ccv.AggregatorClient
var indexerMonitor *ccv.IndexerMonitor
if !tc.args.Run.OnchainAssertionOnly {
var setupErr error
aggregatorClient, indexerMonitor, setupErr = setupAggregatorAndIndexer(tc.lib, tc.aggregatorQualifier)
if setupErr != nil {
return setupErr

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: I had a similar idea to make aggregator/indexer assertions optional on environments where this information may not be available - was wondering if we should gate this purely based on presence (i.e. len(aggregatorClients) == 0 means no aggregator assertions - or if aggregatorClient, ok returns nil, false, skip the assertion) vs. doing it via flag (OnchainAssertionOnly).

The main advantage I see to the presence-based approach is that you don't need to update the test code or write a new test in order to run it in different environments - but curious as to what your. thoughts are.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid point, TestingContext.AssertMessage seems to be presence-based already, but iirc there was hard configuration validation. Maybe I can split that logic into absence vs. connection failure on lib construction. Let me explore a bit

}
}
indexerMonitor, err := tc.lib.IndexerMonitor()
if err != nil {
return fmt.Errorf("failed to get indexer monitor: %w", err)
}
testCtx, cleanupFn := tcapi.NewTestingContext(ctx, chainMap, aggregatorClient, indexerMonitor)
defer cleanupFn()

Expand All @@ -138,11 +133,13 @@ func (tc *v3TestCase) Run(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to assert message: %w", err)
}
if result.AggregatedResult == nil {
return fmt.Errorf("aggregated result is nil")
}
if len(result.IndexedVerifications.Results) != tc.numExpectedVerifications {
return fmt.Errorf("expected %d indexed verifications, got %d", tc.numExpectedVerifications, len(result.IndexedVerifications.Results))
if !tc.args.Run.OnchainAssertionOnly {
if result.AggregatedResult == nil {
return fmt.Errorf("aggregated result is nil")
}
if len(result.IndexedVerifications.Results) != tc.numExpectedVerifications {
return fmt.Errorf("expected %d indexed verifications, got %d", tc.numExpectedVerifications, len(result.IndexedVerifications.Results))
}
}

e, err := dst.ConfirmExecOnDest(ctx, tc.src, messageKey, execTimeout)
Expand All @@ -161,6 +158,24 @@ func (tc *v3TestCase) HavePrerequisites(ctx context.Context) bool {
return tc.ensureHydrated(ctx) == nil
}

func setupAggregatorAndIndexer(lib ccv.Lib, aggregatorQualifier string) (*ccv.AggregatorClient, *ccv.IndexerMonitor, error) {
aggregatorClients, err := lib.AllAggregators()
if err != nil {
return nil, nil, fmt.Errorf("failed to get aggregator clients: %w", err)
}
aggregatorClient := aggregatorClients[common.DefaultCommitteeVerifierQualifier]
if aggregatorQualifier != "" && aggregatorQualifier != common.DefaultCommitteeVerifierQualifier {
if client, ok := aggregatorClients[aggregatorQualifier]; ok {
aggregatorClient = client
}
}
indexerMonitor, err := lib.IndexerMonitor()
if err != nil {
return nil, nil, fmt.Errorf("failed to get indexer monitor: %w", err)
}
return aggregatorClient, indexerMonitor, nil
}

func getCommitteeCCV(resolver chainreg.AddressResolver, ds datastore.DataStore, srcChainSelector uint64, qualifier string) (protocol.CCV, error) {
addr, err := resolver.GetCommitteeCCV(ds, srcChainSelector, qualifier)
if err != nil {
Expand Down
27 changes: 17 additions & 10 deletions build/devenv/tests/e2e/tcapi/token_transfer/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,19 @@ func (tc *tokenTransferV3TestCase) Run(ctx context.Context) error {
}
msgID := sendRes.MessageID

aggregatorClients, err := tc.lib.AllAggregators()
if err != nil {
return fmt.Errorf("failed to get aggregator clients: %w", err)
}
aggregatorClient := aggregatorClients[common.DefaultCommitteeVerifierQualifier]
indexerMonitor, err := tc.lib.IndexerMonitor()
if err != nil {
return fmt.Errorf("failed to get indexer monitor: %w", err)
var aggregatorClient *ccv.AggregatorClient
var indexerMonitor *ccv.IndexerMonitor
if !tc.args.Run.OnchainAssertionOnly {
aggregatorClients, aggErr := tc.lib.AllAggregators()
if aggErr != nil {
return fmt.Errorf("failed to get aggregator clients: %w", aggErr)
}
aggregatorClient = aggregatorClients[common.DefaultCommitteeVerifierQualifier]
var monErr error
indexerMonitor, monErr = tc.lib.IndexerMonitor()
if monErr != nil {
return fmt.Errorf("failed to get indexer monitor: %w", monErr)
}
}
testCtx, cleanupFn := tcapi.NewTestingContext(ctx, chainMap, aggregatorClient, indexerMonitor)
defer cleanupFn()
Expand All @@ -160,8 +165,10 @@ func (tc *tokenTransferV3TestCase) Run(ctx context.Context) error {
if err != nil {
return fmt.Errorf("assert message: %w", err)
}
if res.AggregatedResult == nil {
return fmt.Errorf("aggregated result is nil")
if !tc.args.Run.OnchainAssertionOnly {
if res.AggregatedResult == nil {
return fmt.Errorf("aggregated result is nil")
}
}

execEvt, err := dst.ConfirmExecOnDest(ctx, tc.src, messageKey, execTimeout)
Expand Down
9 changes: 7 additions & 2 deletions build/devenv/tests/e2e/tcapi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,18 @@ type TestCase interface {
// DefaultV3ExecutionGasLimit is the execution gas limit used when SendConfig and MessageOptions omit it.
const DefaultV3ExecutionGasLimit uint32 = 200_000

// RunConfig holds optional overrides for wait/confirm timeouts in TCAPI Run methods.
// Zero values use the fallback passed to SentTimeout or ExecTimeout.
// RunConfig holds optional overrides for wait/confirm timeouts and assertion scope
// in TCAPI Run methods. Zero values use the fallback passed to SentTimeout or
// ExecTimeout, and select the full local-devenv assertion behavior.
type RunConfig struct {
// ConfirmSentTimeout overrides ConfirmSendOnSource when non-zero.
ConfirmSentTimeout time.Duration
// ConfirmExecTimeout overrides AssertMessage and ConfirmExecOnDest when non-zero.
ConfirmExecTimeout time.Duration
// OnchainAssertionOnly skips aggregator/indexer wiring and assertion, for
// environments where those offchain services aren't reachable (e.g. persistent
// env). Default false: wire clients and require offchain assert results.
OnchainAssertionOnly bool
}

// SentTimeout returns ConfirmSentTimeout when set, otherwise fallback.
Expand Down
Loading