Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/executor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (f *Factory) Start(ctx context.Context, spec bootstrap.JobSpec, deps bootst
executorConfig.MaxRetryDuration,
timeProvider,
executorConfig.WorkerCount,
executorConfig.DataNotReadyRetryInterval,
)
if err != nil {
return fmt.Errorf("failed to create execution coordinator: %w", err)
Expand Down
1 change: 1 addition & 0 deletions cmd/executor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func TestFactory_Stop_WithCoordinator(t *testing.T) {
8*time.Hour,
mocks.NewMockTimeProvider(t),
1,
time.Second,
)
require.NoError(t, err)

Expand Down
26 changes: 17 additions & 9 deletions executor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (
)

const (
backoffDurationDefault = 15 * time.Second
lookbackWindowDefault = 1 * time.Hour
readerCacheExpiryDefault = 5 * time.Minute
maxRetryDurationDefault = 8 * time.Hour
executionIntervalDefault = 1 * time.Minute
ntpServerDefault = "time.google.com"
workerCountDefault = 100
IndexerQueryLimitDefault = 100
IndexerQueryLimitMax = 10000
backoffDurationDefault = 15 * time.Second
lookbackWindowDefault = 1 * time.Hour
readerCacheExpiryDefault = 5 * time.Minute
maxRetryDurationDefault = 8 * time.Hour
executionIntervalDefault = 1 * time.Minute
dataNotReadyRetryIntervalDefault = 1 * time.Second
ntpServerDefault = "time.google.com"
workerCountDefault = 100
IndexerQueryLimitDefault = 100
IndexerQueryLimitMax = 10000
)

type ConfigWithBlockchainInfo[T any] struct {
Expand Down Expand Up @@ -58,6 +59,9 @@ type Configuration struct {
// MaxRetryDuration is the maximum duration the executor cluster will retry a message before giving up.
// Defaults to 8 hours.
MaxRetryDuration time.Duration `toml:"max_retry_duration"`
// DataNotReadyRetryInterval is the base delay between retries when verifier data is not yet available.
// Defaults to 1 second. Exponential backoff applies up to the executor stagger interval.
DataNotReadyRetryInterval time.Duration `toml:"data_not_ready_retry_interval"`
// NtpServer is the NTP server to use for time synchronization.
// Defaults to time.google.com
NtpServer string `toml:"ntp_server"`
Expand Down Expand Up @@ -124,6 +128,9 @@ func (c *Configuration) Validate() error {
if c.MaxRetryDuration < 0 {
return fmt.Errorf("max_retry_duration must not be negative")
}
if c.DataNotReadyRetryInterval < 0 {
return fmt.Errorf("data_not_ready_retry_interval must not be negative")
}
if c.IndexerQueryLimit > IndexerQueryLimitMax {
return fmt.Errorf("indexer_query_limit must not exceed %d, got %d", IndexerQueryLimitMax, c.IndexerQueryLimit)
}
Expand Down Expand Up @@ -180,6 +187,7 @@ func (c *Configuration) GetNormalizedConfig() (*Configuration, error) {
normalized.LookbackWindow = parseOrDefault(c.LookbackWindow, lookbackWindowDefault)
normalized.ReaderCacheExpiry = parseOrDefault(c.ReaderCacheExpiry, readerCacheExpiryDefault)
normalized.MaxRetryDuration = parseOrDefault(c.MaxRetryDuration, maxRetryDurationDefault)
normalized.DataNotReadyRetryInterval = parseOrDefault(c.DataNotReadyRetryInterval, dataNotReadyRetryIntervalDefault)
if c.IndexerQueryLimit == 0 {
normalized.IndexerQueryLimit = IndexerQueryLimitDefault
}
Expand Down
117 changes: 62 additions & 55 deletions executor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,33 +255,35 @@ func TestConfiguration_Validate(t *testing.T) {

func TestConfiguration_GetNormalizedConfig(t *testing.T) {
cases := []struct {
name string
config Configuration
wantErr bool
wantErrContains string
wantIndexerAddressCount int
wantBackoffDuration time.Duration
wantLookbackWindow time.Duration
wantReaderCacheExpiry time.Duration
wantMaxRetryDuration time.Duration
wantExecutionInterval time.Duration
wantNtpServer string
wantWorkerCount int
wantIndexerQueryLimit uint64
name string
config Configuration
wantErr bool
wantErrContains string
wantIndexerAddressCount int
wantBackoffDuration time.Duration
wantLookbackWindow time.Duration
wantReaderCacheExpiry time.Duration
wantMaxRetryDuration time.Duration
wantDataNotReadyRetryInterval time.Duration
wantExecutionInterval time.Duration
wantNtpServer string
wantWorkerCount int
wantIndexerQueryLimit uint64
}{
{
name: "single_indexer_address_with_defaults",
config: validConfig(),
wantErr: false,
wantIndexerAddressCount: 1,
wantBackoffDuration: backoffDurationDefault,
wantLookbackWindow: lookbackWindowDefault,
wantReaderCacheExpiry: readerCacheExpiryDefault,
wantMaxRetryDuration: maxRetryDurationDefault,
wantExecutionInterval: executionIntervalDefault,
wantNtpServer: ntpServerDefault,
wantWorkerCount: workerCountDefault,
wantIndexerQueryLimit: IndexerQueryLimitDefault,
name: "single_indexer_address_with_defaults",
config: validConfig(),
wantErr: false,
wantIndexerAddressCount: 1,
wantBackoffDuration: backoffDurationDefault,
wantLookbackWindow: lookbackWindowDefault,
wantReaderCacheExpiry: readerCacheExpiryDefault,
wantMaxRetryDuration: maxRetryDurationDefault,
wantDataNotReadyRetryInterval: dataNotReadyRetryIntervalDefault,
wantExecutionInterval: executionIntervalDefault,
wantNtpServer: ntpServerDefault,
wantWorkerCount: workerCountDefault,
wantIndexerQueryLimit: IndexerQueryLimitDefault,
},
{
name: "multiple_indexer_addresses",
Expand All @@ -290,16 +292,17 @@ func TestConfiguration_GetNormalizedConfig(t *testing.T) {
c.IndexerAddress = []string{"http://indexer1:8100", "http://indexer2:8100"}
return c
}(),
wantErr: false,
wantIndexerAddressCount: 2,
wantBackoffDuration: backoffDurationDefault,
wantLookbackWindow: lookbackWindowDefault,
wantReaderCacheExpiry: readerCacheExpiryDefault,
wantMaxRetryDuration: maxRetryDurationDefault,
wantExecutionInterval: executionIntervalDefault,
wantNtpServer: ntpServerDefault,
wantWorkerCount: workerCountDefault,
wantIndexerQueryLimit: IndexerQueryLimitDefault,
wantErr: false,
wantIndexerAddressCount: 2,
wantBackoffDuration: backoffDurationDefault,
wantLookbackWindow: lookbackWindowDefault,
wantReaderCacheExpiry: readerCacheExpiryDefault,
wantMaxRetryDuration: maxRetryDurationDefault,
wantDataNotReadyRetryInterval: dataNotReadyRetryIntervalDefault,
wantExecutionInterval: executionIntervalDefault,
wantNtpServer: ntpServerDefault,
wantWorkerCount: workerCountDefault,
wantIndexerQueryLimit: IndexerQueryLimitDefault,
},
{
name: "custom_values_preserved",
Expand All @@ -310,6 +313,7 @@ func TestConfiguration_GetNormalizedConfig(t *testing.T) {
c.LookbackWindow = 2 * time.Hour
c.ReaderCacheExpiry = 10 * time.Minute
c.MaxRetryDuration = 12 * time.Hour
c.DataNotReadyRetryInterval = 2 * time.Second
c.NtpServer = "custom.ntp.com"
c.WorkerCount = 200
c.IndexerQueryLimit = 500
Expand All @@ -319,16 +323,17 @@ func TestConfiguration_GetNormalizedConfig(t *testing.T) {
c.ChainConfiguration = map[string]ChainConfiguration{"1": cc}
return c
}(),
wantErr: false,
wantIndexerAddressCount: 1,
wantBackoffDuration: 30 * time.Second,
wantLookbackWindow: 2 * time.Hour,
wantReaderCacheExpiry: 10 * time.Minute,
wantMaxRetryDuration: 12 * time.Hour,
wantExecutionInterval: 2 * time.Minute,
wantNtpServer: "custom.ntp.com",
wantWorkerCount: 200,
wantIndexerQueryLimit: 500,
wantErr: false,
wantIndexerAddressCount: 1,
wantBackoffDuration: 30 * time.Second,
wantLookbackWindow: 2 * time.Hour,
wantReaderCacheExpiry: 10 * time.Minute,
wantMaxRetryDuration: 12 * time.Hour,
wantDataNotReadyRetryInterval: 2 * time.Second,
wantExecutionInterval: 2 * time.Minute,
wantNtpServer: "custom.ntp.com",
wantWorkerCount: 200,
wantIndexerQueryLimit: 500,
},
{
name: "defaults_applied_when_zero",
Expand All @@ -339,16 +344,17 @@ func TestConfiguration_GetNormalizedConfig(t *testing.T) {
c.ChainConfiguration = map[string]ChainConfiguration{"1": cc}
return c
}(),
wantErr: false,
wantIndexerAddressCount: 1,
wantBackoffDuration: backoffDurationDefault,
wantLookbackWindow: lookbackWindowDefault,
wantReaderCacheExpiry: readerCacheExpiryDefault,
wantMaxRetryDuration: maxRetryDurationDefault,
wantExecutionInterval: executionIntervalDefault,
wantNtpServer: ntpServerDefault,
wantWorkerCount: workerCountDefault,
wantIndexerQueryLimit: IndexerQueryLimitDefault,
wantErr: false,
wantIndexerAddressCount: 1,
wantBackoffDuration: backoffDurationDefault,
wantLookbackWindow: lookbackWindowDefault,
wantReaderCacheExpiry: readerCacheExpiryDefault,
wantMaxRetryDuration: maxRetryDurationDefault,
wantDataNotReadyRetryInterval: dataNotReadyRetryIntervalDefault,
wantExecutionInterval: executionIntervalDefault,
wantNtpServer: ntpServerDefault,
wantWorkerCount: workerCountDefault,
wantIndexerQueryLimit: IndexerQueryLimitDefault,
},
{
name: "validation_errors_propagated",
Expand Down Expand Up @@ -378,6 +384,7 @@ func TestConfiguration_GetNormalizedConfig(t *testing.T) {
require.Equal(t, tc.wantLookbackWindow, normalized.LookbackWindow)
require.Equal(t, tc.wantReaderCacheExpiry, normalized.ReaderCacheExpiry)
require.Equal(t, tc.wantMaxRetryDuration, normalized.MaxRetryDuration)
require.Equal(t, tc.wantDataNotReadyRetryInterval, normalized.DataNotReadyRetryInterval)
require.Equal(t, tc.wantNtpServer, normalized.NtpServer)
require.Equal(t, tc.wantWorkerCount, normalized.WorkerCount)
require.Equal(t, tc.wantIndexerQueryLimit, normalized.IndexerQueryLimit)
Expand Down
35 changes: 35 additions & 0 deletions executor/coordinator_retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package executor

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestCoordinator_dataNotReadyBackoff(t *testing.T) {
t.Parallel()

ec := &Coordinator{dataNotReadyRetryInterval: time.Second}
stagger := 30 * time.Second

tests := []struct {
name string
attempt int
want time.Duration
}{
{name: "first attempt", attempt: 1, want: time.Second},
{name: "second attempt", attempt: 2, want: 2 * time.Second},
{name: "third attempt", attempt: 3, want: 4 * time.Second},
{name: "capped at stagger", attempt: 10, want: stagger},
{name: "zero attempt normalized", attempt: 0, want: time.Second},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
got := ec.dataNotReadyBackoff(tc.attempt, stagger)
require.Equal(t, tc.want, got)
})
}
}
71 changes: 52 additions & 19 deletions executor/executor_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,22 @@ import (
// dispatching them to workers for execution, and retrying if necessary.
type Coordinator struct {
services.StateMachine
wg sync.WaitGroup
executor Executor
messageSubscriber MessageSubscriber
leaderElector LeaderElector
lggr logger.Logger
monitoring Monitoring
workerPoolTasks chan message_heap.MessageWithTimestamps
cancel context.CancelFunc
delayedMessageHeap message_heap.MessageHeap
inFlight map[protocol.Bytes32]struct{}
inFlightMu sync.RWMutex
running atomic.Bool
expiryDuration time.Duration
timeProvider common.TimeProvider
workerCount int
wg sync.WaitGroup
executor Executor
messageSubscriber MessageSubscriber
leaderElector LeaderElector
lggr logger.Logger
monitoring Monitoring
workerPoolTasks chan message_heap.MessageWithTimestamps
cancel context.CancelFunc
delayedMessageHeap message_heap.MessageHeap
inFlight map[protocol.Bytes32]struct{}
inFlightMu sync.RWMutex
running atomic.Bool
expiryDuration time.Duration
timeProvider common.TimeProvider
workerCount int
dataNotReadyRetryInterval time.Duration
}

// NewCoordinator creates a new executor coordinator.
Expand All @@ -47,7 +48,11 @@ func NewCoordinator(
expiryDuration time.Duration,
timeProvider common.TimeProvider,
workerCount int,
dataNotReadyRetryInterval time.Duration,
) (*Coordinator, error) {
if dataNotReadyRetryInterval <= 0 {
dataNotReadyRetryInterval = DefaultDataNotReadyRetryInterval
}
ec := &Coordinator{
lggr: lggr,
executor: executor,
Expand All @@ -57,9 +62,10 @@ func NewCoordinator(
workerPoolTasks: make(chan message_heap.MessageWithTimestamps),
// cancel and delayedMessageHeap are initialized in Start()
// running, wg, and services.StateMachine default initialization is fine.
expiryDuration: expiryDuration,
timeProvider: timeProvider,
workerCount: workerCount,
expiryDuration: expiryDuration,
timeProvider: timeProvider,
workerCount: workerCount,
dataNotReadyRetryInterval: dataNotReadyRetryInterval,
}

if err := ec.validate(); err != nil {
Expand Down Expand Up @@ -212,6 +218,7 @@ func (ec *Coordinator) runStorageStream(ctx context.Context) {
ExpiryTime: readyTimestamp.Add(ec.expiryDuration),
RetryInterval: retryDelay,
MessageID: id,
Attempt: 0,
}) {
ec.lggr.Debugw("duplicate message rejected by heap", protocol.LogKeyMessageID, id)
}
Expand Down Expand Up @@ -287,12 +294,24 @@ func (ec *Coordinator) processPayload(ctx context.Context, payload message_heap.
shouldRetry, err := ec.executor.HandleMessage(ctx, message)
if shouldRetry {
ec.lggr.Debugw("message should be retried, putting back in heap", protocol.LogKeyMessageID, id)
attempt := payload.Attempt
var delay time.Duration
if errors.Is(err, ErrExecutionContended) {
// Post-transmit: preserve anti-duplication stagger.
delay = payload.RetryInterval
attempt = 0
} else {
// Pre-transmit (data/state not ready): fast exponential backoff capped at the stagger.
attempt++
delay = ec.dataNotReadyBackoff(attempt, payload.RetryInterval)
}
if !ec.delayedMessageHeap.Push(message_heap.MessageWithTimestamps{
Message: &message,
ReadyTime: payload.ReadyTime.Add(payload.RetryInterval),
ReadyTime: currentTime.Add(delay),
ExpiryTime: payload.ExpiryTime,
RetryInterval: payload.RetryInterval,
MessageID: id,
Attempt: attempt,
}) {
ec.lggr.Warnw("retry push rejected, message already in heap", protocol.LogKeyMessageID, id)
}
Expand Down Expand Up @@ -323,6 +342,20 @@ func (ec *Coordinator) inFlightHas(id protocol.Bytes32) bool {
return ok
}

func (ec *Coordinator) dataNotReadyBackoff(attempt int, capDuration time.Duration) time.Duration {
if attempt < 1 {
attempt = 1
}
if attempt > 30 {
attempt = 30
}
d := ec.dataNotReadyRetryInterval << (attempt - 1)
if d <= 0 || (capDuration > 0 && d > capDuration) {
return capDuration
}
return d
}

// validate checks that all required components are configured.
func (ec *Coordinator) validate() error {
var errs []error
Expand Down
Loading
Loading