Skip to content
Merged
27 changes: 26 additions & 1 deletion build/devenv/evm/event_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (
"github.com/smartcontractkit/chainlink-ccv/protocol"
)

// fallbackLookbackBlocks bounds how far back the first eth_getLogs scan reaches on a
// long-lived chain, so it neither scans from genesis nor exceeds the provider's
// getLogs range limit. The bound is in blocks, not wall-clock, since the target event
// is always recent. Mirrors verifier/pkg/sourcereader's DefaultMaxBlockRange.
const fallbackLookbackBlocks uint64 = 1500

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept as an independent constant rather than importing sourcereader.DefaultMaxBlockRange to avoid a devenv>verifier package dependency


type eventKey struct {
chainSelector uint64
msgNum uint64
Expand Down Expand Up @@ -177,7 +183,19 @@ func (p *eventPoller[T]) poll() {
return
}

events, err := p.pollFn(lastScanned+1, latestBlock)
start := lastScanned + 1
if lastScanned == 0 {
if lookbackStart := fallbackStartBlock(latestBlock); lookbackStart > start {
start = lookbackStart
p.logger.Debug().
Uint64("fromBlock", start).
Uint64("toBlock", latestBlock).
Str("event", p.eventName).
Msg("Using fallback start block (bounded lookback)")
}
}

events, err := p.pollFn(start, latestBlock)
if err != nil {
p.logger.Warn().Err(err).Str("event", p.eventName).Msg("Failed to poll events")
return
Expand Down Expand Up @@ -220,6 +238,13 @@ func (p *eventPoller[T]) poll() {
p.lastScannedBlock = latestBlock
}

func fallbackStartBlock(latestBlock uint64) uint64 {
if latestBlock > fallbackLookbackBlocks {
return latestBlock - fallbackLookbackBlocks
}
return 0
}

func (p *eventPoller[T]) addToCache(key eventKey, result pollerResult[T]) {
seqKey := eventKey{chainSelector: key.chainSelector, msgNum: key.msgNum}
if _, exists := p.cachedBySeqNum[seqKey]; !exists {
Expand Down
13 changes: 13 additions & 0 deletions build/devenv/evm/event_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,16 @@ func TestEventPollerByMessageID(t *testing.T) {
}
})
}

func TestFallbackStartBlock(t *testing.T) {
t.Parallel()

// Long-lived chain: the first scan reaches back exactly the lookback window,
// independent of block time.
const latest = uint64(10_000_000)
require.Equal(t, latest-fallbackLookbackBlocks, fallbackStartBlock(latest))

// Chain younger than (or exactly at) the lookback window scans from genesis.
Comment thread
jadepark-dev marked this conversation as resolved.
require.Equal(t, uint64(0), fallbackStartBlock(fallbackLookbackBlocks))
require.Equal(t, uint64(0), fallbackStartBlock(100))
}
11 changes: 7 additions & 4 deletions build/devenv/evm/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,8 @@ func extractEthClientFromBackend(client any) (*ethclient.Client, error) {
// NewCCIP17EVM creates new smart-contracts wrappers with utility functions for CCIP17EVM implementation.
func NewCCIP17EVM(ctx context.Context, logger zerolog.Logger, e *deployment.Environment, chainSelector uint64) (*CCIP17EVM, error) {
var (
onRamp *onramp.OnRamp
offRamp *offramp.OffRamp
onRampPoller eventPoller[cciptestinterfaces.MessageSentEvent]
onRamp *onramp.OnRamp
offRamp *offramp.OffRamp
)
chainDetails, err := chainsel.GetChainDetails(chainSelector)
if err != nil {
Expand Down Expand Up @@ -192,7 +191,6 @@ func NewCCIP17EVM(ctx context.Context, logger zerolog.Logger, e *deployment.Envi
ethClient: ethClient,
onRamp: onRamp,
offRamp: offRamp,
onRampPoller: &onRampPoller,
}, nil
}

Expand All @@ -203,6 +201,11 @@ func (m *CCIP17EVM) ChainSelector() uint64 {
func (m *CCIP17EVM) getOrCreateOnRampPoller() (*eventPoller[cciptestinterfaces.MessageSentEvent], error) {
m.pollersMu.Lock()
defer m.pollersMu.Unlock()

if m.onRampPoller != nil {
return m.onRampPoller, nil
}

onRamp := m.onRamp
ethClient := m.ethClient

Expand Down
25 changes: 15 additions & 10 deletions build/devenv/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ type ChainImpl struct {
//
// Note that not all methods may be implemented by all backends.
// For example, as of writing, a CLDF environment doesn't store indexer
// or aggregator endpoints, so the [Lib.Indexer] and [Lib.AllIndexers]
// methods will return an error.
// or aggregator endpoints.
//
// The plural offchain getters ([Lib.AllAggregators], [Lib.AllIndexers]) return an
// empty result with a nil error when no endpoints are configured, so callers can
// treat absence as a normal state via a length check and reserve a returned error
// for a real construction failure.
type Lib interface {
// Chains returns a slice of [ChainImpl] objects in an unspecified order.
Chains(ctx context.Context) ([]ChainImpl, error)
Expand All @@ -57,12 +61,13 @@ type Lib interface {
// or an error if no indexer client is available.
IndexerMonitor() (*IndexerMonitor, error)

// AllIndexers returns all indexer clients available.
// or an error if no indexer clients are available.
// AllIndexers returns all indexer clients available, or (nil, nil) if no
// indexer endpoints are configured.
AllIndexers() ([]*client.IndexerClient, error)

// AllAggregators returns a mapping of qualifier name to the client of the aggregator for that qualifier.
// or an error if no aggregator clients are available.
// AllAggregators returns a mapping of qualifier name to the client of the
// aggregator for that qualifier, or an empty map (nil error) if no aggregator
// endpoints are configured.
AllAggregators() (map[string]*AggregatorClient, error)
}

Expand Down Expand Up @@ -139,7 +144,7 @@ func (l *libFromCCV) AllAggregators() (map[string]*AggregatorClient, error) {
}

if len(l.cfg.AggregatorEndpoints) == 0 {
return nil, fmt.Errorf("no aggregator endpoints configured")
return map[string]*AggregatorClient{}, nil
}

aggregators := make(map[string]*AggregatorClient, len(l.cfg.AggregatorEndpoints))
Expand Down Expand Up @@ -194,7 +199,7 @@ func (l *libFromCCV) AllIndexers() ([]*client.IndexerClient, error) {
return nil, fmt.Errorf("failed to initialize indexer client: %w", err)
}
if len(l.cfg.IndexerEndpoints) == 0 {
return nil, fmt.Errorf("no indexer endpoints configured")
return nil, nil
}
indexers := make([]*client.IndexerClient, 0, len(l.cfg.IndexerEndpoints))
httpClient := &http.Client{
Expand Down Expand Up @@ -229,7 +234,7 @@ type libFromCLDF struct {

// AllIndexers implements [Lib].
func (l *libFromCLDF) AllIndexers() ([]*client.IndexerClient, error) {
return nil, fmt.Errorf("no indexer clients available in CLDF environment")
return nil, nil
}

// CLDFEnvironment implements [Lib].
Expand Down Expand Up @@ -307,7 +312,7 @@ func (l *libFromCLDF) IndexerMonitor() (*IndexerMonitor, error) {

// AllAggregators implements [Lib].
func (l *libFromCLDF) AllAggregators() (map[string]*AggregatorClient, error) {
return nil, fmt.Errorf("no aggregator clients available in CLDF environment")
return map[string]*AggregatorClient{}, nil
}

// NewLibFromCLDFEnv creates a new [Lib] from a [deployment.Environment].
Expand Down
58 changes: 58 additions & 0 deletions build/devenv/lib_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package ccv

import (
"testing"

"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func newTestLibFromCCV(cfg *Cfg) *libFromCCV {
return &libFromCCV{
envOutFile: "test.toml",
cfg: cfg,
l: zerolog.Nop(),
}
}

// TestLib_OffchainGetters covers the absence contract for both backends: the plural
// getters return an empty result with a nil error when nothing is configured, the
// singular getters return an error, and a real construction failure surfaces as an
// error from the plural getter too.
func TestLib_OffchainGetters(t *testing.T) {
ccvUnconfigured := newTestLibFromCCV(&Cfg{})
ccvBadAggregatorCert := newTestLibFromCCV(&Cfg{
AggregatorEndpoints: map[string]string{"default": "127.0.0.1:1"},
AggregatorCACertFiles: map[string]string{"default": "/nonexistent/ca.pem"},
})
cldf := &libFromCLDF{l: zerolog.Nop()}

tests := []struct {
name string
call func() (any, error)
wantErr bool // false: expect an empty result with a nil error
}{
{"ccv unconfigured: AllAggregators", func() (any, error) { return ccvUnconfigured.AllAggregators() }, false},
{"ccv unconfigured: AllIndexers", func() (any, error) { return ccvUnconfigured.AllIndexers() }, false},
{"ccv unconfigured: Indexer", func() (any, error) { return ccvUnconfigured.Indexer() }, true},
{"ccv unconfigured: IndexerMonitor", func() (any, error) { return ccvUnconfigured.IndexerMonitor() }, true},
{"ccv bad aggregator cert: AllAggregators", func() (any, error) { return ccvBadAggregatorCert.AllAggregators() }, true},
{"cldf: AllAggregators", func() (any, error) { return cldf.AllAggregators() }, false},
{"cldf: AllIndexers", func() (any, error) { return cldf.AllIndexers() }, false},
{"cldf: Indexer", func() (any, error) { return cldf.Indexer() }, true},
{"cldf: IndexerMonitor", func() (any, error) { return cldf.IndexerMonitor() }, true},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.call()
if tt.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.Empty(t, got)
})
}
}
18 changes: 4 additions & 14 deletions build/devenv/tests/e2e/tcapi/basic/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,9 @@ func (tc *v3TestCase) Run(ctx context.Context) error {
}
messageID := sendMessageResult.MessageID

aggregatorClients, err := tc.lib.AllAggregators()
aggregatorClient, indexerMonitor, err := tcapi.SetupOffchainClients(tc.lib, tc.aggregatorQualifier)
if err != nil {
return fmt.Errorf("failed to get aggregator clients: %w", err)
}
aggregatorClient := aggregatorClients[common.DefaultCommitteeVerifierQualifier]
if tc.aggregatorQualifier != "" && tc.aggregatorQualifier != common.DefaultCommitteeVerifierQualifier {
if client, ok := aggregatorClients[tc.aggregatorQualifier]; ok {
aggregatorClient = client
}
}
indexerMonitor, err := tc.lib.IndexerMonitor()
if err != nil {
return fmt.Errorf("failed to get indexer monitor: %w", err)
return err
}
testCtx, cleanupFn := tcapi.NewTestingContext(ctx, chainMap, aggregatorClient, indexerMonitor)
defer cleanupFn()
Expand All @@ -138,10 +128,10 @@ func (tc *v3TestCase) Run(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to assert message: %w", err)
}
if result.AggregatedResult == nil {
if aggregatorClient != nil && result.AggregatedResult == nil {
return fmt.Errorf("aggregated result is nil")
}
if len(result.IndexedVerifications.Results) != tc.numExpectedVerifications {
if indexerMonitor != nil && len(result.IndexedVerifications.Results) != tc.numExpectedVerifications {
return fmt.Errorf("expected %d indexed verifications, got %d", tc.numExpectedVerifications, len(result.IndexedVerifications.Results))
}

Expand Down
41 changes: 41 additions & 0 deletions build/devenv/tests/e2e/tcapi/offchain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package tcapi

import (
"fmt"

ccv "github.com/smartcontractkit/chainlink-ccv/build/devenv"
"github.com/smartcontractkit/chainlink-ccv/build/devenv/common"
)

// SetupOffchainClients resolves the aggregator and indexer clients from the
// environment. When no offchain endpoints are configured it returns nil clients and
// a nil error; NewTestingContext skips assertion stages for nil clients. A non-nil
// error means a configured client failed to construct.
func SetupOffchainClients(lib ccv.Lib, aggregatorQualifier string) (*ccv.AggregatorClient, *ccv.IndexerMonitor, error) {
aggregatorClients, err := lib.AllAggregators()
if err != nil {
return nil, nil, fmt.Errorf("failed to get aggregator clients: %w", err)
}
var aggregatorClient *ccv.AggregatorClient
if len(aggregatorClients) > 0 {
aggregatorClient = aggregatorClients[common.DefaultCommitteeVerifierQualifier]
if aggregatorQualifier != "" && aggregatorQualifier != common.DefaultCommitteeVerifierQualifier {
if client, ok := aggregatorClients[aggregatorQualifier]; ok {
aggregatorClient = client
}
}
}

indexers, err := lib.AllIndexers()
if err != nil {
return nil, nil, fmt.Errorf("failed to get indexer clients: %w", err)
}
var indexerMonitor *ccv.IndexerMonitor
if len(indexers) > 0 {
indexerMonitor, err = lib.IndexerMonitor()
if err != nil {
return nil, nil, fmt.Errorf("failed to get indexer monitor: %w", err)
}
}
return aggregatorClient, indexerMonitor, nil
}
Loading
Loading