Skip to content

source

import "github.com/stablekernel/crucible/source"

Package source is Crucible’s ingress seam: the symmetric counterpart to sink. Where sink fans an emitted effect out to many destinations, source funnels inbound messages from external streams into the suite, with delivery semantics (ack/nak/term, ordering, backpressure) that egress does not need.

The themed surface is exactly two names, mirroring sink one-for-one:

  • Inlet is a per-backend adapter (mirrors sink.Outlet). It opens a Subscription that yields messages and settles them. Concrete inlets (Kafka, JetStream, the in-memory test inlet) live in their own modules so vendor SDKs never enter this core’s dependency graph.
  • Hopper is the consume engine (mirrors sink.Manifold). It drives a Subscription with bounded, per-key-ordered concurrency, decodes payloads, runs the middleware chain, invokes the Handler, and settles each message according to the Result the handler returns.

Everything else is literal: Message, Handler, Result, Subscription, Cursor, and the codec, retry, dead-letter, and replay concepts.

Delivery is at-least-once by default: a message is acked only after its handler reports success, never before processing. A handler returns a ResultAck, Nak, Term, InProgress, or Manual — and the Hopper applies it to the backend. Backends differ (Kafka commits offsets per partition; JetStream acks per message), so capabilities a backend may or may not have — replay, consumer groups, transactions — are optional interfaces discovered by type assertion (Seekable, ConsumerGroups, Transactional, …) rather than a lowest-common-denominator API that lies about what a backend can do.

The core is stdlib-only. Logging is *slog.Logger (no-op by default), tracing and metrics go through the vendor-neutral telemetry interface (no-op by default), and no vendor type appears in any public signature. A zero-option Hopper is fully functional and silent.

ContentTypeHeader is the header key a Registry reads to select a Codec for a message. An inlet that carries a content type maps its backend’s native header onto this key; when it is absent or unmatched, the registry falls back to its default codec.

const ContentTypeHeader = "content-type"

ErrDrained reports that a Subscription has been closed and every delivered message settled: there is nothing more to consume. A Subscription.Next that returns ErrDrained tells the Hopper its fetch loop is done and to exit cleanly. Match it with errors.Is.

var ErrDrained = errors.New("source: subscription drained")

ErrInvalidForState classifies a well-formed message that is not legal for its target’s current state — a guard rejection from the state-machine bridge. It is the errors.Is-matchable sentinel for the InvalidForState classification, distinct from ErrPoison so “wrong time” is legible separately from “wrong message”. A GuardRejection unwraps to it.

var ErrInvalidForState = errors.New("source: event invalid for current state")

ErrNoCodec reports that a Registry holds no Codec able to decode a message — neither one keyed to the message’s content type nor a default. The Hopper treats it as a poison condition: an undecodable payload cannot be retried into legibility, so the message is terminated, not re-delivered. Match it with errors.Is.

var ErrNoCodec = errors.New("source: no codec for message")

ErrPoison classifies a permanently-bad message that retrying cannot fix (an undecodable payload, a violated invariant). It is the sentinel a handler or middleware matches against with errors.Is to recognize a poison failure regardless of the concrete error wrapping it; the Poison classification is its Classification counterpart.

var ErrPoison = errors.New("source: poison message")

ErrRetryable classifies a transient failure worth retrying (a timeout, a connection blip). It is the errors.Is-matchable sentinel for the Retryable classification, so retry middleware can recognize a retryable failure by errors.Is rather than by string-matching the underlying error.

var ErrRetryable = errors.New("source: retryable error")

func DecodeTyped[T any](r *Registry, m Message) (T, error)

DecodeTyped decodes m through r and asserts the result to T, the generic helper a typed handler uses to recover a concrete value from the registry. A decode failure returns the \*DecodeError from the registry; a type mismatch (the codec produced some other type) returns a *DecodeError wrapping ErrPoison, since a payload that decodes to the wrong shape cannot be retried into the right one.

