Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 82 additions & 44 deletions relayer/chainreader/reader/chainreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,59 +484,97 @@ func (s *suiChainReader) QueryKeyWithMetadata(ctx context.Context, contract pkgt
func (s *suiChainReader) BatchGetLatestValues(ctx context.Context, request pkgtypes.BatchGetLatestValuesRequest) (pkgtypes.BatchGetLatestValuesResult, error) {
result := make(pkgtypes.BatchGetLatestValuesResult)

// Process each contract's batch concurrently. Reads *within* a contract are already
// parallelized in processContractBatch; parallelizing *across* contracts as well keeps the
// whole batch within the caller's (often tight) deadline — e.g. the CCIP commit observation
// budget, where a serial per-contract loop over the offRamp/RMN config reads was exceeding
// the deadline and causing OffRampNextSeqNums consensus to fail.
var (
mu sync.Mutex
wg sync.WaitGroup
firstErr error
)

for contract, batch := range request {
batchResults := make(pkgtypes.ContractBatchResults, len(batch))
resultChan := make(chan struct {
index int
result pkgtypes.BatchReadResult
}, len(batch))

var waitgroup sync.WaitGroup
waitgroup.Add(len(batch))

for i, read := range batch {
go func(index int, read pkgtypes.BatchRead, contract pkgtypes.BoundContract) {
defer waitgroup.Done()
readResult := pkgtypes.BatchReadResult{ReadName: read.ReadName}

err := s.GetLatestValue(ctx, contract.ReadIdentifier(read.ReadName), primitives.Finalized, read.Params, read.ReturnVal)
readResult.SetResult(read.ReturnVal, err)

select {
case resultChan <- struct {
index int
result pkgtypes.BatchReadResult
}{index, readResult}:
case <-ctx.Done():
return
wg.Add(1)
go func(contract pkgtypes.BoundContract, batch pkgtypes.ContractBatch) {
defer wg.Done()

batchResults, err := s.processContractBatch(ctx, contract, batch)

mu.Lock()
defer mu.Unlock()
if err != nil {
if firstErr == nil {
firstErr = err
}
}(i, read, contract)
}
return
}
result[contract] = batchResults
}(contract, batch)
}

// wait for all the results to be processed then close the channel
go func() {
waitgroup.Wait()
close(resultChan)
}()
wg.Wait()

resultsReceived := 0
for res := range resultChan {
batchResults[res.index] = res.result
resultsReceived++
}
if firstErr != nil {
return nil, firstErr
}

// check if all the results were received
if resultsReceived != len(batch) {
if err := ctx.Err(); err != nil {
return nil, err
return result, nil
}

// processContractBatch executes all reads for a single bound contract concurrently and returns
// the results in request order.
func (s *suiChainReader) processContractBatch(ctx context.Context, contract pkgtypes.BoundContract, batch pkgtypes.ContractBatch) (pkgtypes.ContractBatchResults, error) {
batchResults := make(pkgtypes.ContractBatchResults, len(batch))
resultChan := make(chan struct {
index int
result pkgtypes.BatchReadResult
}, len(batch))

var waitgroup sync.WaitGroup
waitgroup.Add(len(batch))

for i, read := range batch {
go func(index int, read pkgtypes.BatchRead, contract pkgtypes.BoundContract) {
defer waitgroup.Done()
readResult := pkgtypes.BatchReadResult{ReadName: read.ReadName}

err := s.GetLatestValue(ctx, contract.ReadIdentifier(read.ReadName), primitives.Finalized, read.Params, read.ReturnVal)
readResult.SetResult(read.ReturnVal, err)

select {
case resultChan <- struct {
index int
result pkgtypes.BatchReadResult
}{index, readResult}:
case <-ctx.Done():
return
}
return nil, fmt.Errorf("batch processing failed: expected %d results, received %d", len(batch), resultsReceived)
}
}(i, read, contract)
}

result[contract] = batchResults
// wait for all the results to be processed then close the channel
go func() {
waitgroup.Wait()
close(resultChan)
}()

resultsReceived := 0
for res := range resultChan {
batchResults[res.index] = res.result
resultsReceived++
}

return result, nil
// check if all the results were received
if resultsReceived != len(batch) {
if err := ctx.Err(); err != nil {
return nil, err
}
return nil, fmt.Errorf("batch processing failed: expected %d results, received %d", len(batch), resultsReceived)
}

return batchResults, nil
}

func (s *suiChainReader) CreateContractType(readName string, forEncoding bool) (any, error) {
Expand Down
69 changes: 68 additions & 1 deletion relayer/client/ptb_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/block-vision/sui-go-sdk/transaction"
cache "github.com/patrickmn/go-cache"
"golang.org/x/sync/semaphore"
"golang.org/x/sync/singleflight"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
Expand All @@ -42,8 +43,20 @@ const (
DefaultCacheExpiration time.Duration = 120 * time.Minute
DefaultCacheCleanupInterval time.Duration = 240 * time.Minute
DefaultHTTPTimeout time.Duration = 30 * time.Second
// PackageIDCacheExpiration bounds how long a resolved (package,module)->package-IDs mapping
// is cached. Package IDs only change on a package upgrade, but the chain reader resolves the
// latest package ID on *every* read (loadModulePackageIdsInternal does ~4 RPCs each), which
// dominates the dest-config batch latency in CCIP commit observation. Caching it removes that
// fan-out and self-heals within the TTL after an upgrade; call InvalidatePackageIDCache for an
// immediate refresh.
PackageIDCacheExpiration time.Duration = 30 * time.Second
)

// packageIDCacheKey builds the cache key for a resolved (package,module)->package-IDs mapping.
func packageIDCacheKey(packageID, module string) string {
return fmt.Sprintf("modulePackageIds::%s::%s", packageID, module)
}

var RateLimitWeights = map[string]int64{
"MoveCall": 1,
"SendTransaction": 1,
Expand Down Expand Up @@ -128,6 +141,10 @@ type PTBClient struct {
normalizedModules map[string]map[string]models.GetNormalizedMoveModuleResponse

cache *cache.Cache // used for caching object IDs (e.g. offramp state object ID or state pointers)

// pkgIDGroup de-duplicates concurrent package-ID resolutions for the same (package,module)
// so a cold cache / TTL expiry doesn't let parallel chain-reader reads stampede the resolution.
pkgIDGroup singleflight.Group
}

var _ SuiPTBClient = (*PTBClient)(nil)
Expand Down Expand Up @@ -1073,8 +1090,58 @@ func (c *PTBClient) LoadModulePackageIds(ctx context.Context, packageId string,
return result, err
}

// loadModulePackageIdsInternal is the internal implementation without rate limiting
// loadModulePackageIdsInternal is the internal implementation without rate limiting. It caches the
// resolved package-ID chain per (packageId, module) for PackageIDCacheExpiration to avoid the
// ~4 RPCs that loadModulePackageIdsUncached performs on every chain-reader read. The cached slice
// is copied on the way in and out so callers can't mutate shared state.
func (c *PTBClient) loadModulePackageIdsInternal(ctx context.Context, packageId string, module string) ([]string, error) {
cacheKey := packageIDCacheKey(packageId, module)
if cached, found := c.cache.Get(cacheKey); found {
if ids, ok := cached.([]string); ok {
return append([]string(nil), ids...), nil
}
}

// Single-flight the resolution so a cold cache / TTL expiry doesn't let concurrent
// chain-reader reads each run the ~3-RPC package-ID resolution for the same module.
resolved, err, _ := c.pkgIDGroup.Do(cacheKey, func() (any, error) {
// Another flight may have populated the cache while we were queued.
if cached, found := c.cache.Get(cacheKey); found {
if ids, ok := cached.([]string); ok {
return ids, nil
}
}

ids, err := c.loadModulePackageIdsUncached(ctx, packageId, module)
if err != nil {
return nil, err
}

c.cache.Set(cacheKey, append([]string(nil), ids...), PackageIDCacheExpiration)

return ids, nil
})
if err != nil {
return nil, err
}

// Copy: the single-flight result is shared across all waiters.
ids, _ := resolved.([]string)

return append([]string(nil), ids...), nil
}

// InvalidatePackageIDCache evicts the cached package-ID chain for (packageId, module). Call this
// immediately after a package upgrade so the next read re-resolves the upgrade chain instead of
// waiting for PackageIDCacheExpiration to elapse.
func (c *PTBClient) InvalidatePackageIDCache(packageId string, module string) {
c.cache.Delete(packageIDCacheKey(packageId, module))
}

// loadModulePackageIdsUncached resolves the full package-ID upgrade chain for a module by reading
// on-chain state (normalized module + owned objects + pointer/state objects). It performs several
// RPCs and is therefore wrapped by loadModulePackageIdsInternal for caching.
func (c *PTBClient) loadModulePackageIdsUncached(ctx context.Context, packageId string, module string) ([]string, error) {
// Ensure that the module keeps track of its package IDs by checking that it has `add_package_id` function
normalizedModule, err := c.getNormalizedModuleInternal(ctx, packageId, module)
if err != nil {
Expand Down
63 changes: 63 additions & 0 deletions relayer/client/ptb_client_pkgid_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package client

import (
"context"
"testing"

cache "github.com/patrickmn/go-cache"
"github.com/stretchr/testify/require"
)

// newPkgIDCacheTestClient builds a minimal PTBClient with only the cache wired. The cache-hit and
// invalidation paths under test never touch the network or the logger, so the other fields can be
// left zero-valued.
func newPkgIDCacheTestClient() *PTBClient {
return &PTBClient{
cache: cache.New(DefaultCacheExpiration, DefaultCacheCleanupInterval),
}
}

func TestLoadModulePackageIdsInternal_CacheHitSkipsResolution(t *testing.T) {
c := newPkgIDCacheTestClient()
const pkg, module = "0xpkg", "offramp"
want := []string{"0xv1", "0xv2"}

// Pre-populate the cache; a hit must return without hitting loadModulePackageIdsUncached
// (which would dial the RPC via the nil client and panic/error).
c.cache.Set(packageIDCacheKey(pkg, module), append([]string(nil), want...), PackageIDCacheExpiration)

got, err := c.loadModulePackageIdsInternal(context.Background(), pkg, module)
require.NoError(t, err)
require.Equal(t, want, got)
}

func TestLoadModulePackageIdsInternal_ReturnsCopy(t *testing.T) {
c := newPkgIDCacheTestClient()
const pkg, module = "0xpkg", "offramp"
c.cache.Set(packageIDCacheKey(pkg, module), []string{"0xv1", "0xv2"}, PackageIDCacheExpiration)

first, err := c.loadModulePackageIdsInternal(context.Background(), pkg, module)
require.NoError(t, err)

// Mutating the returned slice must not corrupt the cached value.
first[0] = "0xMUTATED"

second, err := c.loadModulePackageIdsInternal(context.Background(), pkg, module)
require.NoError(t, err)
require.Equal(t, []string{"0xv1", "0xv2"}, second, "cached slice must be isolated from caller mutations")
}

func TestInvalidatePackageIDCache(t *testing.T) {
c := newPkgIDCacheTestClient()
const pkg, module = "0xpkg", "offramp"
key := packageIDCacheKey(pkg, module)
c.cache.Set(key, []string{"0xv1"}, PackageIDCacheExpiration)

_, found := c.cache.Get(key)
require.True(t, found)

c.InvalidatePackageIDCache(pkg, module)

_, found = c.cache.Get(key)
require.False(t, found, "InvalidatePackageIDCache must evict the cached entry")
}
Loading