diff --git a/build/devenv/evm/event_poller.go b/build/devenv/evm/event_poller.go index 2358ba200..f2359d1fb 100644 --- a/build/devenv/evm/event_poller.go +++ b/build/devenv/evm/event_poller.go @@ -11,6 +11,12 @@ import ( "github.com/smartcontractkit/chainlink-ccv/protocol" ) +// fallbackLookbackBlocks bounds how far back the first eth_getLogs scan reaches on a +// long-lived chain, so it neither scans from genesis nor exceeds the provider's +// getLogs range limit. The bound is in blocks, not wall-clock, since the target event +// is always recent. Mirrors verifier/pkg/sourcereader's DefaultMaxBlockRange. +const fallbackLookbackBlocks uint64 = 1500 + type eventKey struct { chainSelector uint64 msgNum uint64 @@ -177,7 +183,19 @@ func (p *eventPoller[T]) poll() { return } - events, err := p.pollFn(lastScanned+1, latestBlock) + start := lastScanned + 1 + if lastScanned == 0 { + if lookbackStart := fallbackStartBlock(latestBlock); lookbackStart > start { + start = lookbackStart + p.logger.Debug(). + Uint64("fromBlock", start). + Uint64("toBlock", latestBlock). + Str("event", p.eventName). + Msg("Using fallback start block (bounded lookback)") + } + } + + events, err := p.pollFn(start, latestBlock) if err != nil { p.logger.Warn().Err(err).Str("event", p.eventName).Msg("Failed to poll events") return @@ -220,6 +238,13 @@ func (p *eventPoller[T]) poll() { p.lastScannedBlock = latestBlock } +func fallbackStartBlock(latestBlock uint64) uint64 { + if latestBlock > fallbackLookbackBlocks { + return latestBlock - fallbackLookbackBlocks + } + return 0 +} + func (p *eventPoller[T]) addToCache(key eventKey, result pollerResult[T]) { seqKey := eventKey{chainSelector: key.chainSelector, msgNum: key.msgNum} if _, exists := p.cachedBySeqNum[seqKey]; !exists { diff --git a/build/devenv/evm/event_poller_test.go b/build/devenv/evm/event_poller_test.go index ab420dc77..b3867cc07 100644 --- a/build/devenv/evm/event_poller_test.go +++ b/build/devenv/evm/event_poller_test.go @@ -197,3 +197,16 @@ func TestEventPollerByMessageID(t *testing.T) { } }) } + +func TestFallbackStartBlock(t *testing.T) { + t.Parallel() + + // Long-lived chain: the first scan reaches back exactly the lookback window, + // independent of block time. + const latest = uint64(10_000_000) + require.Equal(t, latest-fallbackLookbackBlocks, fallbackStartBlock(latest)) + + // Chain younger than (or exactly at) the lookback window scans from genesis. + require.Equal(t, uint64(0), fallbackStartBlock(fallbackLookbackBlocks)) + require.Equal(t, uint64(0), fallbackStartBlock(100)) +} diff --git a/build/devenv/evm/impl.go b/build/devenv/evm/impl.go index 0a49ddab4..da3ead4b4 100644 --- a/build/devenv/evm/impl.go +++ b/build/devenv/evm/impl.go @@ -136,9 +136,8 @@ func extractEthClientFromBackend(client any) (*ethclient.Client, error) { // NewCCIP17EVM creates new smart-contracts wrappers with utility functions for CCIP17EVM implementation. func NewCCIP17EVM(ctx context.Context, logger zerolog.Logger, e *deployment.Environment, chainSelector uint64) (*CCIP17EVM, error) { var ( - onRamp *onramp.OnRamp - offRamp *offramp.OffRamp - onRampPoller eventPoller[cciptestinterfaces.MessageSentEvent] + onRamp *onramp.OnRamp + offRamp *offramp.OffRamp ) chainDetails, err := chainsel.GetChainDetails(chainSelector) if err != nil { @@ -192,7 +191,6 @@ func NewCCIP17EVM(ctx context.Context, logger zerolog.Logger, e *deployment.Envi ethClient: ethClient, onRamp: onRamp, offRamp: offRamp, - onRampPoller: &onRampPoller, }, nil } @@ -203,6 +201,11 @@ func (m *CCIP17EVM) ChainSelector() uint64 { func (m *CCIP17EVM) getOrCreateOnRampPoller() (*eventPoller[cciptestinterfaces.MessageSentEvent], error) { m.pollersMu.Lock() defer m.pollersMu.Unlock() + + if m.onRampPoller != nil { + return m.onRampPoller, nil + } + onRamp := m.onRamp ethClient := m.ethClient diff --git a/build/devenv/lib.go b/build/devenv/lib.go index 6d0945473..616a0f89e 100644 --- a/build/devenv/lib.go +++ b/build/devenv/lib.go @@ -32,8 +32,12 @@ type ChainImpl struct { // // Note that not all methods may be implemented by all backends. // For example, as of writing, a CLDF environment doesn't store indexer -// or aggregator endpoints, so the [Lib.Indexer] and [Lib.AllIndexers] -// methods will return an error. +// or aggregator endpoints. +// +// The plural offchain getters ([Lib.AllAggregators], [Lib.AllIndexers]) return an +// empty result with a nil error when no endpoints are configured, so callers can +// treat absence as a normal state via a length check and reserve a returned error +// for a real construction failure. type Lib interface { // Chains returns a slice of [ChainImpl] objects in an unspecified order. Chains(ctx context.Context) ([]ChainImpl, error) @@ -57,12 +61,13 @@ type Lib interface { // or an error if no indexer client is available. IndexerMonitor() (*IndexerMonitor, error) - // AllIndexers returns all indexer clients available. - // or an error if no indexer clients are available. + // AllIndexers returns all indexer clients available, or (nil, nil) if no + // indexer endpoints are configured. AllIndexers() ([]*client.IndexerClient, error) - // AllAggregators returns a mapping of qualifier name to the client of the aggregator for that qualifier. - // or an error if no aggregator clients are available. + // AllAggregators returns a mapping of qualifier name to the client of the + // aggregator for that qualifier, or an empty map (nil error) if no aggregator + // endpoints are configured. AllAggregators() (map[string]*AggregatorClient, error) } @@ -139,7 +144,7 @@ func (l *libFromCCV) AllAggregators() (map[string]*AggregatorClient, error) { } if len(l.cfg.AggregatorEndpoints) == 0 { - return nil, fmt.Errorf("no aggregator endpoints configured") + return map[string]*AggregatorClient{}, nil } aggregators := make(map[string]*AggregatorClient, len(l.cfg.AggregatorEndpoints)) @@ -194,7 +199,7 @@ func (l *libFromCCV) AllIndexers() ([]*client.IndexerClient, error) { return nil, fmt.Errorf("failed to initialize indexer client: %w", err) } if len(l.cfg.IndexerEndpoints) == 0 { - return nil, fmt.Errorf("no indexer endpoints configured") + return nil, nil } indexers := make([]*client.IndexerClient, 0, len(l.cfg.IndexerEndpoints)) httpClient := &http.Client{ @@ -229,7 +234,7 @@ type libFromCLDF struct { // AllIndexers implements [Lib]. func (l *libFromCLDF) AllIndexers() ([]*client.IndexerClient, error) { - return nil, fmt.Errorf("no indexer clients available in CLDF environment") + return nil, nil } // CLDFEnvironment implements [Lib]. @@ -307,7 +312,7 @@ func (l *libFromCLDF) IndexerMonitor() (*IndexerMonitor, error) { // AllAggregators implements [Lib]. func (l *libFromCLDF) AllAggregators() (map[string]*AggregatorClient, error) { - return nil, fmt.Errorf("no aggregator clients available in CLDF environment") + return map[string]*AggregatorClient{}, nil } // NewLibFromCLDFEnv creates a new [Lib] from a [deployment.Environment]. diff --git a/build/devenv/lib_test.go b/build/devenv/lib_test.go new file mode 100644 index 000000000..76a5ad25c --- /dev/null +++ b/build/devenv/lib_test.go @@ -0,0 +1,58 @@ +package ccv + +import ( + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestLibFromCCV(cfg *Cfg) *libFromCCV { + return &libFromCCV{ + envOutFile: "test.toml", + cfg: cfg, + l: zerolog.Nop(), + } +} + +// TestLib_OffchainGetters covers the absence contract for both backends: the plural +// getters return an empty result with a nil error when nothing is configured, the +// singular getters return an error, and a real construction failure surfaces as an +// error from the plural getter too. +func TestLib_OffchainGetters(t *testing.T) { + ccvUnconfigured := newTestLibFromCCV(&Cfg{}) + ccvBadAggregatorCert := newTestLibFromCCV(&Cfg{ + AggregatorEndpoints: map[string]string{"default": "127.0.0.1:1"}, + AggregatorCACertFiles: map[string]string{"default": "/nonexistent/ca.pem"}, + }) + cldf := &libFromCLDF{l: zerolog.Nop()} + + tests := []struct { + name string + call func() (any, error) + wantErr bool // false: expect an empty result with a nil error + }{ + {"ccv unconfigured: AllAggregators", func() (any, error) { return ccvUnconfigured.AllAggregators() }, false}, + {"ccv unconfigured: AllIndexers", func() (any, error) { return ccvUnconfigured.AllIndexers() }, false}, + {"ccv unconfigured: Indexer", func() (any, error) { return ccvUnconfigured.Indexer() }, true}, + {"ccv unconfigured: IndexerMonitor", func() (any, error) { return ccvUnconfigured.IndexerMonitor() }, true}, + {"ccv bad aggregator cert: AllAggregators", func() (any, error) { return ccvBadAggregatorCert.AllAggregators() }, true}, + {"cldf: AllAggregators", func() (any, error) { return cldf.AllAggregators() }, false}, + {"cldf: AllIndexers", func() (any, error) { return cldf.AllIndexers() }, false}, + {"cldf: Indexer", func() (any, error) { return cldf.Indexer() }, true}, + {"cldf: IndexerMonitor", func() (any, error) { return cldf.IndexerMonitor() }, true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.call() + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + assert.Empty(t, got) + }) + } +} diff --git a/build/devenv/tests/e2e/tcapi/basic/v3.go b/build/devenv/tests/e2e/tcapi/basic/v3.go index 590683beb..cf3152398 100644 --- a/build/devenv/tests/e2e/tcapi/basic/v3.go +++ b/build/devenv/tests/e2e/tcapi/basic/v3.go @@ -111,19 +111,9 @@ func (tc *v3TestCase) Run(ctx context.Context) error { } messageID := sendMessageResult.MessageID - aggregatorClients, err := tc.lib.AllAggregators() + aggregatorClient, indexerMonitor, err := tcapi.SetupOffchainClients(tc.lib, tc.aggregatorQualifier) if err != nil { - return fmt.Errorf("failed to get aggregator clients: %w", err) - } - aggregatorClient := aggregatorClients[common.DefaultCommitteeVerifierQualifier] - if tc.aggregatorQualifier != "" && tc.aggregatorQualifier != common.DefaultCommitteeVerifierQualifier { - if client, ok := aggregatorClients[tc.aggregatorQualifier]; ok { - aggregatorClient = client - } - } - indexerMonitor, err := tc.lib.IndexerMonitor() - if err != nil { - return fmt.Errorf("failed to get indexer monitor: %w", err) + return err } testCtx, cleanupFn := tcapi.NewTestingContext(ctx, chainMap, aggregatorClient, indexerMonitor) defer cleanupFn() @@ -138,10 +128,10 @@ func (tc *v3TestCase) Run(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to assert message: %w", err) } - if result.AggregatedResult == nil { + if aggregatorClient != nil && result.AggregatedResult == nil { return fmt.Errorf("aggregated result is nil") } - if len(result.IndexedVerifications.Results) != tc.numExpectedVerifications { + if indexerMonitor != nil && len(result.IndexedVerifications.Results) != tc.numExpectedVerifications { return fmt.Errorf("expected %d indexed verifications, got %d", tc.numExpectedVerifications, len(result.IndexedVerifications.Results)) } diff --git a/build/devenv/tests/e2e/tcapi/offchain.go b/build/devenv/tests/e2e/tcapi/offchain.go new file mode 100644 index 000000000..bd8cbde4a --- /dev/null +++ b/build/devenv/tests/e2e/tcapi/offchain.go @@ -0,0 +1,41 @@ +package tcapi + +import ( + "fmt" + + ccv "github.com/smartcontractkit/chainlink-ccv/build/devenv" + "github.com/smartcontractkit/chainlink-ccv/build/devenv/common" +) + +// SetupOffchainClients resolves the aggregator and indexer clients from the +// environment. When no offchain endpoints are configured it returns nil clients and +// a nil error; NewTestingContext skips assertion stages for nil clients. A non-nil +// error means a configured client failed to construct. +func SetupOffchainClients(lib ccv.Lib, aggregatorQualifier string) (*ccv.AggregatorClient, *ccv.IndexerMonitor, error) { + aggregatorClients, err := lib.AllAggregators() + if err != nil { + return nil, nil, fmt.Errorf("failed to get aggregator clients: %w", err) + } + var aggregatorClient *ccv.AggregatorClient + if len(aggregatorClients) > 0 { + aggregatorClient = aggregatorClients[common.DefaultCommitteeVerifierQualifier] + if aggregatorQualifier != "" && aggregatorQualifier != common.DefaultCommitteeVerifierQualifier { + if client, ok := aggregatorClients[aggregatorQualifier]; ok { + aggregatorClient = client + } + } + } + + indexers, err := lib.AllIndexers() + if err != nil { + return nil, nil, fmt.Errorf("failed to get indexer clients: %w", err) + } + var indexerMonitor *ccv.IndexerMonitor + if len(indexers) > 0 { + indexerMonitor, err = lib.IndexerMonitor() + if err != nil { + return nil, nil, fmt.Errorf("failed to get indexer monitor: %w", err) + } + } + return aggregatorClient, indexerMonitor, nil +} diff --git a/build/devenv/tests/e2e/tcapi/offchain_test.go b/build/devenv/tests/e2e/tcapi/offchain_test.go new file mode 100644 index 000000000..2aaf02daf --- /dev/null +++ b/build/devenv/tests/e2e/tcapi/offchain_test.go @@ -0,0 +1,127 @@ +package tcapi + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + ccv "github.com/smartcontractkit/chainlink-ccv/build/devenv" + "github.com/smartcontractkit/chainlink-ccv/build/devenv/cciptestinterfaces" + "github.com/smartcontractkit/chainlink-ccv/build/devenv/common" + "github.com/smartcontractkit/chainlink-ccv/indexer/pkg/client" + "github.com/smartcontractkit/chainlink-deployments-framework/datastore" + "github.com/smartcontractkit/chainlink-deployments-framework/deployment" +) + +// stubLib is a minimal ccv.Lib test double. Absence is expressed by leaving the +// plural-getter fields empty; a construction failure by setting the *Err fields. +type stubLib struct { + aggregators map[string]*ccv.AggregatorClient + aggregatorsErr error + indexers []*client.IndexerClient + allIndexersErr error + indexerMonitor *ccv.IndexerMonitor + indexerMonitorErr error +} + +func (s *stubLib) Chains(context.Context) ([]ccv.ChainImpl, error) { return nil, nil } +func (s *stubLib) ChainsMap(context.Context) (map[uint64]cciptestinterfaces.CCIP17, error) { + return nil, nil +} +func (s *stubLib) CLDFEnvironment() (*deployment.Environment, error) { return nil, nil } +func (s *stubLib) DataStore() (datastore.DataStore, error) { return nil, nil } +func (s *stubLib) Indexer() (*client.IndexerClient, error) { return nil, nil } +func (s *stubLib) IndexerMonitor() (*ccv.IndexerMonitor, error) { + return s.indexerMonitor, s.indexerMonitorErr +} + +func (s *stubLib) AllIndexers() ([]*client.IndexerClient, error) { + return s.indexers, s.allIndexersErr +} + +func (s *stubLib) AllAggregators() (map[string]*ccv.AggregatorClient, error) { + return s.aggregators, s.aggregatorsErr +} + +var _ ccv.Lib = (*stubLib)(nil) + +// presentIndexers is a non-empty indexer slice for the "configured" branch; the +// element only needs to be non-nil for the length check. +func presentIndexers() []*client.IndexerClient { return []*client.IndexerClient{{}} } + +func TestSetupOffchainClients(t *testing.T) { + defaultAgg := &ccv.AggregatorClient{} + secondaryAgg := &ccv.AggregatorClient{} + + tests := []struct { + name string + lib *stubLib + qualifier string + wantErr bool + wantAgg *ccv.AggregatorClient // nil expects a nil aggregator client + wantIdx bool // expect a non-nil indexer monitor + }{ + { + name: "both absent", + lib: &stubLib{}, + }, + { + name: "both present", + lib: &stubLib{ + aggregators: map[string]*ccv.AggregatorClient{common.DefaultCommitteeVerifierQualifier: defaultAgg}, + indexers: presentIndexers(), + indexerMonitor: &ccv.IndexerMonitor{}, + }, + wantAgg: defaultAgg, + wantIdx: true, + }, + { + name: "qualifier override, indexer absent", + lib: &stubLib{ + aggregators: map[string]*ccv.AggregatorClient{ + common.DefaultCommitteeVerifierQualifier: defaultAgg, + "secondary": secondaryAgg, + }, + }, + qualifier: "secondary", + wantAgg: secondaryAgg, + }, + { + name: "aggregator construction fails", + lib: &stubLib{aggregatorsErr: errors.New("connection refused")}, + wantErr: true, + }, + { + name: "indexer monitor construction fails", + lib: &stubLib{ + indexers: presentIndexers(), + indexerMonitorErr: errors.New("dial tcp: connection refused"), + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + agg, idx, err := SetupOffchainClients(tt.lib, tt.qualifier) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + if tt.wantAgg == nil { + assert.Nil(t, agg) + } else { + assert.Same(t, tt.wantAgg, agg) + } + if tt.wantIdx { + assert.NotNil(t, idx) + } else { + assert.Nil(t, idx) + } + }) + } +} diff --git a/build/devenv/tests/e2e/tcapi/token_transfer/v3.go b/build/devenv/tests/e2e/tcapi/token_transfer/v3.go index ab44af377..fa00d2f65 100644 --- a/build/devenv/tests/e2e/tcapi/token_transfer/v3.go +++ b/build/devenv/tests/e2e/tcapi/token_transfer/v3.go @@ -138,14 +138,9 @@ func (tc *tokenTransferV3TestCase) Run(ctx context.Context) error { } msgID := sendRes.MessageID - aggregatorClients, err := tc.lib.AllAggregators() + aggregatorClient, indexerMonitor, err := tcapi.SetupOffchainClients(tc.lib, "") if err != nil { - return fmt.Errorf("failed to get aggregator clients: %w", err) - } - aggregatorClient := aggregatorClients[common.DefaultCommitteeVerifierQualifier] - indexerMonitor, err := tc.lib.IndexerMonitor() - if err != nil { - return fmt.Errorf("failed to get indexer monitor: %w", err) + return err } testCtx, cleanupFn := tcapi.NewTestingContext(ctx, chainMap, aggregatorClient, indexerMonitor) defer cleanupFn() @@ -160,7 +155,7 @@ func (tc *tokenTransferV3TestCase) Run(ctx context.Context) error { if err != nil { return fmt.Errorf("assert message: %w", err) } - if res.AggregatedResult == nil { + if aggregatorClient != nil && res.AggregatedResult == nil { return fmt.Errorf("aggregated result is nil") }