source/schema
    import "github.com/stablekernel/crucible/source/schema"

Package schema provides a validating [source.Middleware] for the crucible source seam: it checks a message against a caller-supplied Validator before the handler runs, and rejects an invalid message as poison ([source.Term] with an Error, which reports [source.ErrPoison]) so it routes straight to dead-letter instead of wasting redeliveries.
The package defines the Validator interface and a simple ContentTypeValidator example; real validators (proto, Avro, JSON-Schema, a CloudEvents schema registry) are caller-provided so the core takes on no schema-library dependency.
- Constants
- func Middleware(v Validator) source.Middleware
- type ContentTypeValidator
- type Error
- type Validator
- type ValidatorFunc
Constants
DefaultContentTypeHeader is the header key ContentTypeValidator reads when its Header field is empty.

    const DefaultContentTypeHeader = "content-type"

func Middleware

    func Middleware(v Validator) source.Middleware

Middleware returns a [source.Middleware] that validates each message with v before invoking the wrapped handler. A message that fails validation is rejected with [source.Term] carrying an Error (classified [source.Poison]); the handler is never invoked. A valid message flows to the handler unchanged. A nil validator makes the middleware a pass-through.
Example
    // Require a JSON content type before the handler runs.
    v := schema.ContentTypeValidator{Allowed: []string{"application/json"}}

    base := func(_ context.Context, _ source.Message) source.Result {
        fmt.Println("handler ran")
        return source.Ack()
    }
    h := schema.Middleware(v)(base)

    // A valid message reaches the handler.
    valid := stubMsg{headers: source.Headers{{Key: "content-type", Value: "application/json"}}}
    r1 := h(context.Background(), valid)
    fmt.Printf("valid: %s\n", r1.Action)

    // An invalid message is rejected as poison and never reaches the handler.
    invalid := stubMsg{subject: "orders", headers: source.Headers{{Key: "content-type", Value: "text/csv"}}}
    r2 := h(context.Background(), invalid)
    fmt.Printf("invalid: %s/%s poison=%v\n", r2.Action, r2.Class, errors.Is(r2.Err, source.ErrPoison))
    // Output:
    // handler ran
    // valid: ack
    // invalid: term/poison poison=true

Output

    handler ran
    valid: ack
    invalid: term/poison poison=true

type ContentTypeValidator
ContentTypeValidator is a simple example Validator: it requires a message to carry a content-type header whose value is in an allow-list, and (optionally) to have a non-empty payload. It is intended as a starting point and a test double, not a substitute for a real schema validator.

    type ContentTypeValidator struct {
        // Header is the header key the content type is read from. If empty,
        // [DefaultContentTypeHeader] is used.
        Header string
        // Allowed is the set of acceptable content-type values. An empty set accepts
        // any present content type.
        Allowed []string
        // RequireValue rejects a message with an empty [source.Message.Value] when true.
        RequireValue bool
    }

func (ContentTypeValidator) Validate

    func (v ContentTypeValidator) Validate(_ context.Context, m source.Message) error

Validate implements Validator: it checks the content-type header is present and (when an allow-list is set) permitted, and that the payload is non-empty when RequireValue is set.
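The three checks can be sketched as a single function. This is an approximation under stated assumptions, not the package's code: `checkContentType` is a hypothetical helper, headers are modeled as a plain map, matching is exact and case-sensitive, and an empty header value is treated the same as a missing header. The real implementation may differ on each of those points.

```go
package main

import (
	"errors"
	"fmt"
)

// defaultHeader mirrors DefaultContentTypeHeader.
const defaultHeader = "content-type"

// checkContentType is a hypothetical helper that applies the documented rules:
// the header must be present, must be in the allow-list when one is set, and
// the payload must be non-empty when requireValue is true.
func checkContentType(headers map[string]string, value []byte, header string, allowed []string, requireValue bool) error {
	if header == "" {
		header = defaultHeader
	}
	ct := headers[header]
	if ct == "" { // assumption: an empty value counts as absent
		return errors.New("missing " + header + " header")
	}
	if len(allowed) > 0 {
		permitted := false
		for _, a := range allowed {
			if a == ct {
				permitted = true
				break
			}
		}
		if !permitted {
			return fmt.Errorf("content type %q is not allowed", ct)
		}
	}
	if requireValue && len(value) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

func main() {
	json := map[string]string{"content-type": "application/json"}
	csv := map[string]string{"content-type": "text/csv"}
	allow := []string{"application/json"}

	fmt.Println(checkContentType(json, []byte("{}"), "", allow, true))
	fmt.Println(checkContentType(csv, []byte("a,b"), "", allow, false))
	fmt.Println(checkContentType(json, nil, "", nil, true))
}
```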
type Error
Error wraps a validation failure with the subject the message arrived on and the underlying reason. It is errors.Is / errors.As friendly via Unwrap and reports [source.ErrPoison] from Is, so a schema rejection is recognized as poison (and dead-lettered) with a single errors.Is(err, source.ErrPoison) check; never match on its Error string. It mirrors the shape of source.DecodeError.

    type Error struct {
        // Subject is the topic or subject the invalid message arrived on.
        Subject string
        // Err is the wrapped underlying validation error.
        Err error
    }

func (*Error) Error

    func (e *Error) Error() string

Error implements error.
func (*Error) Is

    func (e *Error) Is(target error) bool

Is reports an *Error as matching [source.ErrPoison], so middleware can route a schema rejection to dead-letter as poison without string-matching.
func (*Error) Unwrap

    func (e *Error) Unwrap() error

Unwrap returns the wrapped validation error so errors.As reaches it.
type Validator
Validator decides whether a message is well-formed before it reaches the handler. A nil error means valid; a non-nil error rejects the message as poison. Implementations must be safe for concurrent use; the Hopper validates from several lanes at once.

    type Validator interface {
        // Validate checks m and returns nil if it conforms, or an error describing why
        // it does not. The middleware wraps a non-nil error in an [Error].
        Validate(ctx context.Context, m source.Message) error
    }

type ValidatorFunc
ValidatorFunc adapts a function to the Validator interface.

    type ValidatorFunc func(ctx context.Context, m source.Message) error

func (ValidatorFunc) Validate

    func (f ValidatorFunc) Validate(ctx context.Context, m source.Message) error

Validate implements Validator.
Generated by gomarkdoc