The Inlet and Hopper model

Two types carry the structural model, mirroring sink’s Outlet/Manifold one-for-one. Everything else is literal.

An Inlet is one place messages come from. It opens subscriptions and acks; the vendor record stays private behind an As() escape hatch.

type Inlet interface {
	Subscribe(ctx context.Context, cfg SubscribeConfig) (Subscription, error)
	Close() error
}

type Subscription interface {
	Receive(ctx context.Context, h Handler) error        // runs until ctx cancel
	Stream(ctx context.Context) iter.Seq2[Message, error] // the read-loop alternative
	Close() error                                         // graceful drain
}

Every message is backend-neutral. No vendor type appears in the public surface; the rare reach-through goes through As():

type Message interface {
	Key() []byte
	Value() []byte
	Headers() Headers     // typed accessors, not a magic-string map
	Subject() string      // topic (Kafka) or subject (JetStream)
	PartitionKey() string // partition identity (Kafka) or "" (JetStream)
	Cursor() Cursor       // opaque, resumable position (offset or stream seq)
	As(target any) bool   // documented escape hatch to the vendor message
}

Handler and Result: the engine acts, you decide

A Handler is your business logic. It returns a Result; the Hopper does the acking. There are no stateful message objects with a hidden ack channel.

type Handler func(ctx context.Context, m Message) Result

type Result struct {
	Action  Action         // Ack | Nak | Term | InProgress | Manual
	Requeue time.Duration  // Nak delay
	Class   Classification // Retryable | Poison | Drop | Throttle | InvalidForState
	Err     error
}

Acking is driven by the handler's return value, with a manual override and a Term outcome. The contract is to ack only after durable success: at-least-once delivery is the default, and the engine never commits before processing.

| Action | Meaning |
| --- | --- |
| Ack | processed successfully; commit past this message |
| Nak(delay) | transient failure; redeliver, optionally after a delay |
| Term | poison or invalid-for-state; do not retry, route to DLQ |
| InProgress | long handler still working; extend the ack deadline |
| Manual | the handler acked itself through As() (batched commit, double-ack) |

A typed handler resolves the generic T through the instance-scoped codec registry:

type TypedHandler[T any] func(ctx context.Context, m Typed[T]) Result

A Hopper owns the consume loop. Construct it with functional options, the same shape as sink’s Manifold. Every seam has a no-op default, so a zero-option build is fully functional and silent.

h := source.NewHopper(
	source.WithLogger(log),    // *slog.Logger; default discards
	source.WithTracer(tracer), // telemetry.Tracer; default no-op
	source.WithMeter(meter),   // telemetry.Meter; default no-op
	source.WithMaxInFlight(256),
)

The Hopper drives ordered-key concurrency, the codec decode, the reliability middleware chain, and the lifecycle (graceful drain on ctx cancel).

Optional capabilities, detected by interface

Backend-specific behavior lives in optional capability interfaces, which the engine discovers once by type assertion. That avoids both a lowest-common-denominator lie and an unwrap ladder at your call site. The core Inlet is the honest common path; an adapter satisfies only the capabilities its backend truly supports.

| Capability | What it adds |
| --- | --- |
| Seekable | replay by time or position |
| ConsumerGroups | rebalance hooks (OnAssigned/OnRevoked) |
| SharedDurable | shared durable consumer (the JetStream grouping analog) |
| PartitionOrdered · OrderedDelivery | per-partition or single-threaded ordered reads |
| Batched | batched fetch and handling |
| Transactional | transactional consume-to-produce |
| Deduper | a dedup seam |
| LagReporter | consumer-lag reporting |
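
Detection by type assertion can be sketched as follows. The capability names Seekable and LagReporter come from the table above, but their method sets here (SeekToTime, Lag) are assumptions made for illustration, as are the capabilities struct, the detect function, and both example inlets.

```go
package main

import (
	"fmt"
	"time"
)

// Inlet is trimmed to one method for this sketch.
type Inlet interface{ Close() error }

// The method sets below are assumed; only the interface names appear in the docs.
type Seekable interface{ SeekToTime(t time.Time) error }
type LagReporter interface{ Lag() (int64, error) }

// capabilities caches what was discovered, so the assertions run once per inlet.
type capabilities struct {
	seek Seekable
	lag  LagReporter
}

// detect probes the inlet for each optional interface.
// A failed assertion leaves the corresponding field nil.
func detect(in Inlet) capabilities {
	var c capabilities
	c.seek, _ = in.(Seekable)
	c.lag, _ = in.(LagReporter)
	return c
}

// logInlet is a hypothetical adapter whose backend supports both capabilities.
type logInlet struct{}

func (logInlet) Close() error               { return nil }
func (logInlet) SeekToTime(time.Time) error { return nil }
func (logInlet) Lag() (int64, error)        { return 7, nil }

// plainInlet is a hypothetical adapter that offers only the core path.
type plainInlet struct{}

func (plainInlet) Close() error { return nil }

func main() {
	for _, in := range []Inlet{logInlet{}, plainInlet{}} {
		c := detect(in)
		fmt.Printf("%T seekable=%t lag=%t\n", in, c.seek != nil, c.lag != nil)
	}
}
```

An adapter opts in simply by implementing the methods; one that does not is still a valid Inlet, and the engine skips the feature instead of faking it.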

Which adapter satisfies which is spelled out on the adapters page. The next page covers how the Hopper keeps a statechart instance’s events in order while running other keys in parallel.