func Decoded(m Message) (any, bool)

Decoded returns the value a codec-configured Hopper decoded for m, and whether one is present. When the Hopper has no registry, or m was not produced by such a Hopper, it returns (nil, false) and the handler reads m.Value itself.

Action is the disposition the Hopper applies to a message after its handler returns. It maps onto each backend’s native settle vocabulary.

type Action uint8

const (
// ActionAck reports success: the message is acknowledged (JetStream Ack;
// Kafka offset marked for commit). This is the zero value, so a Result{}
// means "ack".
ActionAck Action = iota
// ActionNak asks for redelivery: the message failed transiently and should
// be tried again (JetStream Nak/NakWithDelay; Kafka declines to commit so the
// record is re-read). Result.Requeue is an optional backoff delay.
ActionNak
// ActionTerm rejects the message permanently: it must not be redelivered
// (JetStream Term; Kafka routes it to a dead-letter topic, then commits).
ActionTerm
// ActionInProgress reports the handler needs more time, extending the ack
// deadline without settling (JetStream InProgress). A no-op on backends with
// no ack deadline.
ActionInProgress
// ActionManual reports the handler already settled the message itself through
// Message.As and the backend client; the Hopper takes no settle action.
ActionManual
)

func (a Action) String() string

String renders the action for logs and metrics attributes.

Batched is a Subscription that can yield and settle messages in batches, amortizing per-message overhead. The Hopper uses it when present to fetch and ack in groups; an unbatched subscription is driven one message at a time.

type Batched interface {
// NextBatch returns up to limit messages, blocking for at least one, or
// ctx.Err()/ErrDrained as [Subscription.Next] would.
NextBatch(ctx context.Context, limit int) ([]Message, error)
// SettleBatch applies r to every message in ms in one call.
SettleBatch(ctx context.Context, ms []Message, r Result) error
}

Classification tags why a handler failed, so middleware (retry, dead-letter) can route without string-matching errors. It is orthogonal to Action: an ActionNak is usually Retryable, an ActionTerm usually Poison or InvalidForState.

type Classification uint8

const (
// Unclassified is the zero value: no specific class was reported.
Unclassified Classification = iota
// Retryable marks a transient failure worth retrying (a timeout, a
// connection blip). Retry middleware backs off and re-delivers.
Retryable
// Poison marks a permanently-bad message (an undecodable payload, a failed
// invariant) that retrying cannot fix. It routes straight to dead-letter.
Poison
// InvalidForState marks a message that is well-formed but not legal for the
// target's current state (a guard rejection from the state-machine bridge).
// Distinct from Poison so consumers can treat "wrong time" differently from
// "wrong message".
InvalidForState
// Drop marks a message to discard silently (a duplicate, an out-of-scope
// event): acked, not retried, not dead-lettered.
Drop
)

func (c Classification) String() string

String renders the classification for logs and metrics attributes.

Codec decodes a raw message payload into a domain value. It is the instance seam for turning bytes on the wire into the value a Handler works with; there is no package-global codec registration (the global-registry anti-pattern is deliberately avoided), so every Codec is constructed and registered into a Registry that is injected, never shared by import.

Implementations must be safe for concurrent use: the Hopper decodes from per-lane worker goroutines.

type Codec interface {
// Decode turns raw bytes and their headers into a domain value. A failure is
// reported plainly; the Hopper wraps it in a [*DecodeError] with the selecting
// content type and subject before routing the message to dead-letter.
Decode(data []byte, h Headers) (any, error)
}

CodecFunc adapts a plain function to a Codec.

type CodecFunc func(data []byte, h Headers) (any, error)

func (f CodecFunc) Decode(data []byte, h Headers) (any, error)

Decode calls the underlying function.

ConsumerGroups is a Subscription that participates in competing-consumer rebalancing across a partitioned backend and exposes the assignment lifecycle so a consumer can drain and commit before partitions move. Satisfied by Kafka only. JetStream’s durable consumer is the grouping analog but has no partitions and no assignment callbacks, so JetStream does NOT satisfy this — it satisfies SharedDurable instead.

