
source/retry

import "github.com/stablekernel/crucible/source/retry"

Package retry provides a classification-aware retry [source.Middleware] for the crucible source seam. It inspects the [source.Result] a handler returns and acts on its classification: a transient ([source.Retryable]) failure becomes a delayed redelivery whose backoff grows with the attempt count; a permanent failure ([source.Poison]/[source.InvalidForState]) passes straight through to terminate without wasting redeliveries; and a success or drop is left untouched.

The attempt count is carried on a typed context value (WithAttempt/Attempt) rather than a magic-string header, and the next attempt is propagated to inner middleware (notably source/dlq) by the same channel. The backoff schedule is fully config-driven through functional options; nothing is hardcoded. A clock is injected for deterministic tests.

AttemptHeader is the header key under which the retry middleware stamps the current attempt number when it produces a redelivery [source.Result] that a downstream backend cannot itself thread back through the context (the attempt is also carried on the context via WithAttempt). It is a named constant rather than an inline magic string, so the dead-letter middleware reads the same key the retry middleware writes.

const AttemptHeader = "crucible-retry-attempt"
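A minimal sketch of why a shared constant matters: the writer and the reader of the header both go through AttemptHeader, so they cannot drift apart. The header carrier here is a plain map[string]string and the helpers stampAttempt and readAttempt are hypothetical; the real [source.Message] header API is not described on this page.

```go
package main

import (
	"fmt"
	"strconv"
)

// AttemptHeader mirrors the constant documented above.
const AttemptHeader = "crucible-retry-attempt"

// stampAttempt is a hypothetical helper: the retry side writes the attempt
// number under the shared key. A plain map stands in for real message headers.
func stampAttempt(h map[string]string, attempt int) {
	h[AttemptHeader] = strconv.Itoa(attempt)
}

// readAttempt is a hypothetical helper: the dead-letter side reads the same
// key. A missing or malformed value reports (1, false), mirroring the
// documented default of Attempt for a fresh message.
func readAttempt(h map[string]string) (int, bool) {
	v, ok := h[AttemptHeader]
	if !ok {
		return 1, false
	}
	n, err := strconv.Atoi(v)
	if err != nil || n < 1 {
		return 1, false
	}
	return n, true
}

func main() {
	h := map[string]string{}
	stampAttempt(h, 3)
	n, ok := readAttempt(h)
	fmt.Println(n, ok) // 3 true
}
```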

func Attempt(ctx context.Context) (int, bool)

Attempt returns the attempt number carried on ctx and whether one was present. The first delivery is attempt 1; an absent value reports (1, false) so a caller can treat a fresh message as its first attempt without special-casing.

func Middleware(opts ...Option) source.Middleware

Middleware returns a [source.Middleware] that applies classification-aware retry to the handler it wraps:

  • [source.Retryable] (an [source.ActionNak]) below the attempt cap becomes a delayed redelivery ([source.NakAfter]) using the configured backoff, and the incremented attempt is threaded onto the context for the next delivery.
  • [source.Retryable] at the attempt cap is escalated to [source.ActionTerm] (classification preserved as Retryable) so it falls through to dead-letter.
  • [source.Poison] and [source.InvalidForState] ([source.ActionTerm]) pass through unchanged: a permanently-bad or wrong-state message is never retried.
  • A success ([source.ActionAck]) or [source.Drop] is returned untouched.

The attempt count is read from the context (Attempt); a fresh message is attempt 1. The middleware never sleeps — it returns a [source.Result] the engine acts on — so it adds no latency of its own and is safe under the Hopper’s per-lane concurrency.

Example

```go
// A handler that fails transiently.
base := func(_ context.Context, _ source.Message) source.Result {
	return source.Nak(errors.New("upstream timeout"))
}
mw := retry.Middleware(
	retry.WithMaxAttempts(3),
	retry.WithBackoff(100*time.Millisecond, time.Second, 2.0, false),
)
h := mw(base)
// First and second attempts back off; the third exhausts and terminates.
// stubMsg is assumed to be a minimal source.Message fixture defined in the example's test file.
for attempt := 1; attempt <= 3; attempt++ {
	ctx := retry.WithAttempt(context.Background(), attempt)
	r := h(ctx, stubMsg{})
	fmt.Printf("attempt %d: %s class=%s delay=%v\n", attempt, r.Action, r.Class, r.Requeue)
}
// Output:
// attempt 1: nak class=retryable delay=100ms
// attempt 2: nak class=retryable delay=200ms
// attempt 3: term class=retryable delay=0s
```

Example (Poison Passes Through)

```go
base := func(_ context.Context, _ source.Message) source.Result {
	return source.Term(errors.New("malformed"))
}
h := retry.Middleware()(base)
r := h(context.Background(), stubMsg{})
fmt.Printf("%s class=%s\n", r.Action, r.Class)
// Output: term class=poison
```

func WithAttempt(ctx context.Context, n int) context.Context

WithAttempt returns a child context carrying attempt n. The Middleware reads this on entry to decide the backoff and writes the incremented value before redelivery; tests and the dead-letter middleware read it via Attempt.

Backoff computes the redelivery delay for a given attempt. It is the seam a caller uses to shape the schedule: WithBackoff tunes the built-in exponential schedule, and WithBackoffFunc replaces it with a custom Backoff. The default is exponential with full jitter.

```go
type Backoff interface {
	// Delay returns the wait before attempt+1, given that the 1-based
	// attempt just failed. The returned delay is non-negative.
	Delay(attempt int) time.Duration
}
```

BackoffFunc adapts a function to the Backoff interface.

type BackoffFunc func(attempt int) time.Duration

func (f BackoffFunc) Delay(attempt int) time.Duration

Delay implements Backoff.

Option configures the retry Middleware. Options are additive with sensible defaults; a nil or out-of-range value is ignored, leaving the default in place.

type Option func(*config)
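The "nil or out-of-range value is ignored" rule is easy to get wrong in a functional-options API, so here is a minimal sketch of it. The config fields and the newConfig constructor are hypothetical (the real struct is unexported); the defaults of 5 attempts and time.Now come from the option descriptions on this page. Options apply in order, so a later valid value wins.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in; the real struct is unexported.
type config struct {
	maxAttempts int
	now         func() time.Time
}

type Option func(*config)

// WithMaxAttempts ignores values < 1, leaving the previous value in place.
func WithMaxAttempts(n int) Option {
	return func(c *config) {
		if n >= 1 {
			c.maxAttempts = n
		}
	}
}

// WithClock ignores a nil clock.
func WithClock(now func() time.Time) Option {
	return func(c *config) {
		if now != nil {
			c.now = now
		}
	}
}

// newConfig starts from the documented defaults and applies options in order.
func newConfig(opts ...Option) config {
	c := config{maxAttempts: 5, now: time.Now}
	for _, opt := range opts {
		if opt != nil {
			opt(&c)
		}
	}
	return c
}

func main() {
	c := newConfig(WithMaxAttempts(0), WithClock(nil), WithMaxAttempts(3))
	fmt.Println(c.maxAttempts, c.now != nil) // 3 true
}
```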

func WithBackoff(base, maxDelay time.Duration, factor float64, jitter bool) Option

WithBackoff sets an exponential-with-optional-jitter schedule: the delay before attempt n+1 is base*factor^(n-1), capped at maxDelay. With jitter true the delay is scaled by a uniform random factor in [0,1) (full jitter) to de-correlate retries across consumers. A base <= 0 or factor < 1 is ignored.

func WithBackoffFunc(b Backoff) Option

WithBackoffFunc installs a custom Backoff, overriding the exponential default for callers who need a bespoke schedule (a fixed delay, a Fibonacci curve, a table). A nil backoff is ignored.

func WithClock(now func() time.Time) Option

WithClock injects the clock the middleware reads, for deterministic tests. The backoff itself is relative, so the clock is currently consulted only by schedules that need the current time. The default is time.Now. A nil clock is ignored.

func WithJitterSource(randF func() float64) Option

WithJitterSource injects the [0,1) random source used for full jitter, so a test can make a jittered schedule deterministic. It applies to the default and WithBackoff schedules. A nil source is ignored.
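Two kinds of deterministic source a test might pass to WithJitterSource are sketched below: a constant, and a seeded math/rand generator whose Float64 method value already has the func() float64 shape and returns values in [0,1). The helper names constantSource and seededSource are hypothetical. A *rand.Rand is not safe for concurrent use, so a seeded source like this suits tests that drive the middleware from a single goroutine; a source shared across lanes would need its own locking.

```go
package main

import (
	"fmt"
	"math/rand"
)

// constantSource (hypothetical) returns a jitter source that always yields v.
func constantSource(v float64) func() float64 {
	return func() float64 { return v }
}

// seededSource (hypothetical) returns a reproducible [0,1) source.
// The returned method value is not safe for concurrent use.
func seededSource(seed int64) func() float64 {
	return rand.New(rand.NewSource(seed)).Float64
}

func main() {
	// In a test these would be passed as, e.g., retry.WithJitterSource(seededSource(42)).
	src := seededSource(42)
	fmt.Println(constantSource(0.5)(), src() < 1) // 0.5 true
}
```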

func WithMaxAttempts(n int) Option

WithMaxAttempts caps the number of delivery attempts. When a Retryable failure occurs on the final attempt, the message is terminated ([source.Term]) instead of redelivered, so it can fall through to dead-letter middleware. The default is 5. A value < 1 is ignored.

Generated by gomarkdoc