source/retry
import "github.com/stablekernel/crucible/source/retry"Package retry provides a classification-aware retry [source.Middleware] for the crucible source seam. It inspects the [source.Result] a handler returns and, for a transient ([source.Retryable]) failure, converts it into a delayed redelivery whose backoff grows with the attempt count — while a permanent failure ([source.Poison]/[source.InvalidForState]) passes straight through to terminate without wasting redeliveries, and a success or drop is left untouched.
The attempt count is carried on a typed context value (WithAttempt/Attempt) rather than a magic-string header, and the next attempt is propagated to inner middleware (notably source/dlq) by the same channel. The backoff schedule is fully config-driven through functional options; nothing is hardcoded. A clock is injected for deterministic tests.
- Constants
- func Attempt(ctx context.Context) (int, bool)
- func Middleware(opts …Option) source.Middleware
- func WithAttempt(ctx context.Context, n int) context.Context
- type Backoff
- type BackoffFunc
- type Option
Constants
Section titled “Constants”AttemptHeader is the typed header key under which the retry middleware stamps the current attempt number when it produces a redelivery [source.Result] whose backoff a downstream backend cannot itself thread back through context (the attempt is also carried on the context via WithAttempt). It is a named constant, not an inline magic string, so dead-letter middleware reads the same key the retry middleware writes.
const AttemptHeader = "crucible-retry-attempt"func Attempt
Section titled “func Attempt”func Attempt(ctx context.Context) (int, bool)Attempt returns the attempt number carried on ctx and whether one was present. The first delivery is attempt 1; an absent value reports (1, false) so a caller can treat a fresh message as its first attempt without special-casing.
func Middleware
Section titled “func Middleware”func Middleware(opts ...Option) source.MiddlewareMiddleware returns a [source.Middleware] that applies classification-aware retry to the handler it wraps:
- [source.Retryable] (an [source.ActionNak]) below the attempt cap becomes a delayed redelivery ([source.NakAfter]) using the configured backoff, and the incremented attempt is threaded onto the context for the next delivery.
- [source.Retryable] at the attempt cap is escalated to [source.ActionTerm] (classification preserved as Retryable) so it falls through to dead-letter.
- [source.Poison] and [source.InvalidForState] ([source.ActionTerm]) pass through unchanged: a permanently-bad or wrong-state message is never retried.
- A success ([source.ActionAck]) or [source.Drop] is returned untouched.
The attempt count is read from the context (Attempt); a fresh message is attempt 1. The middleware never sleeps — it returns a [source.Result] the engine acts on — so it adds no latency of its own and is safe under the Hopper’s per-lane concurrency.
Example
// A handler that fails transiently.base := func(_ context.Context, _ source.Message) source.Result { return source.Nak(errors.New("upstream timeout"))}
mw := retry.Middleware( retry.WithMaxAttempts(3), retry.WithBackoff(100*time.Millisecond, time.Second, 2.0, false),)h := mw(base)
// First and second attempts back off; the third exhausts and terminates.for attempt := 1; attempt <= 3; attempt++ { ctx := retry.WithAttempt(context.Background(), attempt) r := h(ctx, stubMsg{}) fmt.Printf("attempt %d: %s class=%s delay=%v\n", attempt, r.Action, r.Class, r.Requeue)}// Output:// attempt 1: nak class=retryable delay=100ms// attempt 2: nak class=retryable delay=200ms// attempt 3: term class=retryable delay=0sOutput
Section titled “Output”attempt 1: nak class=retryable delay=100msattempt 2: nak class=retryable delay=200msattempt 3: term class=retryable delay=0sExample (Poison Passes Through)
base := func(_ context.Context, _ source.Message) source.Result { return source.Term(errors.New("malformed"))}h := retry.Middleware()(base)r := h(context.Background(), stubMsg{})fmt.Printf("%s class=%s\n", r.Action, r.Class)// Output: term class=poisonOutput
Section titled “Output”term class=poisonfunc WithAttempt
Section titled “func WithAttempt”func WithAttempt(ctx context.Context, n int) context.ContextWithAttempt returns a child context carrying attempt n. The Middleware reads this on entry to decide the backoff and writes the incremented value before redelivery; tests and the dead-letter middleware read it via Attempt.
type Backoff
Section titled “type Backoff”Backoff computes the redelivery delay for a given attempt. It is the seam a caller overrides through WithBackoff (or WithBackoffFunc) to shape the schedule; the default is exponential with full jitter.
type Backoff interface { // Delay returns the wait before attempt n+1 given that attempt n just failed. // attempt is 1-based; the returned delay is non-negative. Delay(attempt int) time.Duration}type BackoffFunc
Section titled “type BackoffFunc”BackoffFunc adapts a function to the Backoff interface.
type BackoffFunc func(attempt int) time.Durationfunc (BackoffFunc) Delay
Section titled “func (BackoffFunc) Delay”func (f BackoffFunc) Delay(attempt int) time.DurationDelay implements Backoff.
type Option
Section titled “type Option”Option configures the retry Middleware. Options are additive with sensible defaults; a nil or out-of-range value is ignored, leaving the default in place.
type Option func(*config)func WithBackoff
Section titled “func WithBackoff”func WithBackoff(base, maxDelay time.Duration, factor float64, jitter bool) OptionWithBackoff sets an exponential-with-optional-jitter schedule: the delay before attempt n+1 is base*factor^(n-1), capped at maxDelay. With jitter true the delay is scaled by a uniform random factor in [0,1) (full jitter) to de-correlate retries across consumers. A base <= 0 or factor < 1 is ignored.
func WithBackoffFunc
Section titled “func WithBackoffFunc”func WithBackoffFunc(b Backoff) OptionWithBackoffFunc installs a custom Backoff, overriding the exponential default for callers who need a bespoke schedule (a fixed delay, a Fibonacci curve, a table). A nil backoff is ignored.
func WithClock
Section titled “func WithClock”func WithClock(now func() time.Time) OptionWithClock injects the clock the middleware reads, for deterministic tests. It is currently used only where a future-facing schedule needs the current time; the backoff itself is relative. The default is time.Now. A nil clock is ignored.
func WithJitterSource
Section titled “func WithJitterSource”func WithJitterSource(randF func() float64) OptionWithJitterSource injects the [0,1) random source used for full jitter, so a test can make a jittered schedule deterministic. It applies to the default and WithBackoff schedules. A nil source is ignored.
func WithMaxAttempts
Section titled “func WithMaxAttempts”func WithMaxAttempts(n int) OptionWithMaxAttempts caps the number of delivery attempts. When a Retryable failure occurs on the final attempt, the message is terminated ([source.Term]) instead of redelivered, so it can fall through to dead-letter middleware. The default is 5. A value < 1 is ignored.
Generated by gomarkdoc