source/jetstream
```go
import "github.com/stablekernel/crucible/source/jetstream"
```
Package jetstream is a source ingress adapter that consumes a NATS JetStream stream through a durable pull consumer. It wraps the jetstream subpackage of github.com/nats-io/nats.go (NOT the legacy JetStreamContext) behind narrow seam interfaces so the real client satisfies them structurally while unit tests drive hand-rolled fakes with no live server.
Construct an Inlet with New and functional options, then drive its github.com/stablekernel/crucible/source.Subscription with a source.Hopper (or read it directly via Next/Settle).
```go
in, err := jetstream.New(
	jetstream.WithURL(nats.DefaultURL),
	jetstream.WithStream("ORDERS"),
	jetstream.WithDurable("orders-worker"),
	jetstream.WithAckWait(30*time.Second),
	jetstream.WithMaxAckPending(256),
)
sub, err := in.Subscribe(ctx, source.SubscribeConfig{Topics: []string{"orders.>"}})
m, err := sub.Next(ctx) // or drive with a source.Hopper
err = sub.Settle(ctx, m, source.Ack())
```
JetStream divergences from the source contract
JetStream has no partitions and no assignment lifecycle, so this adapter does NOT implement github.com/stablekernel/crucible/source.ConsumerGroups. A durable consumer is the competing-consumer analog instead, surfaced through github.com/stablekernel/crucible/source.SharedDurable: multiple processes sharing a durable name load-balance the stream without partition assignment.
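To make the competing-consumer idea concrete, here is a minimal sketch that uses a Go channel as a stand-in for a shared durable consumer. Nothing here is the adapter's code: `distribute` and its worker goroutines are hypothetical, and the real load balancing happens on the NATS server. The point is only that each message reaches exactly one of the workers that share the queue.

```go
package main

import (
	"fmt"
	"sync"
)

// distribute simulates a shared durable: every message is handed to exactly
// one of the workers pulling from the same queue. The channel is a stand-in
// for the server-side consumer, not the adapter's implementation.
func distribute(messages []string, workers int) map[string]int {
	queue := make(chan string)
	seen := make(map[string]int) // message -> number of deliveries
	var mu sync.Mutex
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range queue {
				mu.Lock()
				seen[m]++
				mu.Unlock()
			}
		}()
	}
	for _, m := range messages {
		queue <- m
	}
	close(queue)
	wg.Wait()
	return seen
}

func main() {
	msgs := []string{"orders.1", "orders.2", "orders.3", "orders.4"}
	seen := distribute(msgs, 3)
	for _, m := range msgs {
		fmt.Println(m, seen[m]) // each message is delivered once
	}
}
```

Which worker receives which message is not deterministic, which matches the absence of partition assignment described above.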
Replay is consumer-recreate, not an in-place cursor move: a github.com/stablekernel/crucible/source.Seekable seek tears down the current consumer and rebuilds it with a DeliverByStartTime/DeliverByStartSequence policy. The seek takes effect on the next Next; in-flight messages from the prior consumer are abandoned (not settled).
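The recreate-on-seek behaviour can be pictured with a small in-memory model. This is a sketch under assumptions: the `subscription` and `consumer` types, their fields, and the `Seek` signature are all hypothetical and do not mirror the adapter's internals or the source.Seekable method set. It shows only the ordering: a seek discards the current consumer, and the replacement is built by the following Next.

```go
package main

import "fmt"

// consumer is a hypothetical stand-in for a server-side pull consumer: a
// cursor over a stream, created with a fixed start position.
type consumer struct {
	stream []string
	next   int // index of the next message to deliver
}

// subscription lazily builds its consumer. Names are illustrative only.
type subscription struct {
	stream   []string
	startSeq int       // 1-based start sequence for the next consumer
	cur      *consumer // nil until the first Next, or after a Seek
	built    int       // how many consumers have been created
}

// Seek discards the current consumer; nothing is rebuilt until Next runs.
func (s *subscription) Seek(seq int) {
	s.startSeq = seq
	s.cur = nil
}

// Next rebuilds the consumer if needed, then returns the next message.
func (s *subscription) Next() (string, bool) {
	if s.cur == nil {
		s.cur = &consumer{stream: s.stream, next: s.startSeq - 1}
		s.built++
	}
	if s.cur.next >= len(s.cur.stream) {
		return "", false
	}
	m := s.cur.stream[s.cur.next]
	s.cur.next++
	return m, true
}

func main() {
	sub := &subscription{stream: []string{"m1", "m2", "m3", "m4"}, startSeq: 1}
	first, _ := sub.Next()
	sub.Seek(3) // tears down the consumer; takes effect on the next Next
	second, _ := sub.Next()
	fmt.Println(first, second, sub.built) // m1 m3 2
}
```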
JetStream offers no consume-side transaction, so this adapter does NOT implement github.com/stablekernel/crucible/source.Transactional; the capability is absent rather than faked.
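Because a capability is absent rather than faked, a caller can probe for it with a type assertion. The sketch below illustrates that pattern with local stand-in interfaces. `Subscription`, `Seekable`, and `Transactional` here are hypothetical shapes invented for the example, and the real interfaces in the source package may declare different methods.

```go
package main

import "fmt"

// Subscription, Seekable and Transactional are hypothetical stand-ins for the
// source package's capability interfaces; the real method sets may differ.
type Subscription interface{ Next() string }
type Seekable interface{ SeekToSequence(seq uint64) error }
type Transactional interface{ Begin() error }

// jsSub models a subscription that can seek but offers no transactions.
type jsSub struct{}

func (jsSub) Next() string { return "orders.placed" }

func (jsSub) SeekToSequence(uint64) error { return nil }

// capabilities reports which optional interfaces sub satisfies.
func capabilities(sub Subscription) (seekable, transactional bool) {
	_, seekable = sub.(Seekable)
	_, transactional = sub.(Transactional)
	return seekable, transactional
}

func main() {
	seekable, transactional := capabilities(jsSub{})
	fmt.Println(seekable, transactional) // true false
}
```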
Stability
Experimental (pre-v1); the API may change until the suite locks v1.0.0.
Example
Example shows the consume-and-settle loop: open a durable pull subscription, take the next message off the stream, and ack it after handling. In a real program the seam is a live NATS connection supplied via WithURL or WithConn.
```go
package main

import (
	"context"
	"fmt"

	gonats "github.com/nats-io/nats.go"
	njs "github.com/nats-io/nats.go/jetstream"

	"github.com/stablekernel/crucible/source"
	"github.com/stablekernel/crucible/source/jetstream"
)

// exampleJS is a stand-in jetstream.JetStream seam that yields one message, so
// the example runs without a live NATS server. It embeds njs.JetStream (nil) and
// overrides only the two methods the adapter uses. A real program passes
// jetstream.WithURL or jetstream.WithConn instead.
type exampleJS struct {
	njs.JetStream
	msg njs.Msg
}

func (e exampleJS) CreateOrUpdateConsumer(context.Context, string, njs.ConsumerConfig) (njs.Consumer, error) {
	return exampleConsumer{msg: e.msg}, nil
}

func (e exampleJS) OrderedConsumer(context.Context, string, njs.OrderedConsumerConfig) (njs.Consumer, error) {
	return exampleConsumer{msg: e.msg}, nil
}

type exampleConsumer struct {
	njs.Consumer
	msg njs.Msg
}

func (c exampleConsumer) Messages(...njs.PullMessagesOpt) (njs.MessagesContext, error) {
	return &exampleIter{msg: c.msg}, nil
}

type exampleIter struct {
	msg  njs.Msg
	done bool
}

func (it *exampleIter) Next(...njs.NextOpt) (njs.Msg, error) {
	if it.done {
		return nil, njs.ErrMsgIteratorClosed
	}
	it.done = true
	return it.msg, nil
}
func (it *exampleIter) Stop() {}
func (it *exampleIter) Drain() {}

// exampleMsg is a minimal njs.Msg whose Ack records that it was acknowledged.
type exampleMsg struct {
	njs.Msg
	acked *bool
}

func (m exampleMsg) Data() []byte { return []byte("A-1") }
func (m exampleMsg) Subject() string { return "orders.placed" }
func (m exampleMsg) Headers() gonats.Header {
	return gonats.Header{jetstream.KeyHeader: []string{"A-1"}}
}

func (m exampleMsg) Metadata() (*njs.MsgMetadata, error) {
	return &njs.MsgMetadata{Sequence: njs.SequencePair{Stream: 1}}, nil
}
func (m exampleMsg) DoubleAck(context.Context) error { *m.acked = true; return nil }

// Example shows the consume-and-settle loop: open a durable pull subscription,
// take the next message off the stream, and ack it after handling. In a real
// program the seam is a live NATS connection supplied via WithURL or WithConn.
func main() {
	var acked bool
	js := exampleJS{msg: exampleMsg{acked: &acked}}

	in, err := jetstream.New(
		jetstream.WithStream("ORDERS"),
		jetstream.WithDurable("orders-worker"),
		jetstream.WithJetStream(js),
	)
	if err != nil {
		panic(err)
	}
	defer func() { _ = in.Close() }()

	sub, err := in.Subscribe(context.Background(), source.SubscribeConfig{Topics: []string{"orders.>"}})
	if err != nil {
		panic(err)
	}
	defer func() { _ = sub.Close() }()

	m, err := sub.Next(context.Background())
	if err != nil {
		panic(err)
	}
	if err := sub.Settle(context.Background(), m, source.Ack()); err != nil {
		panic(err)
	}

	fmt.Println(m.Subject())
	fmt.Println(string(m.Key()))
	fmt.Println(acked)
}
```
Output
```
orders.placed
A-1
true
```
- Constants
- type Inlet
- type Option
- func WithAckWait(d time.Duration) Option
- func WithConn(conn *gonats.Conn) Option
- func WithDurable(name string) Option
- func WithFilterSubjects(subjects ...string) Option
- func WithJetStream(js jetstream.JetStream) Option
- func WithMaxAckPending(n int) Option
- func WithMaxDeliver(n int) Option
- func WithPullMaxMessages(n int) Option
- func WithStream(stream string) Option
- func WithURL(url string) Option
Constants
KeyHeader is the message header an inbound message may set to carry an explicit partitioning/routing key. When present, the adapter uses its value as [source.Message.Key]; otherwise the key falls back to the message subject so the Hopper still shards deterministically.
```go
const KeyHeader = "Crucible-Key"
```
type Inlet
Inlet is a NATS JetStream ingress adapter. It opens durable pull-consumer subscriptions onto a configured stream. Build one with New; it is safe for concurrent use.
```go
type Inlet struct {
	// contains filtered or unexported fields
}
```
func New
```go
func New(opts ...Option) (*Inlet, error)
```
New builds an Inlet from opts. Exactly one of WithURL or WithConn must be supplied, unless a prebuilt client is passed via WithJetStream (as the package example does), and WithStream is required. New dials the connection (when WithURL is used) and constructs the JetStream client eagerly so misconfiguration surfaces here rather than at first Subscribe.
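The eager validation described for New follows the usual functional-options shape. The following is a minimal sketch of that shape, not the package's source: `config`, `option`, `build`, and the lower-case helpers are hypothetical, a boolean stands in for the connection, and the WithJetStream path is left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// config and the option helpers are hypothetical stand-ins for the package's
// unexported config; only the validation rules come from the text above.
type config struct {
	url     string
	hasConn bool // stands in for a caller-supplied connection
	stream  string
}

type option func(*config)

func withURL(u string) option { return func(c *config) { c.url = u } }
func withConn() option { return func(c *config) { c.hasConn = true } }
func withStream(s string) option { return func(c *config) { c.stream = s } }

// build applies opts in order (later options win) and validates eagerly.
func build(opts ...option) (*config, error) {
	c := &config{}
	for _, o := range opts {
		o(c)
	}
	if (c.url != "") == c.hasConn {
		return nil, errors.New("supply exactly one of URL or connection")
	}
	if c.stream == "" {
		return nil, errors.New("stream is required")
	}
	return c, nil
}

func main() {
	_, err := build(withURL("nats://127.0.0.1:4222"))
	fmt.Println(err) // stream is required

	_, err = build(withURL("nats://127.0.0.1:4222"), withConn(), withStream("ORDERS"))
	fmt.Println(err) // supply exactly one of URL or connection

	c, err := build(withURL("nats://127.0.0.1:4222"), withStream("A"), withStream("ORDERS"))
	fmt.Println(c.stream, err) // ORDERS <nil>
}
```

Validating inside the constructor keeps a misconfigured inlet from ever reaching Subscribe, which is the design choice the description of New calls out.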
func (*Inlet) As
```go
func (in *Inlet) As(target any) bool
```
As assigns the inlet’s underlying NATS connection to target if target is a **gonats.Conn, returning whether it did. It is the escape hatch to reach the driver for operations outside this adapter’s surface.
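The pointer-to-pointer convention that As relies on can be sketched with a stand-in type. This is an illustration under assumptions: `conn` substitutes for *gonats.Conn, and `inlet.as` is a hypothetical reimplementation of the idea rather than the adapter's method.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for *gonats.Conn.
type conn struct{ url string }

type inlet struct{ nc *conn }

// as assigns the underlying connection to target when target is a **conn,
// and reports whether it did.
func (in *inlet) as(target any) bool {
	p, ok := target.(**conn)
	if !ok {
		return false
	}
	*p = in.nc
	return true
}

func main() {
	in := &inlet{nc: &conn{url: "nats://127.0.0.1:4222"}}

	var nc *conn
	ok := in.as(&nc)
	fmt.Println(ok, nc.url) // true nats://127.0.0.1:4222

	var other *string
	fmt.Println(in.as(&other)) // false: unsupported target type
}
```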
func (*Inlet) Close
```go
func (in *Inlet) Close() error
```
Close releases the inlet’s resources. It closes the NATS connection only when the inlet dialed it (via WithURL); a caller-supplied connection is left open. Close live Subscriptions first.
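The ownership rule for Close amounts to tracking whether the inlet created the connection. Here is a minimal sketch of that rule; the `conn` and `inlet` types and the `owned` field are hypothetical names chosen for the example.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for a NATS connection.
type conn struct{ closed bool }

func (c *conn) Close() { c.closed = true }

// inlet remembers whether it dialed the connection itself.
type inlet struct {
	nc    *conn
	owned bool // true when the inlet dialed nc (the WithURL path)
}

// Close closes the connection only when the inlet owns it.
func (in *inlet) Close() error {
	if in.owned && in.nc != nil {
		in.nc.Close()
	}
	return nil
}

func main() {
	dialed := &inlet{nc: &conn{}, owned: true}
	borrowed := &inlet{nc: &conn{}, owned: false}
	_ = dialed.Close()
	_ = borrowed.Close()
	fmt.Println(dialed.nc.closed, borrowed.nc.closed) // true false
}
```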
func (*Inlet) Subscribe
```go
func (in *Inlet) Subscribe(ctx context.Context, cfg source.SubscribeConfig) (source.Subscription, error)
```
Subscribe opens a durable pull-consumer Subscription. cfg.Topics, when set, becomes the consumer’s filter subjects (overriding WithFilterSubjects); cfg.Group, when set, overrides WithDurable as the durable consumer name. The consumer is created lazily on the first [subscription.Next] so Subscribe stays cheap and a seek before the first read picks the start policy.
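The precedence between per-subscription settings and inlet-level options can be written down as a small resolution function. The sketch below is illustrative only: `subscribeConfig` mirrors just the two fields named above, and `defaults` and `resolve` are hypothetical helpers, not part of the package.

```go
package main

import "fmt"

// subscribeConfig mirrors the Topics and Group fields named above; it is a
// local stand-in for source.SubscribeConfig.
type subscribeConfig struct {
	Topics []string
	Group  string
}

// defaults holds the inlet-level values (hypothetical struct).
type defaults struct {
	filterSubjects []string // from WithFilterSubjects
	durable        string   // from WithDurable
}

// resolve prefers per-subscription values over inlet-level defaults.
func resolve(d defaults, cfg subscribeConfig) (subjects []string, durable string) {
	subjects, durable = d.filterSubjects, d.durable
	if len(cfg.Topics) > 0 {
		subjects = cfg.Topics
	}
	if cfg.Group != "" {
		durable = cfg.Group
	}
	return subjects, durable
}

func main() {
	d := defaults{filterSubjects: []string{"orders.placed"}, durable: "orders-worker"}

	subjects, durable := resolve(d, subscribeConfig{Topics: []string{"orders.>"}})
	fmt.Println(subjects, durable) // [orders.>] orders-worker

	subjects, durable = resolve(d, subscribeConfig{Group: "audit"})
	fmt.Println(subjects, durable) // [orders.placed] audit
}
```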
type Option
Option configures an Inlet at construction. Options compose; later options override earlier ones for the same field.
```go
type Option func(*config)
```
func WithAckWait
```go
func WithAckWait(d time.Duration) Option
```
WithAckWait sets how long the server waits for an ack before redelivering a message. Zero leaves the server default.
func WithConn
```go
func WithConn(conn *gonats.Conn) Option
```
WithConn uses an existing NATS connection. The caller retains ownership; the Inlet does not close a connection it did not dial.
func WithDurable
```go
func WithDurable(name string) Option
```
WithDurable sets the durable consumer name. A non-empty durable makes the consumer shared (competing-consumer load balancing across processes) and is what satisfies [source.SharedDurable]. Empty leaves the consumer ephemeral.
func WithFilterSubjects
```go
func WithFilterSubjects(subjects ...string) Option
```
WithFilterSubjects restricts delivery to messages matching the given subject filters. Empty consumes the whole stream. When [source.SubscribeConfig.Topics] is non-empty it takes precedence over this option.
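For readers unfamiliar with NATS subject filters such as `orders.>`, the sketch below shows the wildcard rules in plain Go: `*` matches exactly one token and `>` matches one or more trailing tokens. The `matches` helper is hypothetical and exists only for illustration. In practice the server evaluates filters, and the adapter is not assumed to contain a function like this.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether subject satisfies a NATS-style filter, where "*"
// matches exactly one token and ">" matches one or more trailing tokens.
func matches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			// ">" must be the last filter token and needs at least one
			// subject token left to consume.
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(f) == len(s)
}

func main() {
	fmt.Println(matches("orders.>", "orders.placed"))    // true
	fmt.Println(matches("orders.>", "orders"))           // false
	fmt.Println(matches("orders.*", "orders.placed.eu")) // false
}
```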
func WithJetStream
```go
func WithJetStream(js jetstream.JetStream) Option
```
WithJetStream uses an already-constructed JetStream client (anything satisfying github.com/nats-io/nats.go/jetstream.JetStream) instead of dialing one. Use it when the caller manages the connection and JetStream context itself, or to supply a fake in tests. Mutually exclusive with WithURL; the Inlet never closes a client it did not build.
func WithMaxAckPending
```go
func WithMaxAckPending(n int) Option
```
WithMaxAckPending bounds outstanding unacknowledged messages: the primary backpressure knob. The server stops delivering once this many messages are in flight. Zero leaves the server default.
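How an in-flight limit produces backpressure can be seen in a toy model of the server side. This is a sketch under assumptions: the `server` type and its `deliver` and `ack` methods are hypothetical and greatly simplified compared with a real JetStream consumer.

```go
package main

import "fmt"

// server is a hypothetical model of delivery gated by a max-ack-pending limit.
type server struct {
	maxAckPending int
	inFlight      int // delivered but not yet acknowledged
	nextSeq       int
}

// deliver hands out the next sequence unless the in-flight limit is reached.
func (s *server) deliver() (seq int, ok bool) {
	if s.inFlight >= s.maxAckPending {
		return 0, false // the consumer must ack before more arrives
	}
	s.nextSeq++
	s.inFlight++
	return s.nextSeq, true
}

// ack frees one in-flight slot.
func (s *server) ack() {
	if s.inFlight > 0 {
		s.inFlight--
	}
}

func main() {
	s := &server{maxAckPending: 2}
	_, ok1 := s.deliver()
	_, ok2 := s.deliver()
	_, ok3 := s.deliver() // refused: two messages are unacknowledged
	s.ack()
	seq, ok4 := s.deliver()
	fmt.Println(ok1, ok2, ok3, seq, ok4) // true true false 3 true
}
```

A slow handler therefore throttles the stream by itself: delivery resumes only as acks free slots.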
func WithMaxDeliver
```go
func WithMaxDeliver(n int) Option
```
WithMaxDeliver caps redelivery attempts per message. Zero leaves the server default (unlimited). When exceeded the message stops being redelivered.
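The effect of a delivery cap on a message that is never acknowledged can be sketched as a bounded loop. The `deliveries` function is hypothetical, and it assumes the cap counts total delivery attempts (the first delivery included), which is an interpretation rather than something the text above states.

```go
package main

import "fmt"

// deliveries returns how many times a never-acknowledged message is delivered
// when the server would otherwise retry `attempts` times. A maxDeliver of zero
// models "no cap". Assumption: the cap counts total delivery attempts.
func deliveries(maxDeliver, attempts int) int {
	n := 0
	for i := 0; i < attempts; i++ {
		if maxDeliver > 0 && n >= maxDeliver {
			break // cap reached: the message is no longer redelivered
		}
		n++
	}
	return n
}

func main() {
	fmt.Println(deliveries(3, 10)) // 3
	fmt.Println(deliveries(0, 10)) // 10
}
```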
func WithPullMaxMessages
```go
func WithPullMaxMessages(n int) Option
```
WithPullMaxMessages sets the pull batch size the message iterator buffers. Zero leaves the client default. Combined with WithMaxAckPending it shapes fetch-side backpressure.
func WithStream
```go
func WithStream(stream string) Option
```
WithStream sets the JetStream stream to consume from. Required.
func WithURL
```go
func WithURL(url string) Option
```
WithURL dials a NATS connection at url. Mutually exclusive with WithConn: supply exactly one. When WithURL is used the Inlet owns the connection and closes it on Inlet.Close.
Generated by gomarkdoc