type ConsumerGroups interface {
// GroupID returns the consumer group the subscription belongs to.
GroupID() string
// OnAssigned registers a callback invoked when partitions are assigned to this
// member, before their messages are delivered.
OnAssigned(func(ctx context.Context, assigned []Partition))
// OnRevoked registers a callback invoked before partitions are revoked, the
// window in which the consumer drains in-flight work and commits.
OnRevoked(func(ctx context.Context, revoked []Partition))
}

Cursor is an opaque, resumable position within a single stream: a Kafka offset, a JetStream stream sequence, a Redis entry ID. It is deliberately minimal — a stream-local coordinate that a Seekable inlet can resume from. Cursors are only meaningful within the stream that produced them; they are not comparable across inlets or topics.

type Cursor interface {
// String renders the cursor for logs and diagnostics. It carries no
// semantics beyond being stable for a given position.
String() string
}

DecodeError wraps a codec failure with the content type that selected the codec and the underlying error. It is errors.Is / errors.As friendly via Unwrap and reports ErrPoison from Is, so a decode failure is recognized as poison without string-matching; never match on its Error string.

type DecodeError struct {
// ContentType is the content type (or header value) that selected the codec,
// or "" when the default codec was used.
ContentType string
// Subject is the topic or subject the undecodable message arrived on.
Subject string
// Err is the wrapped underlying codec error.
Err error
}

func (e *DecodeError) Error() string

Error implements error.

func (e *DecodeError) Is(target error) bool

Is reports a *DecodeError as matching ErrPoison, so middleware can route a decode failure to dead-letter with a single errors.Is(err, ErrPoison) check.

func (e *DecodeError) Unwrap() error

Unwrap returns the wrapped codec error so errors.As reaches it.

Deduper is a Subscription (or an inlet seam) that suppresses re-delivery of an already-processed message by an idempotency key. The no-op default is “no deduplication”; the state-machine bridge supplies the machine’s state version as the key so redelivery is provably idempotent with no external store.

type Deduper interface {
// Seen reports whether key has already been processed (and records it if not),
// so the Hopper can skip a duplicate by acking without re-running the handler.
Seen(ctx context.Context, key string) (bool, error)
}

GuardRejection wraps a guard/Assay rejection: a well-formed event that is not legal for the target’s current state. It is errors.Is / errors.As friendly via Unwrap and reports ErrInvalidForState from Is, so a guard rejection is recognized as state-invalid (and routed to dead-letter as a distinct, “wrong time” outcome) without string-matching; never match on its Error string.

type GuardRejection struct {
// Event names the inbound event that was rejected, for diagnostics.
Event string
// State names the target's current state at rejection time.
State string
// Err is the wrapped underlying rejection error.
Err error
}

func (e *GuardRejection) Error() string

Error implements error.

func (e *GuardRejection) Is(target error) bool

Is reports a *GuardRejection as matching ErrInvalidForState, so middleware can route a guard rejection with a single errors.Is(err, ErrInvalidForState) check.

func (e *GuardRejection) Unwrap() error

Unwrap returns the wrapped rejection error so errors.As reaches it.

Handler processes one decoded message and returns a Result describing how the Hopper should settle it. A handler performs the work; it does not ack, nak, or commit — returning the Result is how it asks the engine to. Handlers must be safe for concurrent use: the Hopper invokes them from per-lane worker goroutines.

type Handler func(ctx context.Context, m Message) Result

func Chain(h Handler, mw ...Middleware) Handler

Chain composes middleware around h and returns the resulting Handler. The first middleware in the list is the outermost: it runs first on the way in and last on the way out. Composing

Chain(h, A, B, C)

yields A(B(C(h))), so a message flows A → B → C → h and the Result returns h → C → B → A. With no middleware, Chain returns h unchanged.

