source/jetstream

import "github.com/stablekernel/crucible/source/jetstream"

Package jetstream is a source ingress adapter that consumes a NATS JetStream stream through a durable pull consumer. It wraps the jetstream subpackage of github.com/nats-io/nats.go (NOT the legacy JetStreamContext) behind narrow seam interfaces so the real client satisfies them structurally while unit tests drive hand-rolled fakes with no live server.
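The seam idea can be sketched without NATS at all. The names below (`messageSource`, `fakeSource`, `drain`) are hypothetical and are not the package's actual seam types. The sketch only shows that code written against a narrow interface accepts a real client and a hand-rolled fake alike.

```go
package main

import (
	"errors"
	"fmt"
)

// messageSource is a hypothetical narrow seam: it declares only the one
// method the consuming code needs, so any type with that method satisfies it.
type messageSource interface {
	Next() (string, error)
}

// errDone signals that the fake has no more messages.
var errDone = errors.New("no more messages")

// fakeSource is a hand-rolled test double that replays a fixed slice.
type fakeSource struct {
	msgs []string
}

func (f *fakeSource) Next() (string, error) {
	if len(f.msgs) == 0 {
		return "", errDone
	}
	m := f.msgs[0]
	f.msgs = f.msgs[1:]
	return m, nil
}

// drain reads until the source reports errDone and returns what it saw.
func drain(src messageSource) ([]string, error) {
	var out []string
	for {
		m, err := src.Next()
		if errors.Is(err, errDone) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, m)
	}
}

func main() {
	got, err := drain(&fakeSource{msgs: []string{"A-1", "A-2"}})
	fmt.Println(got, err)
}
```

Because `drain` depends only on the interface, a unit test can exercise it with `fakeSource` and no live server.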

Construct an Inlet with New and functional options, then drive its github.com/stablekernel/crucible/source.Subscription with a source.Hopper (or read it directly via Next/Settle).

in, err := jetstream.New(
	jetstream.WithURL(nats.DefaultURL),
	jetstream.WithStream("ORDERS"),
	jetstream.WithDurable("orders-worker"),
	jetstream.WithAckWait(30*time.Second),
	jetstream.WithMaxAckPending(256),
)
sub, err := in.Subscribe(ctx, source.SubscribeConfig{Topics: []string{"orders.>"}})
m, err := sub.Next(ctx) // or drive with a source.Hopper
err = sub.Settle(ctx, m, source.Ack())

JetStream divergences from the source contract

JetStream has no partitions and no assignment lifecycle, so this adapter does NOT implement github.com/stablekernel/crucible/source.ConsumerGroups. A durable consumer is the competing-consumer analog instead, surfaced through github.com/stablekernel/crucible/source.SharedDurable: multiple processes sharing a durable name load-balance the stream without partition assignment.
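The competing-consumer behaviour can be illustrated with plain goroutines. This is a simulation only: the shared channel stands in for a shared durable consumer, and `compete` is a hypothetical helper, not part of this package or of NATS.

```go
package main

import (
	"fmt"
	"sync"
)

// compete simulates competing consumers: every worker pulls from one shared
// queue (standing in for a shared durable consumer), so each message is handed
// to exactly one worker. It returns how many times each message was handled.
func compete(msgs []string, workers int) map[string]int {
	queue := make(chan string)
	handled := make(map[string]int)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range queue {
				mu.Lock()
				handled[m]++
				mu.Unlock()
			}
		}()
	}
	for _, m := range msgs {
		queue <- m
	}
	close(queue)
	wg.Wait()
	return handled
}

func main() {
	counts := compete([]string{"o-1", "o-2", "o-3", "o-4"}, 2)
	fmt.Println(len(counts), counts["o-1"])
}
```

Which worker receives which message is not fixed, which mirrors load balancing without partition assignment.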

Replay is consumer-recreate, not an in-place cursor move: a github.com/stablekernel/crucible/source.Seekable seek tears down the current consumer and rebuilds it with a DeliverByStartTime/DeliverByStartSequence policy. The seek takes effect on the next Next; in-flight messages from the prior consumer are abandoned (not settled).
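A minimal model of "seek is consumer-recreate", assuming an in-memory log in place of a stream. The `consumer` and `subscription` types here are hypothetical illustrations, not the adapter's internals.

```go
package main

import "fmt"

// consumer is a hypothetical stand-in for a server-side consumer that reads
// a fixed log from a given position.
type consumer struct {
	log  []string
	next int // index of the next message to deliver
}

// subscription owns at most one consumer and builds it lazily.
type subscription struct {
	log      []string
	startSeq int       // 1-based start sequence used when the consumer is (re)built
	cur      *consumer // nil until the first Next, and again after a Seek
	rebuilds int
}

// Seek drops the current consumer; the new start applies on the next Next.
func (s *subscription) Seek(seq int) {
	if seq < 1 {
		seq = 1
	}
	s.startSeq = seq
	s.cur = nil
}

// Next builds the consumer on demand, then delivers one message.
func (s *subscription) Next() (string, bool) {
	if s.cur == nil {
		if s.startSeq < 1 {
			s.startSeq = 1
		}
		s.cur = &consumer{log: s.log, next: s.startSeq - 1}
		s.rebuilds++
	}
	if s.cur.next >= len(s.cur.log) {
		return "", false
	}
	m := s.cur.log[s.cur.next]
	s.cur.next++
	return m, true
}

func main() {
	s := &subscription{log: []string{"m1", "m2", "m3"}, startSeq: 1}
	a, _ := s.Next()
	b, _ := s.Next()
	s.Seek(1) // replay from the first sequence
	c, _ := s.Next()
	fmt.Println(a, b, c, s.rebuilds)
}
```

The old consumer is simply discarded on `Seek`, so anything it had delivered but not settled is abandoned, as described above.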

JetStream offers no consume-side transaction, so this adapter does NOT implement github.com/stablekernel/crucible/source.Transactional; the capability is absent rather than faked.

Experimental (pre-v1); the API may change until the suite locks v1.0.0.

Example

Example shows the consume-and-settle loop: open a durable pull subscription, take the next message off the stream, and ack it after handling. In a real program the seam is a live NATS connection supplied via WithURL or WithConn.

// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"fmt"

	gonats "github.com/nats-io/nats.go"
	njs "github.com/nats-io/nats.go/jetstream"
	"github.com/stablekernel/crucible/source"
	"github.com/stablekernel/crucible/source/jetstream"
)

// exampleJS is a stand-in jetstream.JetStream seam that yields one message, so
// the example runs without a live NATS server. It embeds njs.JetStream (nil) and
// overrides only the two methods the adapter uses. A real program passes
// jetstream.WithURL or jetstream.WithConn instead.
type exampleJS struct {
	njs.JetStream
	msg njs.Msg
}

func (e exampleJS) CreateOrUpdateConsumer(context.Context, string, njs.ConsumerConfig) (njs.Consumer, error) {
	return exampleConsumer{msg: e.msg}, nil
}

func (e exampleJS) OrderedConsumer(context.Context, string, njs.OrderedConsumerConfig) (njs.Consumer, error) {
	return exampleConsumer{msg: e.msg}, nil
}

type exampleConsumer struct {
	njs.Consumer
	msg njs.Msg
}

func (c exampleConsumer) Messages(...njs.PullMessagesOpt) (njs.MessagesContext, error) {
	return &exampleIter{msg: c.msg}, nil
}

type exampleIter struct {
	msg  njs.Msg
	done bool
}

func (it *exampleIter) Next(...njs.NextOpt) (njs.Msg, error) {
	if it.done {
		return nil, njs.ErrMsgIteratorClosed
	}
	it.done = true
	return it.msg, nil
}

func (it *exampleIter) Stop()  {}
func (it *exampleIter) Drain() {}

// exampleMsg is a minimal njs.Msg whose Ack records that it was acknowledged.
type exampleMsg struct {
	njs.Msg
	acked *bool
}

func (m exampleMsg) Data() []byte    { return []byte("A-1") }
func (m exampleMsg) Subject() string { return "orders.placed" }
func (m exampleMsg) Headers() gonats.Header {
	return gonats.Header{jetstream.KeyHeader: []string{"A-1"}}
}
func (m exampleMsg) Metadata() (*njs.MsgMetadata, error) {
	return &njs.MsgMetadata{Sequence: njs.SequencePair{Stream: 1}}, nil
}
func (m exampleMsg) DoubleAck(context.Context) error { *m.acked = true; return nil }

// Example shows the consume-and-settle loop: open a durable pull subscription,
// take the next message off the stream, and ack it after handling. In a real
// program the seam is a live NATS connection supplied via WithURL or WithConn.
func main() {
	var acked bool
	js := exampleJS{msg: exampleMsg{acked: &acked}}
	in, err := jetstream.New(
		jetstream.WithStream("ORDERS"),
		jetstream.WithDurable("orders-worker"),
		jetstream.WithJetStream(js),
	)
	if err != nil {
		panic(err)
	}
	defer func() { _ = in.Close() }()

	sub, err := in.Subscribe(context.Background(), source.SubscribeConfig{Topics: []string{"orders.>"}})
	if err != nil {
		panic(err)
	}
	defer func() { _ = sub.Close() }()

	m, err := sub.Next(context.Background())
	if err != nil {
		panic(err)
	}
	if err := sub.Settle(context.Background(), m, source.Ack()); err != nil {
		panic(err)
	}
	fmt.Println(m.Subject())
	fmt.Println(string(m.Key()))
	fmt.Println(acked)
}
Output:

orders.placed
A-1
true

KeyHeader is the message header an inbound message may set to carry an explicit partitioning/routing key. When present, the adapter uses its value as source.Message.Key; otherwise the key falls back to the message subject so the Hopper still shards deterministically.

const KeyHeader = "Crucible-Key"
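The fallback rule can be written as a small function. `resolveKey` is a hypothetical helper for illustration, not the adapter's code, and treating an empty header value as absent is an assumption of this sketch.

```go
package main

import "fmt"

// keyHeader mirrors the documented KeyHeader value.
const keyHeader = "Crucible-Key"

// resolveKey returns the first value of the key header when one is set, and
// otherwise falls back to the subject. (Assumption: an empty value counts as
// "not set".)
func resolveKey(headers map[string][]string, subject string) string {
	if vals := headers[keyHeader]; len(vals) > 0 && vals[0] != "" {
		return vals[0]
	}
	return subject
}

func main() {
	withHeader := map[string][]string{keyHeader: {"A-1"}}
	fmt.Println(resolveKey(withHeader, "orders.placed"))
	fmt.Println(resolveKey(nil, "orders.placed"))
}
```

Either way the key is a deterministic function of the message, which is what lets a sharding consumer route equal keys to the same place.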

Inlet is a NATS JetStream ingress adapter. It opens durable pull-consumer subscriptions onto a configured stream. Build one with New; it is safe for concurrent use.

type Inlet struct {
// contains filtered or unexported fields
}

func New(opts ...Option) (*Inlet, error)

New builds an Inlet from opts. Exactly one of WithURL or WithConn must be supplied, and WithStream is required. New dials the connection (when WithURL is used) and constructs the JetStream client eagerly so misconfiguration surfaces here rather than at first Subscribe.

func (in *Inlet) As(target any) bool

As assigns the inlet’s underlying NATS connection to target if target is a **gonats.Conn, returning whether it did. It is the escape hatch to reach the driver for operations outside this adapter’s surface.
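The As pattern itself is small. The sketch below uses a hypothetical `driverConn` type in place of the NATS connection so it runs on its own; only the shape (a pointer-to-pointer target and a boolean result) follows the description above.

```go
package main

import "fmt"

// driverConn is a hypothetical stand-in for the driver's connection type.
type driverConn struct{ name string }

// inlet holds the underlying connection.
type inlet struct{ conn *driverConn }

// As stores the connection in target when target is a **driverConn and
// reports whether it did.
func (in *inlet) As(target any) bool {
	p, ok := target.(**driverConn)
	if !ok || p == nil {
		return false
	}
	*p = in.conn
	return true
}

func main() {
	in := &inlet{conn: &driverConn{name: "nats-1"}}

	var c *driverConn
	fmt.Println(in.As(&c), c.name)

	var s string
	fmt.Println(in.As(&s)) // wrong target type: nothing is assigned
}
```

Callers pass the address of a typed pointer variable and check the result before using it.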

func (in *Inlet) Close() error

Close releases the inlet’s resources. It closes the NATS connection only when the inlet dialed it (via WithURL); a caller-supplied connection is left open. Close live Subscriptions first.
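The ownership rule reduces to one flag. This is an illustrative sketch with hypothetical `conn` and `inlet` types, not the adapter's implementation.

```go
package main

import "fmt"

// conn is a hypothetical connection that records whether it was closed.
type conn struct{ closed bool }

func (c *conn) Close() { c.closed = true }

// inlet remembers whether it dialed the connection itself.
type inlet struct {
	conn  *conn
	owned bool
}

// Close closes the connection only when the inlet owns it.
func (in *inlet) Close() error {
	if in.owned && in.conn != nil {
		in.conn.Close()
	}
	return nil
}

func main() {
	dialed := &conn{}
	borrowed := &conn{}
	_ = (&inlet{conn: dialed, owned: true}).Close()
	_ = (&inlet{conn: borrowed}).Close()
	fmt.Println(dialed.closed, borrowed.closed)
}
```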

func (in *Inlet) Subscribe(ctx context.Context, cfg source.SubscribeConfig) (source.Subscription, error)

Subscribe opens a durable pull-consumer Subscription. cfg.Topics, when set, becomes the consumer’s filter subjects (overriding WithFilterSubjects); cfg.Group, when set, overrides WithDurable as the durable consumer name. The consumer is created lazily on the first Next call on the subscription, so Subscribe stays cheap and a seek issued before the first read determines the start policy.
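The precedence between construction-time options and per-subscription settings can be sketched as a pure function. `defaults`, `subscribeConfig`, and `effective` are hypothetical names; only the two documented fields, Topics and Group, are modelled.

```go
package main

import "fmt"

// defaults is a hypothetical view of the construction-time options.
type defaults struct {
	filterSubjects []string
	durable        string
}

// subscribeConfig mirrors only the two documented fields used here.
type subscribeConfig struct {
	Topics []string
	Group  string
}

// effective returns the filter subjects and durable name a consumer would be
// created with: per-subscription values win when they are set.
func effective(d defaults, cfg subscribeConfig) ([]string, string) {
	subjects, durable := d.filterSubjects, d.durable
	if len(cfg.Topics) > 0 {
		subjects = cfg.Topics
	}
	if cfg.Group != "" {
		durable = cfg.Group
	}
	return subjects, durable
}

func main() {
	d := defaults{filterSubjects: []string{"orders.created"}, durable: "orders-worker"}
	fmt.Println(effective(d, subscribeConfig{}))
	fmt.Println(effective(d, subscribeConfig{Topics: []string{"orders.>"}, Group: "audit"}))
}
```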

Option configures an Inlet at construction. Options compose; later options override earlier ones for the same field.

type Option func(*config)

func WithAckWait(d time.Duration) Option

WithAckWait sets how long the server waits for an ack before redelivering a message. Zero leaves the server default.

func WithConn(conn *gonats.Conn) Option

WithConn uses an existing NATS connection. The caller retains ownership; the Inlet does not close a connection it did not dial.

func WithDurable(name string) Option

WithDurable sets the durable consumer name. A non-empty durable makes the consumer shared (competing-consumer load balancing across processes) and is what satisfies source.SharedDurable. Empty leaves the consumer ephemeral.

func WithFilterSubjects(subjects ...string) Option

WithFilterSubjects restricts delivery to messages matching the given subject filters. Empty consumes the whole stream. When source.SubscribeConfig.Topics is non-empty it takes precedence over this option.

func WithJetStream(js jetstream.JetStream) Option

WithJetStream uses an already-constructed JetStream client (anything satisfying github.com/nats-io/nats.go/jetstream.JetStream) instead of dialing one. Use it when the caller manages the connection and JetStream context itself, or to supply a fake in tests. Mutually exclusive with WithURL; the Inlet never closes a client it did not build.

func WithMaxAckPending(n int) Option

WithMaxAckPending bounds outstanding unacknowledged messages: the primary backpressure knob. The server stops delivering once this many messages are in flight. Zero leaves the server default.
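The backpressure effect can be modelled in a few lines. The `server` type below is a hypothetical toy, not NATS behaviour in detail; in particular it treats zero as "no limit" for simplicity, whereas the real option leaves the server's own default in place.

```go
package main

import "fmt"

// server is a hypothetical model of the delivery rule: at most maxAckPending
// messages may be delivered and not yet acknowledged.
type server struct {
	maxAckPending int
	pending       int
	nextSeq       int
}

// deliver returns the next sequence number, or false when the in-flight
// limit has been reached.
func (s *server) deliver() (int, bool) {
	if s.maxAckPending > 0 && s.pending >= s.maxAckPending {
		return 0, false
	}
	s.nextSeq++
	s.pending++
	return s.nextSeq, true
}

// ack frees one in-flight slot.
func (s *server) ack() {
	if s.pending > 0 {
		s.pending--
	}
}

func main() {
	s := &server{maxAckPending: 2}
	_, ok1 := s.deliver()
	_, ok2 := s.deliver()
	_, ok3 := s.deliver() // refused: two messages are already in flight
	s.ack()
	seq, ok4 := s.deliver()
	fmt.Println(ok1, ok2, ok3, seq, ok4)
}
```

A slow handler therefore throttles delivery on its own: nothing new arrives until it settles something.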

func WithMaxDeliver(n int) Option

WithMaxDeliver caps redelivery attempts per message. Zero leaves the server default (unlimited). When exceeded the message stops being redelivered.

func WithPullMaxMessages(n int) Option

WithPullMaxMessages sets the pull batch size the message iterator buffers. Zero leaves the client default. Combined with WithMaxAckPending it shapes fetch-side backpressure.

func WithStream(stream string) Option

WithStream sets the JetStream stream to consume from. Required.

func WithURL(url string) Option

WithURL dials a NATS connection at url. Mutually exclusive with WithConn: supply exactly one. When WithURL is used the Inlet owns the connection and closes it on Inlet.Close.

Generated by gomarkdoc