source/dlq
import "github.com/stablekernel/crucible/source/dlq"

Package dlq provides dead-letter routing for the crucible source seam: a [source.Middleware] that captures a terminal failure (a [source.ActionTerm]) and publishes it, with typed reason metadata, to a caller-supplied DeadLetter destination. The core stays backend-neutral: the package never names Kafka, a DLQ topic, or a database. The caller injects whatever publishes the parked message; this package only decides what to park and stamps a typed DeadLetterRecord onto it.
A parking store is itself a replayable [source.Inlet]: park a message here, and later drain the parking destination back through the same [source.Handler] with its attempt count reset. MemDeadLetter is an in-memory implementation that is both a DeadLetter sink and a [source.Inlet], so the round-trip is unit-testable with no infrastructure.
- func Middleware(dl DeadLetter, opts ...Option) source.Middleware
- type DeadLetter
- type DeadLetterRecord
- type MemDeadLetter
- func NewMemDeadLetter() *MemDeadLetter
- func (d *MemDeadLetter) Close() error
- func (d *MemDeadLetter) Len() int
- func (d *MemDeadLetter) Park(_ context.Context, rec DeadLetterRecord) error
- func (d *MemDeadLetter) Records() []DeadLetterRecord
- func (d *MemDeadLetter) Subscribe(_ context.Context, _ source.SubscribeConfig) (source.Subscription, error)
- type Option
- func WithParkRetryable(park bool) Option
func Middleware
func Middleware(dl DeadLetter, opts ...Option) source.Middleware

Middleware returns a [source.Middleware] that routes terminal failures to dl. When the wrapped handler returns a [source.ActionTerm], the middleware builds a DeadLetterRecord, stamping the classification, reason, attempt count (read from the retry context counter), original headers, last error, and original subject, and calls DeadLetter.Park. Non-terminal results (ack, nak, drop, in-progress, manual) pass through untouched.
If Park fails, the middleware returns a [source.ActionNak] so the engine can re-attempt settlement rather than losing the message; the original error is joined with the park error. A nil dl makes the middleware a pass-through.
Example
store := dlq.NewMemDeadLetter()
// A handler that rejects a malformed message as poison.
base := func(_ context.Context, _ source.Message) source.Result {
	return source.Term(errors.New("malformed payload"))
}
h := dlq.Middleware(store)(base)
h(context.Background(), stubMsg{value: []byte("bad"), subject: "orders"})
rec := store.Records()[0]
fmt.Printf("parked subject=%s reason=%s last=%q\n", rec.Subject, rec.Reason, rec.LastError)
// Output: parked subject=orders reason=poison last="malformed payload"

Output

parked subject=orders reason=poison last="malformed payload"

type DeadLetter
DeadLetter is the caller-provided destination a terminal message is parked to. It is the single dependency the middleware reaches through, so the core stays neutral: a Kafka producer to a DLQ topic, a database table writer, or the in-memory MemDeadLetter all satisfy it. Implementations must be safe for concurrent use; the Hopper may publish from several lanes at once.
type DeadLetter interface {
	// Park persists one dead-lettered record. Returning an error tells the
	// middleware the park failed, which it surfaces so the engine can decide
	// whether to retry settlement rather than silently dropping the message.
	Park(ctx context.Context, rec DeadLetterRecord) error
}

type DeadLetterRecord
DeadLetterRecord is the typed envelope stamped onto a parked message. It captures everything needed to triage the failure and to replay the message later: the original payload and routing, the failure [source.Classification] and reason, the attempt count reached, and the last error’s text. It is a value type with named fields, never a magic-string metadata map.
type DeadLetterRecord struct {
	// Key is the original message key (partition/routing key), copied so the
	// record does not alias inlet state.
	Key []byte
	// Value is the original raw payload bytes.
	Value []byte
	// Headers are the original message headers.
	Headers source.Headers
	// Subject is the topic or subject the message originally arrived on.
	Subject string
	// PartitionKey is the original ordering domain, preserved so a replay can
	// re-shard onto the same lane.
	PartitionKey string
	// Class is the failure classification that sent the message to dead-letter
	// ([source.Poison], [source.InvalidForState], or [source.Retryable] when retries
	// were exhausted).
	Class source.Classification
	// Reason is a stable, human-readable summary of why the message was parked
	// (the classification name), for dashboards and triage without parsing Err.
	Reason string
	// Attempts is the number of delivery attempts the message reached before it
	// was parked, read from the retry attempt counter.
	Attempts int
	// LastError is the text of the final error, captured for triage. The package
	// stores the rendered string rather than the error value so a record can be
	// serialized to any backend without a custom codec.
	LastError string
}

type MemDeadLetter
MemDeadLetter is an in-memory dead-letter store that is both a DeadLetter sink and a replayable [source.Inlet]: the concrete realization of “the DLQ is itself an Inlet”. Park terminal failures into it through the dead-letter Middleware, then drain the parked records back through the same [source.Handler], with each replayed message starting at attempt 1 again, by subscribing to it like any other inlet. It exists for tests and for small single-process replay; production backends supply their own topic/table-backed DeadLetter and a matching replay [source.Inlet].
The zero value is not usable; construct one with NewMemDeadLetter. It is safe for concurrent Park calls alongside a single draining [source.Subscription].
type MemDeadLetter struct {
	// contains filtered or unexported fields
}

Example (Replay)
store := dlq.NewMemDeadLetter()
// Park a failure.
park := dlq.Middleware(store)(func(_ context.Context, _ source.Message) source.Result {
	return source.Term(errors.New("downstream down"))
})
park(context.Background(), stubMsg{value: []byte("order-1"), subject: "orders"})

// Later, drain the parking store back through a now-healthy handler. The DLQ
// is itself an Inlet, so replay uses the same consume-and-settle loop.
sub, _ := store.Subscribe(context.Background(), source.SubscribeConfig{})
for {
	m, err := sub.Next(context.Background())
	if errors.Is(err, source.ErrDrained) {
		break
	}
	fmt.Printf("replaying %s\n", m.Value())
	_ = sub.Settle(context.Background(), m, source.Ack())
}
fmt.Printf("remaining parked: %d\n", store.Len())
// Output:
// replaying order-1
// remaining parked: 0

Output

replaying order-1
remaining parked: 0

func NewMemDeadLetter
func NewMemDeadLetter() *MemDeadLetter

NewMemDeadLetter returns an empty in-memory dead-letter store.
func (*MemDeadLetter) Close
func (d *MemDeadLetter) Close() error

Close releases the inlet. The in-memory store holds nothing to release.
func (*MemDeadLetter) Len
func (d *MemDeadLetter) Len() int

Len reports how many records are currently parked.
func (*MemDeadLetter) Park
func (d *MemDeadLetter) Park(_ context.Context, rec DeadLetterRecord) error

Park appends rec to the store. It implements DeadLetter and never fails.
func (*MemDeadLetter) Records
func (d *MemDeadLetter) Records() []DeadLetterRecord

Records returns a snapshot copy of the parked records, for assertions. The returned slice is independent of the store.
func (*MemDeadLetter) Subscribe
func (d *MemDeadLetter) Subscribe(_ context.Context, _ source.SubscribeConfig) (source.Subscription, error)

Subscribe drains the parked records as a [source.Subscription]: each parked record is reconstituted into a [source.Message] and yielded once, in park order, after which Next reports [source.ErrDrained]. The cfg is accepted for interface symmetry but ignored: a parking store has no topics or groups to honor. Draining is destructive: a record settled with [source.ActionAck] (a successful replay) is consumed; a record settled with [source.ActionNak] or [source.ActionTerm] is re-parked so a still-failing replay is not lost.
This is the replay path: wire the returned subscription to a Hopper running the same handler, and the parking topic re-flows through the pipeline with attempt counts reset (the replayed message carries no retry attempt, so it is attempt 1 again).
type Option
Option configures the dead-letter Middleware. Options are additive with sensible defaults.

type Option func(*config)

func WithParkRetryable
func WithParkRetryable(park bool) Option

WithParkRetryable controls whether a terminal failure still classified [source.Retryable] (a retry-exhausted message escalated to Term) is parked. The default is true. Set false to park only genuinely permanent failures ([source.Poison]/[source.InvalidForState]) and let exhausted retries terminate without a parked record.
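The option follows the functional-options pattern implied by `type Option func(*config)`. The sketch below shows how such an option could gate parking; the `parkRetryable` field, `newConfig`, `shouldPark`, and the local `Classification` constants are assumptions for illustration, since the package's config is unexported.

```go
package main

import "fmt"

// Classification is a hypothetical stand-in for source.Classification.
type Classification int

const (
	Poison Classification = iota
	InvalidForState
	Retryable
)

// config and its field name are assumptions; the real config is unexported.
type config struct{ parkRetryable bool }

// Option mirrors the documented shape: a function that mutates the config.
type Option func(*config)

// WithParkRetryable sets whether retry-exhausted failures are parked.
func WithParkRetryable(park bool) Option {
	return func(c *config) { c.parkRetryable = park }
}

// newConfig applies options over the documented default (park retryable: true).
func newConfig(opts ...Option) config {
	c := config{parkRetryable: true}
	for _, o := range opts {
		o(&c)
	}
	return c
}

// shouldPark reports whether a terminal failure of the given class is parked.
// Permanent classes are always parked; Retryable depends on the option.
func (c config) shouldPark(class Classification) bool {
	return class != Retryable || c.parkRetryable
}

func main() {
	def := newConfig()
	strict := newConfig(WithParkRetryable(false))
	// prints: true false true
	fmt.Println(def.shouldPark(Retryable), strict.shouldPark(Retryable), strict.shouldPark(Poison))
}
```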
Generated by gomarkdoc