Example

ExampleChain shows middleware composition: the first middleware listed is the outermost, so a message flows outer to inner and the result returns inner to outer.

package main
import (
"context"
"fmt"
"github.com/stablekernel/crucible/source"
)
func main() {
tag := func(label string) source.Middleware {
return func(next source.Handler) source.Handler {
return func(ctx context.Context, m source.Message) source.Result {
fmt.Println("enter", label)
return next(ctx, m)
}
}
}
base := func(context.Context, source.Message) source.Result {
fmt.Println("handle")
return source.Ack()
}
h := source.Chain(base, tag("outer"), tag("inner"))
_ = h(context.Background(), nil)
}
enter outer
enter inner
handle

Header is a single inbound metadata entry. Headers are typed key/value pairs, not a magic-string map: an inlet maps its backend’s native headers (Kafka record headers, NATS headers) onto this shape, and a handler reads them through Headers.Get rather than indexing an untyped map.

type Header struct {
Key string
Value string
}

Headers is the immutable metadata carried alongside a Message. It is a value type: a copy is independent of the message it came from, so a handler may hold or forward headers without aliasing inlet state.

type Headers []Header

func (h Headers) Get(key string) (string, bool)

Get returns the value of the first header with the given key and whether it was present. Keys are matched exactly (case-sensitive); inlets normalize backend casing when they build the slice.

func (h Headers) Keys() []string

Keys returns the header keys in order, including duplicates. The result is a fresh slice the caller may retain.

Hopper is the consume engine: it drives a Subscription with bounded, per-key-ordered concurrency, decodes each payload, runs the middleware chain, invokes the Handler, and settles the message according to the Result the handler returns. It mirrors sink.Manifold — the one orchestrator that owns the hard parts (ordering, backpressure, settle) so every Inlet adapter stays thin.

Ordering. Each message is routed to an ordered lane keyed by its Message.PartitionKey (or, when that is empty, by a hash of Message.Key). A lane is a single goroutine that processes its queue strictly in arrival order, so two messages with the same key are never reordered, while distinct keys run in parallel up to WithConcurrency. This is the guarantee a statechart instance needs: its events arrive in order.

Backpressure. WithMaxInFlight bounds the messages delivered but not yet settled; when the window is full the fetch loop blocks before pulling the next message, so a slow handler throttles the subscription rather than buffering unboundedly.

Lifecycle. Run consumes until the context is canceled, the subscription drains (ErrDrained), or Close is called. On a clean drain it returns nil; in every shutdown path it stops fetching, lets in-flight messages finish and settle, and then returns. Construct with New; the zero value is unusable.

type Hopper struct {
// contains filtered or unexported fields
}
Example

ExampleHopper shows the core loop: an in-memory inlet feeds scripted messages to a Hopper, which decodes each with a JSON codec, runs the handler, and settles by the returned Result.

package main
import (
"context"
"fmt"
"github.com/stablekernel/crucible/source"
"github.com/stablekernel/crucible/source/memsource"
)
// orderPlaced is a sample domain event arriving on a stream.
type orderPlaced struct {
ID string `json:"id"`
Qty int `json:"qty"`
}
func main() {
in := memsource.New(memsource.WithMessages(
memsource.Msg{Key: "A", Value: []byte(`{"id":"A","qty":2}`)},
memsource.Msg{Key: "B", Value: []byte(`{"id":"B","qty":5}`)},
))
hp := source.New(source.WithCodec(source.NewJSONCodec[orderPlaced]()))
sub, _ := in.Subscribe(context.Background(), source.SubscribeConfig{Topics: []string{"orders"}})
_ = sub.Close() // drain once the two queued messages settle
_ = hp.Run(context.Background(), sub, func(_ context.Context, m source.Message) source.Result {
v, _ := source.Decoded(m)
o := v.(orderPlaced)
fmt.Printf("order %s qty %d\n", o.ID, o.Qty)
return source.Ack()
})
}
order A qty 2
order B qty 5

