Skip to content

source/schema

import "github.com/stablekernel/crucible/source/schema"

Package schema provides a validating [source.Middleware] for the crucible source seam: it checks a message against a caller-supplied Validator before the handler runs, and rejects an invalid message as poison ([source.Term] with an Error, which reports [source.ErrPoison]) so it routes straight to dead-letter instead of wasting redeliveries.

The package defines the Validator interface and a simple ContentTypeValidator example; real validators (proto, Avro, JSON-Schema, a CloudEvents schema registry) are caller-provided so the core takes on no schema-library dependency.

DefaultContentTypeHeader is the header key ContentTypeValidator reads when its Header field is empty.

const DefaultContentTypeHeader = "content-type"

func Middleware(v Validator) source.Middleware

Middleware returns a [source.Middleware] that validates each message with v before invoking the wrapped handler. A message that fails validation is rejected with [source.Term] carrying an Error (classified [source.Poison]); the handler is never invoked. A valid message flows to the handler unchanged. A nil validator makes the middleware a pass-through.

Example

// Require a JSON content type before the handler runs.
v := schema.ContentTypeValidator{Allowed: []string{"application/json"}}
base := func(_ context.Context, _ source.Message) source.Result {
fmt.Println("handler ran")
return source.Ack()
}
h := schema.Middleware(v)(base)
// A valid message reaches the handler.
valid := stubMsg{headers: source.Headers{{Key: "content-type", Value: "application/json"}}}
r1 := h(context.Background(), valid)
fmt.Printf("valid: %s\n", r1.Action)
// An invalid message is rejected as poison and never reaches the handler.
invalid := stubMsg{subject: "orders", headers: source.Headers{{Key: "content-type", Value: "text/csv"}}}
r2 := h(context.Background(), invalid)
fmt.Printf("invalid: %s/%s poison=%v\n", r2.Action, r2.Class, errors.Is(r2.Err, source.ErrPoison))
// Output:
// handler ran
// valid: ack
// invalid: term/poison poison=true
handler ran
valid: ack
invalid: term/poison poison=true

ContentTypeValidator is a simple example Validator: it requires a message to carry a content-type header whose value is in an allow-list, and (optionally) to have a non-empty payload. It is intended as a starting point and a test double, not a substitute for a real schema validator.

type ContentTypeValidator struct {
// Header is the header key the content type is read from. If empty,
// [DefaultContentTypeHeader] is used.
Header string
// Allowed is the set of acceptable content-type values. An empty set accepts
// any present content type.
Allowed []string
// RequireValue rejects a message with an empty [source.Message.Value] when true.
RequireValue bool
}

func (v ContentTypeValidator) Validate(_ context.Context, m source.Message) error

Validate implements Validator: it checks the content-type header is present and (when an allow-list is set) permitted, and that the payload is non-empty when RequireValue is set.

Error wraps a validation failure with the subject the message arrived on and the underlying reason. It is errors.Is / errors.As friendly via Unwrap and reports [source.ErrPoison] from Is, so a schema rejection is recognized as poison (and dead-lettered) with a single errors.Is(err, source.ErrPoison) check; never match on its Error string. It mirrors the shape of source.DecodeError.

type Error struct {
// Subject is the topic or subject the invalid message arrived on.
Subject string
// Err is the wrapped underlying validation error.
Err error
}

func (e *Error) Error() string

Error implements error.

func (e *Error) Is(target error) bool

Is reports an *Error as matching [source.ErrPoison], so middleware can route a schema rejection to dead-letter as poison without string-matching.

func (e *Error) Unwrap() error

Unwrap returns the wrapped validation error so errors.As reaches it.

Validator decides whether a message is well-formed before it reaches the handler. A nil error means valid; a non-nil error rejects the message as poison. Implementations must be safe for concurrent use; the Hopper validates from several lanes at once.

type Validator interface {
// Validate checks m and returns nil if it conforms, or an error describing why
// it does not. The middleware wraps a non-nil error in an [Error].
Validate(ctx context.Context, m source.Message) error
}

ValidatorFunc adapts a function to the Validator interface.

type ValidatorFunc func(ctx context.Context, m source.Message) error

func (f ValidatorFunc) Validate(ctx context.Context, m source.Message) error

Validate implements Validator.

Generated by gomarkdoc