diff --git a/relayer/chainreader/reader/chainreader.go b/relayer/chainreader/reader/chainreader.go index eefc77c99..325f4b10b 100644 --- a/relayer/chainreader/reader/chainreader.go +++ b/relayer/chainreader/reader/chainreader.go @@ -484,59 +484,97 @@ func (s *suiChainReader) QueryKeyWithMetadata(ctx context.Context, contract pkgt func (s *suiChainReader) BatchGetLatestValues(ctx context.Context, request pkgtypes.BatchGetLatestValuesRequest) (pkgtypes.BatchGetLatestValuesResult, error) { result := make(pkgtypes.BatchGetLatestValuesResult) + // Process each contract's batch concurrently. Reads *within* a contract are already + // parallelized in processContractBatch; parallelizing *across* contracts as well keeps the + // whole batch within the caller's (often tight) deadline — e.g. the CCIP commit observation + // budget, where a serial per-contract loop over the offRamp/RMN config reads was exceeding + // the deadline and causing OffRampNextSeqNums consensus to fail. + var ( + mu sync.Mutex + wg sync.WaitGroup + firstErr error + ) + for contract, batch := range request { - batchResults := make(pkgtypes.ContractBatchResults, len(batch)) - resultChan := make(chan struct { - index int - result pkgtypes.BatchReadResult - }, len(batch)) - - var waitgroup sync.WaitGroup - waitgroup.Add(len(batch)) - - for i, read := range batch { - go func(index int, read pkgtypes.BatchRead, contract pkgtypes.BoundContract) { - defer waitgroup.Done() - readResult := pkgtypes.BatchReadResult{ReadName: read.ReadName} - - err := s.GetLatestValue(ctx, contract.ReadIdentifier(read.ReadName), primitives.Finalized, read.Params, read.ReturnVal) - readResult.SetResult(read.ReturnVal, err) - - select { - case resultChan <- struct { - index int - result pkgtypes.BatchReadResult - }{index, readResult}: - case <-ctx.Done(): - return + wg.Add(1) + go func(contract pkgtypes.BoundContract, batch pkgtypes.ContractBatch) { + defer wg.Done() + + batchResults, err := s.processContractBatch(ctx, contract, batch) + + mu.Lock() + defer mu.Unlock() + if err != nil { + if firstErr == nil { + firstErr = err } - }(i, read, contract) - } + return + } + result[contract] = batchResults + }(contract, batch) + } - // wait for all the results to be processed then close the channel - go func() { - waitgroup.Wait() - close(resultChan) - }() + wg.Wait() - resultsReceived := 0 - for res := range resultChan { - batchResults[res.index] = res.result - resultsReceived++ - } + if firstErr != nil { + return nil, firstErr + } - // check if all the results were received - if resultsReceived != len(batch) { - if err := ctx.Err(); err != nil { - return nil, err + return result, nil +} + +// processContractBatch executes all reads for a single bound contract concurrently and returns +// the results in request order. +func (s *suiChainReader) processContractBatch(ctx context.Context, contract pkgtypes.BoundContract, batch pkgtypes.ContractBatch) (pkgtypes.ContractBatchResults, error) { + batchResults := make(pkgtypes.ContractBatchResults, len(batch)) + resultChan := make(chan struct { + index int + result pkgtypes.BatchReadResult + }, len(batch)) + + var waitgroup sync.WaitGroup + waitgroup.Add(len(batch)) + + for i, read := range batch { + go func(index int, read pkgtypes.BatchRead, contract pkgtypes.BoundContract) { + defer waitgroup.Done() + readResult := pkgtypes.BatchReadResult{ReadName: read.ReadName} + + err := s.GetLatestValue(ctx, contract.ReadIdentifier(read.ReadName), primitives.Finalized, read.Params, read.ReturnVal) + readResult.SetResult(read.ReturnVal, err) + + select { + case resultChan <- struct { + index int + result pkgtypes.BatchReadResult + }{index, readResult}: + case <-ctx.Done(): + return } - return nil, fmt.Errorf("batch processing failed: expected %d results, received %d", len(batch), resultsReceived) - } + }(i, read, contract) + } - result[contract] = batchResults + // wait for all the results to be processed then close the channel + go func() { + waitgroup.Wait() + close(resultChan) + }() + + resultsReceived := 0 + for res := range resultChan { + batchResults[res.index] = res.result + resultsReceived++ } - return result, nil + // check if all the results were received + if resultsReceived != len(batch) { + if err := ctx.Err(); err != nil { + return nil, err + } + return nil, fmt.Errorf("batch processing failed: expected %d results, received %d", len(batch), resultsReceived) + } + + return batchResults, nil } func (s *suiChainReader) CreateContractType(readName string, forEncoding bool) (any, error) { diff --git a/relayer/client/ptb_client.go b/relayer/client/ptb_client.go index 9eb139b40..e2257b868 100644 --- a/relayer/client/ptb_client.go +++ b/relayer/client/ptb_client.go @@ -19,6 +19,7 @@ import ( "github.com/block-vision/sui-go-sdk/transaction" cache "github.com/patrickmn/go-cache" "golang.org/x/sync/semaphore" + "golang.org/x/sync/singleflight" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop" @@ -42,8 +43,20 @@ const ( DefaultCacheExpiration time.Duration = 120 * time.Minute DefaultCacheCleanupInterval time.Duration = 240 * time.Minute DefaultHTTPTimeout time.Duration = 30 * time.Second + // PackageIDCacheExpiration bounds how long a resolved (package,module)->package-IDs mapping + // is cached. Package IDs only change on a package upgrade, but the chain reader resolves the + // latest package ID on *every* read (loadModulePackageIdsInternal does ~4 RPCs each), which + // dominates the dest-config batch latency in CCIP commit observation. Caching it removes that + // fan-out and self-heals within the TTL after an upgrade; call InvalidatePackageIDCache for an + // immediate refresh. + PackageIDCacheExpiration time.Duration = 30 * time.Second ) +// packageIDCacheKey builds the cache key for a resolved (package,module)->package-IDs mapping. +func packageIDCacheKey(packageID, module string) string { + return fmt.Sprintf("modulePackageIds::%s::%s", packageID, module) +} + var RateLimitWeights = map[string]int64{ "MoveCall": 1, "SendTransaction": 1, @@ -128,6 +141,10 @@ type PTBClient struct { normalizedModules map[string]map[string]models.GetNormalizedMoveModuleResponse cache *cache.Cache // used for caching object IDs (e.g. offramp state object ID or state pointers) + + // pkgIDGroup de-duplicates concurrent package-ID resolutions for the same (package,module) + // so a cold cache / TTL expiry doesn't let parallel chain-reader reads stampede the resolution. + pkgIDGroup singleflight.Group } var _ SuiPTBClient = (*PTBClient)(nil) @@ -1073,8 +1090,58 @@ func (c *PTBClient) LoadModulePackageIds(ctx context.Context, packageId string, return result, err } -// loadModulePackageIdsInternal is the internal implementation without rate limiting +// loadModulePackageIdsInternal is the internal implementation without rate limiting. It caches the +// resolved package-ID chain per (packageId, module) for PackageIDCacheExpiration to avoid the +// ~4 RPCs that loadModulePackageIdsUncached performs on every chain-reader read. The cached slice +// is copied on the way in and out so callers can't mutate shared state. func (c *PTBClient) loadModulePackageIdsInternal(ctx context.Context, packageId string, module string) ([]string, error) { + cacheKey := packageIDCacheKey(packageId, module) + if cached, found := c.cache.Get(cacheKey); found { + if ids, ok := cached.([]string); ok { + return append([]string(nil), ids...), nil + } + } + + // Single-flight the resolution so a cold cache / TTL expiry doesn't let concurrent + // chain-reader reads each run the ~3-RPC package-ID resolution for the same module. + resolved, err, _ := c.pkgIDGroup.Do(cacheKey, func() (any, error) { + // Another flight may have populated the cache while we were queued. + if cached, found := c.cache.Get(cacheKey); found { + if ids, ok := cached.([]string); ok { + return ids, nil + } + } + + ids, err := c.loadModulePackageIdsUncached(ctx, packageId, module) + if err != nil { + return nil, err + } + + c.cache.Set(cacheKey, append([]string(nil), ids...), PackageIDCacheExpiration) + + return ids, nil + }) + if err != nil { + return nil, err + } + + // Copy: the single-flight result is shared across all waiters. + ids, _ := resolved.([]string) + + return append([]string(nil), ids...), nil +} + +// InvalidatePackageIDCache evicts the cached package-ID chain for (packageId, module). Call this +// immediately after a package upgrade so the next read re-resolves the upgrade chain instead of +// waiting for PackageIDCacheExpiration to elapse. +func (c *PTBClient) InvalidatePackageIDCache(packageId string, module string) { + c.cache.Delete(packageIDCacheKey(packageId, module)) +} + +// loadModulePackageIdsUncached resolves the full package-ID upgrade chain for a module by reading +// on-chain state (normalized module + owned objects + pointer/state objects). It performs several +// RPCs and is therefore wrapped by loadModulePackageIdsInternal for caching. +func (c *PTBClient) loadModulePackageIdsUncached(ctx context.Context, packageId string, module string) ([]string, error) { // Ensure that the module keeps track of its package IDs by checking that it has `add_package_id` function normalizedModule, err := c.getNormalizedModuleInternal(ctx, packageId, module) if err != nil { diff --git a/relayer/client/ptb_client_pkgid_cache_test.go b/relayer/client/ptb_client_pkgid_cache_test.go new file mode 100644 index 000000000..7ac3b9b21 --- /dev/null +++ b/relayer/client/ptb_client_pkgid_cache_test.go @@ -0,0 +1,63 @@ +package client + +import ( + "context" + "testing" + + cache "github.com/patrickmn/go-cache" + "github.com/stretchr/testify/require" +) + +// newPkgIDCacheTestClient builds a minimal PTBClient with only the cache wired. The cache-hit and +// invalidation paths under test never touch the network or the logger, so the other fields can be +// left zero-valued. +func newPkgIDCacheTestClient() *PTBClient { + return &PTBClient{ + cache: cache.New(DefaultCacheExpiration, DefaultCacheCleanupInterval), + } +} + +func TestLoadModulePackageIdsInternal_CacheHitSkipsResolution(t *testing.T) { + c := newPkgIDCacheTestClient() + const pkg, module = "0xpkg", "offramp" + want := []string{"0xv1", "0xv2"} + + // Pre-populate the cache; a hit must return without hitting loadModulePackageIdsUncached + // (which would dial the RPC via the nil client and panic/error). + c.cache.Set(packageIDCacheKey(pkg, module), append([]string(nil), want...), PackageIDCacheExpiration) + + got, err := c.loadModulePackageIdsInternal(context.Background(), pkg, module) + require.NoError(t, err) + require.Equal(t, want, got) +} + +func TestLoadModulePackageIdsInternal_ReturnsCopy(t *testing.T) { + c := newPkgIDCacheTestClient() + const pkg, module = "0xpkg", "offramp" + c.cache.Set(packageIDCacheKey(pkg, module), []string{"0xv1", "0xv2"}, PackageIDCacheExpiration) + + first, err := c.loadModulePackageIdsInternal(context.Background(), pkg, module) + require.NoError(t, err) + + // Mutating the returned slice must not corrupt the cached value. + first[0] = "0xMUTATED" + + second, err := c.loadModulePackageIdsInternal(context.Background(), pkg, module) + require.NoError(t, err) + require.Equal(t, []string{"0xv1", "0xv2"}, second, "cached slice must be isolated from caller mutations") +} + +func TestInvalidatePackageIDCache(t *testing.T) { + c := newPkgIDCacheTestClient() + const pkg, module = "0xpkg", "offramp" + key := packageIDCacheKey(pkg, module) + c.cache.Set(key, []string{"0xv1"}, PackageIDCacheExpiration) + + _, found := c.cache.Get(key) + require.True(t, found) + + c.InvalidatePackageIDCache(pkg, module) + + _, found = c.cache.Get(key) + require.False(t, found, "InvalidatePackageIDCache must evict the cached entry") +}