func New(opts ...Option) *Hopper

New constructs a Hopper with the given options. With no options it runs a single ordered lane, an unbounded in-flight window, no codec (the raw message reaches the handler), and is silent and untraced.

func (hp *Hopper) Close() error

Close begins a graceful drain: the running Hopper stops fetching, finishes and settles in-flight messages, and Run returns. Close is idempotent and never blocks on the drain; it signals, and Run completes the drain.

func (hp *Hopper) Receive(ctx context.Context, in Inlet, cfg SubscribeConfig, h Handler) error

Receive subscribes to in with cfg and drives the resulting subscription with h, a convenience for the common Subscribe-then-Run path. It closes the subscription on return.

func (hp *Hopper) Run(ctx context.Context, sub Subscription, h Handler) error

Run drives sub with h until the context is canceled, the subscription drains, or the Hopper is closed. It returns nil on a clean drain (ErrDrained) or a context cancellation, and the underlying error otherwise. Run does not close sub; the caller (or [Receive]) owns that.

Inlet is a per-backend ingress adapter: it opens subscriptions onto an external stream. It mirrors sink.Outlet — thin, vendor-specific, and not itself the consume engine. The Hopper drives whatever an Inlet returns.

Concrete inlets (source/kafka, source/jetstream, source/memsource) live in their own modules so their vendor SDKs never enter this core’s dependency graph. An inlet may also implement optional capability interfaces (Seekable, ConsumerGroups, …) that the Hopper detects by type assertion.

type Inlet interface {
// Subscribe opens a Subscription for cfg. The returned Subscription is
// driven by the Hopper; the caller closes it (or the Inlet) to drain.
Subscribe(ctx context.Context, cfg SubscribeConfig) (Subscription, error)
// Close releases the inlet's resources (connections, clients). It does not
// settle in-flight messages; close live Subscriptions first.
Close() error
}

JSONCodec is a built-in Codec that decodes a JSON payload into a fresh value of type T. It is the zero-dependency default: register it under “application/json” or as the registry default.

Construct it with NewJSONCodec; the zero value works too, since it holds no state.

type JSONCodec[T any] struct{}

func NewJSONCodec[T any]() JSONCodec[T]

NewJSONCodec returns a JSONCodec that decodes payloads into values of type T.

func (JSONCodec[T]) Decode(data []byte, _ Headers) (any, error)

Decode unmarshals data as JSON into a new T and returns it. Headers are ignored; JSON carries its own shape. A malformed payload returns the json.Unmarshal error, which the Registry wraps in a \*DecodeError.

LagReporter is a Subscription that can report how far behind the tail it is, the headline health signal for a consumer. The Hopper feeds it into a lag gauge when present. Satisfied by backends that expose a high-water mark (Kafka end offsets, JetStream pending counts).

type LagReporter interface {
// Lag returns the number of unconsumed messages between the committed position
// and the stream tail, across all assigned partitions/subjects.
Lag(ctx context.Context) (int64, error)
}

Message is a backend-neutral inbound message. An inlet adapts its native record or message onto this interface; the underlying vendor value never appears in the public surface and is reachable only through As, the documented escape hatch for power users who must drop to the driver.

Implementations are read-only from a handler’s perspective: ack/nak/term is driven by the Result a Handler returns, which the Hopper applies via Subscription.Settle — a handler never mutates or settles a message itself (except deliberately, via As + a Manual result).

type Message interface {
// Key is the partitioning/routing key, or nil if the producer set none.
Key() []byte
// Value is the raw payload bytes, pre-decode.
Value() []byte
// Headers is the message metadata.
Headers() Headers
// Subject is the topic (Kafka) or subject (JetStream) the message arrived on.
Subject() string
// PartitionKey identifies the ordering domain the message belongs to: the
// Kafka "topic/partition", or "" on backends without partitions (the Hopper
// then shards by Key). Two messages with the same non-empty PartitionKey are
// delivered to the same ordered lane.
PartitionKey() string
// Cursor is the message's resumable position within its stream.
Cursor() Cursor
// As assigns the underlying backend message to target if the dynamic types
// match, returning whether it did. It is the escape hatch to reach a vendor
// value (for a manual ack, say); prefer the neutral surface above.
As(target any) bool
}

