
source/dlq

import "github.com/stablekernel/crucible/source/dlq"

Package dlq provides dead-letter routing for the crucible source seam: a [source.Middleware] that captures a terminal failure (a [source.ActionTerm]) and publishes it, with typed reason metadata, to a caller-supplied DeadLetter destination. The core stays backend-neutral — the package never names Kafka, a DLQ topic, or a database. The caller injects whatever publishes the parked message; this package only decides what to park and stamps a typed DeadLetterRecord onto it.

A parking store is itself a replayable [source.Inlet]: park a message here, and later drain the parking destination back through the same [source.Handler] with its attempt count reset. MemDeadLetter is an in-memory implementation that is both a DeadLetter sink and a [source.Inlet], so the round-trip is unit-testable with no infrastructure.

func Middleware(dl DeadLetter, opts ...Option) source.Middleware

Middleware returns a [source.Middleware] that routes terminal failures to dl. When the wrapped handler returns a [source.ActionTerm], the middleware builds a DeadLetterRecord — stamping the classification, reason, attempt count (read from the retry context counter), original headers, last error, and original subject — and calls DeadLetter.Park. Non-terminal results (ack, nak, drop, in-progress, manual) pass through untouched.

If Park fails, the middleware returns a [source.ActionNak] so the engine can re-attempt settlement rather than losing the message; the original error is joined with the park error. A nil dl makes the middleware a pass-through.

Example

store := dlq.NewMemDeadLetter()
// A handler that rejects a malformed message as poison.
base := func(_ context.Context, _ source.Message) source.Result {
	return source.Term(errors.New("malformed payload"))
}
h := dlq.Middleware(store)(base)
h(context.Background(), stubMsg{value: []byte("bad"), subject: "orders"})
rec := store.Records()[0]
fmt.Printf("parked subject=%s reason=%s last=%q\n", rec.Subject, rec.Reason, rec.LastError)
// Output: parked subject=orders reason=poison last="malformed payload"
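The routing rules described for Middleware can be modeled without the crucible packages. The sketch below is a stand-in, not the package's implementation: `action`, `result`, `handler`, `parker`, `failingParker`, `countingParker`, and `deadLetterModel` are hypothetical local names. It reproduces only the three behaviors stated in the text: terminal results are parked, a failed park is downgraded to a nak with both errors joined, and a nil destination is a pass-through. Returning the original terminal result after a successful park is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// action and result are hypothetical stand-ins for the source package's
// settlement types; only the cases needed for this sketch are modeled.
type action int

const (
	actAck action = iota
	actNak
	actTerm
)

type result struct {
	act action
	err error
}

// handler processes a payload and reports how it should be settled.
type handler func(payload string) result

// parker is a simplified stand-in for the DeadLetter destination.
type parker interface {
	Park(payload string, cause error) error
}

// failingParker always fails, to exercise the park-failure path.
type failingParker struct{}

func (failingParker) Park(string, error) error { return errors.New("sink unavailable") }

// countingParker counts how many payloads were parked.
type countingParker struct{ parked int }

func (p *countingParker) Park(string, error) error { p.parked++; return nil }

// deadLetterModel wraps next with the routing rules described in the text.
func deadLetterModel(dl parker, next handler) handler {
	if dl == nil {
		return next // nil destination: pass-through
	}
	return func(payload string) result {
		res := next(payload)
		if res.act != actTerm {
			return res // non-terminal results are untouched
		}
		if perr := dl.Park(payload, res.err); perr != nil {
			// Park failed: ask for redelivery and keep both errors.
			return result{act: actNak, err: errors.Join(res.err, perr)}
		}
		return res // assumed: the terminal result is returned after a successful park
	}
}

// poisonHandler rejects every payload as a terminal failure.
func poisonHandler(string) result {
	return result{act: actTerm, err: errors.New("malformed payload")}
}

func main() {
	ok := &countingParker{}
	res := deadLetterModel(ok, poisonHandler)("bad")
	fmt.Println("parked:", ok.parked, "still terminal:", res.act == actTerm)

	res = deadLetterModel(failingParker{}, poisonHandler)("bad")
	fmt.Println("downgraded to nak:", res.act == actNak)
}
```

Because the model uses `errors.Join`, a caller can still match either the handler error or the park error with `errors.Is` on the returned result.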

DeadLetter is the caller-provided destination a terminal message is parked to. It is the single dependency the middleware reaches through, so the core stays neutral: a Kafka producer to a DLQ topic, a database table writer, or the in-memory MemDeadLetter all satisfy it. Implementations must be safe for concurrent use; the Hopper may publish from several lanes at once.

type DeadLetter interface {
	// Park persists one dead-lettered record. Returning an error tells the
	// middleware the park failed, which it surfaces so the engine can decide
	// whether to retry settlement rather than silently dropping the message.
	Park(ctx context.Context, rec DeadLetterRecord) error
}
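A custom destination only has to satisfy this one method and be safe for concurrent calls. The following is a minimal sketch under stated assumptions: `DeadLetterRecord` and `DeadLetter` are redeclared locally in trimmed form so the example compiles on its own, and `WriterDeadLetter`, `NewWriterDeadLetter`, `parkConcurrently`, and `parkAfterCancel` are hypothetical names, not part of the package.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"sync"
)

// DeadLetterRecord is a trimmed, local stand-in for dlq.DeadLetterRecord.
type DeadLetterRecord struct {
	Subject   string
	Reason    string
	Attempts  int
	LastError string
}

// DeadLetter mirrors the interface shape shown above.
type DeadLetter interface {
	Park(ctx context.Context, rec DeadLetterRecord) error
}

// WriterDeadLetter parks each record as one tab-separated line on an
// io.Writer. The mutex serializes writes so several lanes can park at once.
type WriterDeadLetter struct {
	mu sync.Mutex
	w  io.Writer
}

func NewWriterDeadLetter(w io.Writer) *WriterDeadLetter { return &WriterDeadLetter{w: w} }

// Park refuses to write once ctx is done, so the caller sees the failure
// instead of a silently dropped record.
func (d *WriterDeadLetter) Park(ctx context.Context, rec DeadLetterRecord) error {
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("park %q: %w", rec.Subject, err)
	}
	d.mu.Lock()
	defer d.mu.Unlock()
	_, err := fmt.Fprintf(d.w, "%s\t%s\t%d\t%q\n", rec.Subject, rec.Reason, rec.Attempts, rec.LastError)
	return err
}

// parkConcurrently parks n records from n goroutines and returns the number
// of complete lines written.
func parkConcurrently(n int) int {
	var buf bytes.Buffer
	var dl DeadLetter = NewWriterDeadLetter(&buf)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = dl.Park(context.Background(), DeadLetterRecord{
				Subject: "orders", Reason: "poison", Attempts: i + 1, LastError: "malformed payload",
			})
		}(i)
	}
	wg.Wait()
	return bytes.Count(buf.Bytes(), []byte("\n"))
}

// parkAfterCancel exercises the failure path with a cancelled context.
func parkAfterCancel() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return NewWriterDeadLetter(io.Discard).Park(ctx, DeadLetterRecord{Subject: "orders"})
}

func main() {
	fmt.Println("lines written:", parkConcurrently(8))
	fmt.Println("cancelled park failed:", parkAfterCancel() != nil)
}
```

A production sink would wrap a producer or table writer in the same way; the point of the sketch is that the locking and the error return are the destination's responsibility.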

DeadLetterRecord is the typed envelope stamped onto a parked message. It captures everything needed to triage the failure and to replay the message later: the original payload and routing, the failure [source.Classification] and reason, the attempt count reached, and the last error’s text. It is a value type with named fields — never a magic-string metadata map.

type DeadLetterRecord struct {
	// Key is the original message key (partition/routing key), copied so the
	// record does not alias inlet state.
	Key []byte
	// Value is the original raw payload bytes.
	Value []byte
	// Headers are the original message headers.
	Headers source.Headers
	// Subject is the topic or subject the message originally arrived on.
	Subject string
	// PartitionKey is the original ordering domain, preserved so a replay can
	// re-shard onto the same lane.
	PartitionKey string
	// Class is the failure classification that sent the message to dead-letter
	// ([source.Poison], [source.InvalidForState], or [source.Retryable] when retries
	// were exhausted).
	Class source.Classification
	// Reason is a stable, human-readable summary of why the message was parked
	// (the classification name), for dashboards and triage without parsing
	// LastError.
	Reason string
	// Attempts is the number of delivery attempts the message reached before it
	// was parked, read from the retry attempt counter.
	Attempts int
	// LastError is the text of the final error, captured for triage. The package
	// stores the rendered string rather than the error value so a record can be
	// serialized to any backend without a custom codec.
	LastError string
}
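Two properties of the record are easy to check in isolation: copied byte slices do not alias the inlet's buffers, and storing the error as text lets a generic codec handle the record. The sketch below assumes a local `record` type in place of DeadLetterRecord, models Headers as `map[string]string` and Class as a string (the real [source.Headers] and [source.Classification] types may differ), and uses hypothetical helpers `newRecord`, `roundTrip`, and `demo`. The JSON field names are also illustrative.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// record is a hypothetical stand-in for dlq.DeadLetterRecord.
type record struct {
	Key          []byte            `json:"key"`
	Value        []byte            `json:"value"`
	Headers      map[string]string `json:"headers"`
	Subject      string            `json:"subject"`
	PartitionKey string            `json:"partition_key"`
	Class        string            `json:"class"`
	Reason       string            `json:"reason"`
	Attempts     int               `json:"attempts"`
	LastError    string            `json:"last_error"`
}

// newRecord copies the byte slices so later reuse of the caller's buffers
// cannot change the parked record, and renders the error to text.
func newRecord(key, value []byte, subject string, attempts int, err error) record {
	return record{
		Key:       append([]byte(nil), key...),
		Value:     append([]byte(nil), value...),
		Subject:   subject,
		Class:     "poison",
		Reason:    "poison",
		Attempts:  attempts,
		LastError: err.Error(),
	}
}

// roundTrip encodes rec as JSON and decodes it again.
func roundTrip(rec record) (record, error) {
	b, err := json.Marshal(rec)
	if err != nil {
		return record{}, err
	}
	var out record
	err = json.Unmarshal(b, &out)
	return out, err
}

// demo builds a record, mutates the source buffer afterwards, and returns the
// record after a JSON round-trip.
func demo() (record, error) {
	buf := []byte("order-1")
	rec := newRecord([]byte("k1"), buf, "orders", 3, errors.New("malformed payload"))
	buf[0] = 'X' // simulate the inlet reusing its buffer
	return roundTrip(rec)
}

func main() {
	out, err := demo()
	fmt.Println(string(out.Value), out.Attempts, out.LastError, err)
}
```

The mutation of `buf` after `newRecord` returns does not reach the record, which is the aliasing guarantee the Key comment describes.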

MemDeadLetter is an in-memory dead-letter store that is both a DeadLetter sink and a replayable [source.Inlet]: the concrete realization of “the DLQ is itself an Inlet”. Park terminal failures into it through the dead-letter Middleware, then drain the parked records back through the same [source.Handler] — with each replayed message starting at attempt 1 again — by subscribing to it like any other inlet. It exists for tests and for small single-process replay; production backends supply their own topic/table-backed DeadLetter and a matching replay [source.Inlet].

The zero value is not usable; construct one with NewMemDeadLetter. It is safe for concurrent Park calls alongside a single draining [source.Subscription].

type MemDeadLetter struct {
// contains filtered or unexported fields
}

Example (Replay)

store := dlq.NewMemDeadLetter()
// Park a failure.
park := dlq.Middleware(store)(func(_ context.Context, _ source.Message) source.Result {
	return source.Term(errors.New("downstream down"))
})
park(context.Background(), stubMsg{value: []byte("order-1"), subject: "orders"})
// Later, drain the parking store back through a now-healthy handler. The DLQ
// is itself an Inlet, so replay uses the same consume-and-settle loop.
sub, _ := store.Subscribe(context.Background(), source.SubscribeConfig{})
for {
	m, err := sub.Next(context.Background())
	if errors.Is(err, source.ErrDrained) {
		break
	}
	if err != nil {
		// Any other error leaves m unusable, so stop rather than read it.
		break
	}
	fmt.Printf("replaying %s\n", m.Value())
	_ = sub.Settle(context.Background(), m, source.Ack())
}
fmt.Printf("remaining parked: %d\n", store.Len())
// Output:
// replaying order-1
// remaining parked: 0

func NewMemDeadLetter() *MemDeadLetter

NewMemDeadLetter returns an empty in-memory dead-letter store.

func (d *MemDeadLetter) Close() error

Close implements the inlet's Close method. The in-memory store holds no resources, so there is nothing to release.

func (d *MemDeadLetter) Len() int

Len reports how many records are currently parked.

func (d *MemDeadLetter) Park(_ context.Context, rec DeadLetterRecord) error

Park appends rec to the store. It implements DeadLetter and never fails.

func (d *MemDeadLetter) Records() []DeadLetterRecord

Records returns a snapshot copy of the parked records, for assertions. The returned slice is independent of the store.

func (d *MemDeadLetter) Subscribe(_ context.Context, _ source.SubscribeConfig) (source.Subscription, error)

Subscribe drains the parked records as a [source.Subscription]: each parked record is reconstituted into a [source.Message] and yielded once, in park order, after which Next reports [source.ErrDrained]. The cfg is accepted for interface symmetry but ignored — a parking store has no topics or groups to honor. Draining is destructive: a record settled with [source.ActionAck] (a successful replay) is consumed; a record settled with [source.ActionNak] or [source.ActionTerm] is re-parked so a still-failing replay is not lost.

This is the replay path: wire the returned subscription to a Hopper running the same handler, and the parked records flow back through the pipeline with attempt counts reset. A replayed message carries no retry attempt, so it starts at attempt 1 again.
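The drain semantics can be modeled in a few lines. This sketch is one possible reading of the description above, not the package's implementation: `memStore`, `drain`, `errDrained`, and `replay` are hypothetical names, payloads are plain strings, and taking the records out of the store at subscribe time is an assumption. The real store may instead remove a record only when it is settled.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDrained is a local stand-in for source.ErrDrained.
var errDrained = errors.New("drained")

// memStore is a hypothetical model of a parking store that can be drained.
type memStore struct {
	mu      sync.Mutex
	records []string
}

func (s *memStore) Park(payload string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.records = append(s.records, payload)
}

func (s *memStore) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.records)
}

// drain is a one-shot subscription over the records parked at subscribe time.
type drain struct {
	store   *memStore
	pending []string
}

// Subscribe takes the currently parked records out of the store.
func (s *memStore) Subscribe() *drain {
	s.mu.Lock()
	defer s.mu.Unlock()
	d := &drain{store: s, pending: s.records}
	s.records = nil
	return d
}

// Next yields each pending record once, in park order, then errDrained.
func (d *drain) Next() (string, error) {
	if len(d.pending) == 0 {
		return "", errDrained
	}
	p := d.pending[0]
	d.pending = d.pending[1:]
	return p, nil
}

// Settle consumes an acknowledged record and re-parks any other outcome.
func (d *drain) Settle(payload string, acked bool) {
	if !acked {
		d.store.Park(payload)
	}
}

// replay drains store through handle and returns the payloads in the order
// they were replayed. handle returns true for a successful replay.
func replay(store *memStore, handle func(string) bool) []string {
	var seen []string
	sub := store.Subscribe()
	for {
		p, err := sub.Next()
		if errors.Is(err, errDrained) {
			return seen
		}
		seen = append(seen, p)
		sub.Settle(p, handle(p))
	}
}

func main() {
	store := &memStore{}
	store.Park("order-1")
	store.Park("order-2")
	// order-2 still fails on replay, so it is parked again.
	seen := replay(store, func(p string) bool { return p != "order-2" })
	fmt.Println(seen, "remaining parked:", store.Len())
}
```

In this model a record that still fails is visible to the next subscription rather than the current one, which keeps a single drain from looping forever on a permanently failing message.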

Option configures the dead-letter Middleware. Options are additive, and any option that is not supplied keeps its default.

type Option func(*config)

func WithParkRetryable(park bool) Option

WithParkRetryable controls whether a terminal failure still classified [source.Retryable] (a retry-exhausted message escalated to Term) is parked. The default is true. Set false to park only genuinely permanent failures ([source.Poison]/[source.InvalidForState]) and let exhausted retries terminate without a parked record.
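The option follows the usual functional-option shape. As a rough sketch of how the default and the override interact, with every identifier local and hypothetical (`classification`, `config`, `option`, `withParkRetryable`, `newConfig`, `shouldPark`, and the `parkRetryable` field name are assumptions, not the package's internals):

```go
package main

import "fmt"

// classification is a hypothetical stand-in for source.Classification.
type classification int

const (
	poison classification = iota
	invalidForState
	retryable
)

// config and option mirror the functional-option shape shown above.
type config struct{ parkRetryable bool }

type option func(*config)

func withParkRetryable(park bool) option {
	return func(c *config) { c.parkRetryable = park }
}

// newConfig applies opts over the documented default (park retryable: true).
func newConfig(opts ...option) config {
	c := config{parkRetryable: true}
	for _, o := range opts {
		o(&c)
	}
	return c
}

// shouldPark reports whether a terminal failure of class cls is parked.
func (c config) shouldPark(cls classification) bool {
	if cls == retryable {
		return c.parkRetryable
	}
	return true // poison and invalid-for-state are always parked
}

func main() {
	def := newConfig()
	strict := newConfig(withParkRetryable(false))
	fmt.Println(def.shouldPark(retryable), strict.shouldPark(retryable), strict.shouldPark(poison))
	// Prints: true false true
}
```

Setting the option to false changes only the retry-exhausted case; permanent failures are parked either way.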

Generated by gomarkdoc