Middleware decorates a Handler, returning a new Handler that wraps the original. It is the composition seam for cross-cutting concerns — retry classification, dead-letter routing, idempotency, schema validation, tracing — each shipped as its own opt-in decorator rather than baked into the engine. A middleware may run logic before the inner handler, after it (by inspecting the returned Result), or short-circuit it entirely.

type Middleware func(Handler) Handler

Option configures a Hopper. Options are additive and have no-op defaults; a nil value passed to a With* option is ignored, leaving the default in place.

type Option func(*config)

func WithCodec(codec Codec) Option

WithCodec sets a single default Codec the Hopper decodes every message with, a shorthand for a registry holding only a default. It builds a fresh Registry with codec as its default; combine with WithRegistry only when you need content-type routing. A nil codec is ignored.

func WithConcurrency(n int) Option

WithConcurrency caps the number of ordered lanes that run in parallel: at most n distinct partition keys are processed concurrently, while messages sharing a key always run on one lane in order. The default is 1 (strict global order). A value < 1 is ignored, leaving the default.

func WithLogger(l *slog.Logger) Option

WithLogger sets the structured logger the Hopper writes processing failures to. The default discards all records. A nil logger is ignored.

func WithMaxInFlight(n int) Option

WithMaxInFlight bounds the number of messages delivered but not yet settled. When the window is full the fetch loop blocks, applying backpressure to the subscription. The default (0) is unbounded. A value < 0 is ignored.

func WithMeter(m telemetry.Meter) Option

WithMeter sets the meter the Hopper records its counters and lag gauge on. The default is telemetry.NopMeter(). A nil meter is ignored.

func WithMiddleware(mw ...Middleware) Option

WithMiddleware appends middleware to wrap the handler, additive across repeated uses. The first middleware supplied is the outermost (see Chain). Nil entries are skipped.

func WithName(name string) Option

WithName sets the name the Hopper reports in logs and telemetry attributes. The default is “hopper”. An empty name is ignored.

func WithRegistry(r *Registry) Option

WithRegistry sets the Registry the Hopper resolves a per-message Codec from by content type. The default is no registry, in which case the raw Message reaches the handler undecoded. A nil registry is ignored.

func WithTracer(t telemetry.Tracer) Option

WithTracer sets the tracer the Hopper starts per-message spans on. The default is telemetry.NopTracer(). A nil tracer is ignored.

OrderedDelivery is a Subscription that guarantees total, single-stream delivery order at the cost of concurrency: every message arrives strictly in order on one logical flow. Satisfied by a JetStream OrderedConsumer. A Hopper driving such a subscription runs a single lane (no cross-key parallelism), so it is mutually exclusive with high concurrency.

type OrderedDelivery interface {
// OrderedDelivery is a marker; its presence is the guarantee.
OrderedDelivery()
}

Partition identifies one ordering domain within a topic on a partitioned backend (a Kafka topic/partition). It is the unit of assignment a ConsumerGroups subscription is granted or has revoked. Backends without partitions (JetStream) do not produce Partitions.

type Partition struct {
// Topic is the topic the partition belongs to.
Topic string
// ID is the partition number within the topic.
ID int32
}

PartitionOrdered is a Subscription that guarantees per-partition delivery order: messages within a partition arrive in the order they were produced. Satisfied by Kafka. It is the structural guarantee the Hopper relies on to key its ordered lanes by partition.

type PartitionOrdered interface {
// PartitionOrdered is a marker; its presence is the guarantee.
PartitionOrdered()
}

Position is an opaque seek target for a Seekable subscription: a stream-local coordinate (a Kafka offset, a JetStream stream sequence) the backend produced and can resume from. Like a Cursor it is meaningful only within the stream that issued it and is not comparable across inlets or topics; unlike a Cursor, which marks where a delivered message sat, a Position is a request to resume delivery from a chosen point.

type Position interface {
// String renders the position for logs and diagnostics. It carries no
// semantics beyond being stable for a given coordinate.
String() string
}

Registry maps a content type to a Codec, with an optional default for messages that carry no content type or one the registry does not know. There is no package-level registry and no init-time state: every Registry is constructed with NewRegistry and injected. It is safe for concurrent Register, SetDefault, and Decode.

type Registry struct {
// contains filtered or unexported fields
}

func NewRegistry() *Registry

NewRegistry returns an empty Registry with no default codec. Register codecs by content type and optionally SetDefault before use, or pass it to WithRegistry; a registry with a single default codec behaves like an always-decode-this-way pipeline.

func (r *Registry) Decode(m Message) (any, error)

Decode resolves a codec for m’s headers and decodes its value. A resolution miss returns a \*DecodeError wrapping ErrNoCodec; a codec failure returns a *DecodeError wrapping the codec’s error. Both report ErrPoison via errors.Is.

Example

ExampleRegistry_Decode shows content-type routing: a registry selects a codec by the message’s content-type header, falling back to its default otherwise.

package main
import (
"context"
"fmt"
"github.com/stablekernel/crucible/source"
"github.com/stablekernel/crucible/source/memsource"
)
// orderPlaced is a sample domain event arriving on a stream.
type orderPlaced struct {
ID string `json:"id"`
Qty int `json:"qty"`
}
func main() {
reg := source.NewRegistry().
Register("application/json", source.NewJSONCodec[orderPlaced]())
in := memsource.New()
in.Queue(memsource.Msg{
Value: []byte(`{"id":"Z","qty":1}`),
Headers: source.Headers{{Key: "content-type", Value: "application/json"}},
})
sub, _ := in.Subscribe(context.Background(), source.SubscribeConfig{})
m, _ := sub.Next(context.Background())
v, _ := reg.Decode(m)
fmt.Println(v.(orderPlaced).ID)
}
Z

func (r *Registry) Register(contentType string, codec Codec) *Registry

Register binds contentType to codec, overwriting any prior codec for that type. It returns the registry for chaining.

func (r *Registry) SetDefault(codec Codec) *Registry

SetDefault sets the codec used when a message carries no content type, or one no registered codec matches. It returns the registry for chaining.

Result is what a Handler returns: the Action the Hopper should apply, an optional Classification of the failure, an optional redelivery delay, and the underlying error. The zero Result is a successful ack, so a handler that returns Result{} (or uses Ack) acknowledges the message.

type Result struct {
// Action is the disposition to apply. The zero value is ActionAck.
Action Action
// Class classifies a failure for middleware routing. Ignored for ActionAck.
Class Classification
// Requeue is an optional backoff before redelivery, honored on backends that
// support delayed nak (JetStream); best-effort elsewhere.
Requeue time.Duration
// Err is the underlying error, surfaced to telemetry and dead-letter
// metadata. Nil for a successful ack.
Err error
}

func Ack() Result

Ack returns a successful Result: the message is acknowledged.

func InProgress() Result

InProgress returns a Result that extends the ack deadline without settling.

func Manual() Result

Manual returns a Result reporting the handler settled the message itself.

func Nak(err error) Result

Nak returns a Result asking for immediate redelivery, classified Retryable.

func NakAfter(d time.Duration, err error) Result

NakAfter returns a Result asking for redelivery after delay d, classified Retryable. The delay is honored where the backend supports it.

func Reject(err error) Result

Reject returns a Result rejecting the message as invalid for the target’s current state, classified InvalidForState. It dead-letters like Term but carries the distinct class so a “wrong time” rejection is legible.

func Skip() Result

Skip returns a Result that acknowledges and discards the message, classified Drop: used for duplicates or out-of-scope events that should neither retry nor dead-letter.

func Term(err error) Result

Term returns a Result rejecting the message permanently (dead-letter), classified Poison.

func (r Result) Failed() bool

Failed reports whether the result carries a failure (anything that is not a plain ack/skip).

Seekable is a Subscription that can reposition its read cursor to replay or skip ahead: the basis for replay-driven state reconstruction. Seeking takes effect on the next Subscription.Next. Satisfied by Kafka (live SetOffsets) and JetStream (by recreating the consumer at the target). A backend that cannot reposition simply does not implement it.

type Seekable interface {
// SeekToTime repositions delivery to the first message at or after t.
SeekToTime(ctx context.Context, t time.Time) error
// SeekToCursor repositions delivery to resume from a previously observed
// [Cursor] (re-delivering from just after it, per the backend's convention).
SeekToCursor(ctx context.Context, c Cursor) error
// SeekToStart repositions delivery to the earliest retained message.
SeekToStart(ctx context.Context) error
// SeekToEnd repositions delivery to the tail, skipping the backlog so only
// messages produced after the seek are delivered.
SeekToEnd(ctx context.Context) error
}

SharedDurable is a Subscription backed by a named durable consumer that load-balances across processes without partition assignment: the competing-consumer analog on a backend that has no partitions. Satisfied by JetStream (a named durable consumer). Distinct from ConsumerGroups precisely because there are no partitions and no assignment lifecycle to observe.

type SharedDurable interface {
// Durable returns the durable consumer name the subscription shares.
Durable() string
}

SubscribeConfig is the backend-neutral subscription request an Inlet resolves into a live Subscription. Backend-specific tuning (Kafka balancer, JetStream ack-wait, pull batch sizes) is supplied through the concrete inlet’s own functional options at construction time, never here — this struct carries only what every backend understands.

type SubscribeConfig struct {
// Topics are the topics (Kafka) or subjects (JetStream) to consume. At least
// one is required.
Topics []string
// Group is the consumer group (Kafka) or durable consumer name (JetStream)
// for competing-consumer load balancing. Empty means a standalone/ephemeral
// subscription that receives every message on its own.
Group string
}

Subscription is a live stream of inbound messages from an Inlet. It is a thin pull-and-settle surface the Hopper drives: the engine owns the consume loop, concurrency, decoding, and the middleware chain, while the Subscription only fetches the next message and applies the engine’s settle decision to the backend. This split keeps every adapter small and uniform — the hard parts (ordering, backpressure, retry) live once, in the Hopper.

A Subscription is single-consumer: Next is called from one goroutine (the Hopper’s fetch loop). Settle may be called concurrently from worker goroutines and must be safe for that.

type Subscription interface {
// Next returns the next message. It blocks until one is available, returns
// ctx.Err() if ctx is canceled, or returns ErrDrained once the subscription
// has been closed and all delivered messages settled.
Next(ctx context.Context) (Message, error)
// Settle applies a handler [Result] to a message previously returned by Next:
// ack/commit, schedule redelivery, route to dead-letter, or extend the
// deadline, per Result.Action. It is the single point where a delivery
// decision reaches the backend.
Settle(ctx context.Context, m Message, r Result) error
// Close begins a graceful drain: Next stops yielding new messages, and once
// in-flight messages are settled, Next returns ErrDrained. Close is
// idempotent.
Close() error
}

Transactional is a Subscription that can fence message settlement inside a transaction, so consume-process-produce is atomic (exactly-once into a sink). Satisfied by Kafka (EOS) only. JetStream has no equivalent and does NOT implement it — the capability is absent rather than faked.

type Transactional interface {
// Begin starts a transaction; settlement of messages received during it is
// committed or aborted atomically with the work done inside fn.
Begin(ctx context.Context, fn func(ctx context.Context) error) error
}

Generated by gomarkdoc