source/memsource

import "github.com/stablekernel/crucible/source/memsource"

Package memsource is an in-memory, deterministic [source.Inlet] for tests. It is the zero-infra substrate the rest of the suite leans on: scripted messages drive a [source.Hopper] with no broker, an injected clock and ID function make every run reproducible, and every settle is recorded so a test can assert exactly which messages were acked, nak’d, termed, or dropped.

The killer feature — drive a statechart from a stream, ack on a durable transition — is therefore unit-testable: feed an Inlet, run a Hopper, and read the Ledger.

Counts is the tally Ledger.Counts returns.

type Counts struct {
	// Acked is plain successful acks.
	Acked int
	// Dropped is acks classified Drop.
	Dropped int
	// Nak is redelivery requests.
	Nak int
	// Term is permanent rejections (dead-letter).
	Term int
}

Entry is one recorded settlement: the message ID, the [source.Result] the Hopper applied to it, and the time it settled (read from the inlet’s injected clock). Entries are recorded in settle order, which for a single ordered lane is the message’s delivery order.

type Entry struct {
	// ID is the message's sequential ID (from the inlet's ID func).
	ID string
	// Result is the disposition applied to the message.
	Result source.Result
	// At is the settle time, stamped from the inlet's injected clock.
	At time.Time
}

Harness drives a [source.Hopper] against an in-memory Inlet and exposes assertion helpers over the resulting Ledger. It is the one-call test entry point: queue messages, Run a handler, and assert outcomes — no broker, no goroutine bookkeeping in the test.

Construct with NewHarness; it registers a tb.Cleanup callback that closes the Hopper, so a test never leaks the consume loop.

type Harness struct {
	// contains filtered or unexported fields
}

Example

ExampleHarness shows the zero-infra test pattern: queue messages, run a handler, and assert the settle ledger, with no broker and no goroutine bookkeeping.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/stablekernel/crucible/source"
	"github.com/stablekernel/crucible/source/memsource"
)

func main() {
	// An example has no *testing.T to hand to NewHarness, so this drives the
	// Inlet and Hopper directly.
	in := memsource.New(memsource.WithMessages(
		memsource.Msg{Key: "ok", Value: []byte("good")},
		memsource.Msg{Key: "bad", Value: []byte("poison")},
	))
	hp := source.New()
	sub, _ := in.Subscribe(context.Background(), source.SubscribeConfig{})
	// Close the subscription up front so Run drains the two queued messages
	// and then returns instead of waiting for more.
	_ = sub.Close()
	_ = hp.Run(context.Background(), sub, func(_ context.Context, m source.Message) source.Result {
		if string(m.Value()) == "poison" {
			// A payload that can never decode is rejected permanently.
			return source.Term(errors.New("undecodable"))
		}
		return source.Ack()
	})
	c := in.Ledger().Counts()
	fmt.Printf("acked=%d term=%d\n", c.Acked, c.Term)
}

Output:

acked=1 term=1

func NewHarness(tb testing.TB, opts []source.Option, msgs ...Msg) *Harness

NewHarness builds a Harness around a fresh Inlet and a [source.Hopper] configured with opts. Pass msgs to pre-queue them. The Hopper is closed via tb.Cleanup.

func (h *Harness) AssertCounts(want Counts)

AssertCounts fails the test unless the recorded settlements match want exactly.

func (h *Harness) AssertSettled(n int)

AssertSettled fails the test unless exactly n messages were settled.

func (h *Harness) Hopper() *source.Hopper

Hopper returns the underlying Hopper, for direct Run/Close control.

func (h *Harness) Inlet() *Inlet

Inlet returns the underlying in-memory inlet, for queueing more messages mid-run or reading its ledger directly.

func (h *Harness) Ledger() *Ledger

Ledger returns the settle ledger the run records outcomes on.

func (h *Harness) Run(handler source.Handler)

Run drives the queued messages through handler and blocks until every queued message has been settled, then returns. It closes the inlet’s subscription so the Hopper drains cleanly once the queue empties, and fails the test on an unexpected run error.

Run uses a bounded timeout (defaulting to 5s) so a stuck handler fails the test rather than hanging the suite; override it with [RunFor].

func (h *Harness) RunFor(timeout time.Duration, handler source.Handler)

RunFor is [Run] with an explicit timeout.

Inlet is an in-memory [source.Inlet]. Messages queued with Inlet.Queue (or at construction with WithMessages) are delivered in order by the [source.Subscription] it opens; once the queue empties and the subscription is closed, [source.Subscription.Next] returns [source.ErrDrained]. Every settle is recorded on the shared Ledger.

Inlet is safe for concurrent use; a single Subscribe is the expected pattern (the Hopper drives one subscription), and a second Subscribe shares the same queue and ledger.

type Inlet struct {
	// contains filtered or unexported fields
}

func New(opts ...Option) *Inlet

New constructs an Inlet with the given options. With no options it uses time.Now and a monotonic decimal ID counter, and starts with an empty queue.

func (in *Inlet) Close() error

Close releases the Inlet. It is a no-op for the in-memory implementation; the queue and ledger remain readable for post-run assertions.

func (in *Inlet) Ledger() *Ledger

Ledger returns the shared settle ledger, the record of every Settle the subscription applied. Read it after a run to assert outcomes.

func (in *Inlet) Queue(msgs ...Msg)

Queue appends messages to the Inlet’s delivery queue. It is safe to call before or during consumption; messages are delivered in queue order.

func (in *Inlet) Subscribe(_ context.Context, _ source.SubscribeConfig) (source.Subscription, error)

Subscribe opens a [source.Subscription] over the Inlet’s queue. The SubscribeConfig argument is accepted for interface conformance but not otherwise interpreted (a single in-memory stream has no topics or groups to honor).

Ledger records every [source.Subscription.Settle] the in-memory subscription applied; it is the assertion surface a test reads after a run. It is safe for concurrent use: the Hopper settles from per-lane worker goroutines.

type Ledger struct {
	// contains filtered or unexported fields
}

func (l *Ledger) Counts() Counts

Counts tallies the recorded settlements by outcome: acked (a plain ack), dropped (an ack classified Drop), nak’d, and termed. The four are mutually exclusive and sum to Len.
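
As a rough illustration of the invariant that the four tallies are mutually exclusive and sum to the number of settlements, here is a standalone sketch. The `outcome` enum, `counts` struct, and `tally` function are hypothetical stand-ins for this example, not the package's types.

```go
package main

import "fmt"

// outcome is a hypothetical classification of one settlement.
type outcome int

const (
	acked outcome = iota
	dropped
	nak
	term
)

// counts mirrors the shape of the documented Counts struct.
type counts struct{ Acked, Dropped, Nak, Term int }

// tally puts every settlement in exactly one bucket.
func tally(outcomes []outcome) counts {
	var c counts
	for _, o := range outcomes {
		switch o {
		case acked:
			c.Acked++
		case dropped:
			c.Dropped++
		case nak:
			c.Nak++
		case term:
			c.Term++
		}
	}
	return c
}

// total sums the four buckets; it equals the number of settlements tallied.
func (c counts) total() int { return c.Acked + c.Dropped + c.Nak + c.Term }

func main() {
	settled := []outcome{acked, term, acked, nak, dropped}
	c := tally(settled)
	fmt.Printf("%+v total=%d len=%d\n", c, c.total(), len(settled))
	// prints "{Acked:2 Dropped:1 Nak:1 Term:1} total=5 len=5"
}
```

Because a struct of four ints is comparable, an exact-match assertion over it can be a plain `==` between the recorded and the wanted value.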

func (l *Ledger) Entries() []Entry

Entries returns a copy of every recorded settlement in settle order. The returned slice is the caller’s to retain.
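
Returning a copy is what makes the slice safe to retain. A minimal sketch of that pattern, assuming a mutex-guarded slice (the `ledger`, `record`, and `snapshot` names are hypothetical, and the real Ledger's internals are not shown in this document):

```go
package main

import (
	"fmt"
	"sync"
)

// entry is a simplified stand-in for one recorded settlement.
type entry struct {
	ID     string
	Result string
}

// ledger is a hypothetical append-only record guarded by a mutex.
type ledger struct {
	mu      sync.Mutex
	entries []entry
}

// record appends one settlement; safe to call from several goroutines.
func (l *ledger) record(e entry) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.entries = append(l.entries, e)
}

// snapshot returns a copy, so callers can keep or modify the result
// without racing against, or corrupting, later record calls.
func (l *ledger) snapshot() []entry {
	l.mu.Lock()
	defer l.mu.Unlock()
	out := make([]entry, len(l.entries))
	copy(out, l.entries)
	return out
}

func main() {
	var l ledger
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			l.record(entry{ID: fmt.Sprint(n), Result: "ack"})
		}(i)
	}
	wg.Wait()

	snap := l.snapshot()
	snap[0].Result = "tampered" // only the caller's copy changes
	fmt.Println(len(snap), l.snapshot()[0].Result) // prints "4 ack"
}
```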

func (l *Ledger) IDs() []string

IDs returns the settled message IDs in settle order, the sequence a test asserts to confirm per-key in-order processing. Since IDs are assigned in queue order, an in-order lane settles them in ascending ID order.
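
One practical detail when asserting ascending order: with decimal string IDs (the documented default), the comparison has to be numeric, because "10" sorts before "9" as a string. The helper below is a hypothetical sketch of such a check, not part of memsource, and it assumes the IDs parse as unsigned decimal integers.

```go
package main

import (
	"fmt"
	"strconv"
)

// ascending reports whether ids, read as decimal integers, strictly increase.
// It returns an error for any ID that is not a decimal number, for example
// when a custom ID function is in use.
func ascending(ids []string) (bool, error) {
	var prev uint64
	for i, id := range ids {
		n, err := strconv.ParseUint(id, 10, 64)
		if err != nil {
			return false, fmt.Errorf("id %q is not decimal: %w", id, err)
		}
		if i > 0 && n <= prev {
			return false, nil
		}
		prev = n
	}
	return true, nil
}

func main() {
	ok, _ := ascending([]string{"8", "9", "10"})
	// Numeric order holds even though plain string comparison would not.
	fmt.Println(ok, "9" < "10") // prints "true false"
}
```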

func (l *Ledger) Len() int

Len returns the number of settlements recorded.

Msg is a programmable inbound message: the scripted input a test feeds an Inlet. All fields are optional; an empty Msg is a keyless, headerless, empty-payload message on the default subject.

type Msg struct {
	// Key is the routing/partition key. The Inlet hashes it into a PartitionKey
	// when PartitionKey is empty, so messages sharing a Key share an ordered lane.
	Key string
	// Value is the raw payload bytes the codec decodes.
	Value []byte
	// Headers are the message's metadata.
	Headers source.Headers
	// Subject is the topic/subject; defaults to "memsource" when empty.
	Subject string
	// PartitionKey overrides the ordering domain; when empty the Inlet derives one
	// from Key (so same-Key messages stay ordered) or leaves it empty.
	PartitionKey string
}
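
The precedence between PartitionKey and Key can be sketched as below. This is an illustration under assumptions: the document says only that the Inlet hashes Key, so the FNV-1a hash and hex rendering here are arbitrary choices, and `msg` and `laneFor` are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strconv"
)

// msg carries just the two fields that decide the ordering domain.
type msg struct {
	Key          string
	PartitionKey string
}

// laneFor returns the ordering domain for m: an explicit PartitionKey wins,
// otherwise a hash of Key, otherwise the empty string.
func laneFor(m msg) string {
	if m.PartitionKey != "" {
		return m.PartitionKey
	}
	if m.Key == "" {
		return ""
	}
	h := fnv.New32a() // assumed hash; the real Inlet's choice is not documented here
	h.Write([]byte(m.Key))
	return strconv.FormatUint(uint64(h.Sum32()), 16)
}

func main() {
	a := laneFor(msg{Key: "order-1"})
	b := laneFor(msg{Key: "order-1"})
	c := laneFor(msg{Key: "order-1", PartitionKey: "vip"})
	// Same Key means same lane; an explicit PartitionKey overrides the hash.
	fmt.Println(a == b, c, laneFor(msg{}) == "") // prints "true vip true"
}
```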

Option configures an Inlet.

type Option func(*Inlet)

func WithClock(now func() time.Time) Option

WithClock injects the clock the Inlet stamps cursors and records settle times with, for deterministic tests. The default is time.Now. A nil clock is ignored.

func WithIDFunc(next func() string) Option

WithIDFunc injects the function the Inlet assigns sequential message IDs and cursor labels from. It is called once per message; the default is a monotonic counter rendered as a decimal string. A nil func is ignored.
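
For illustration, an ID function of the `func() string` shape can be built from a closure over a counter. The sketch below is an assumption about what a "monotonic counter rendered as a decimal string" could look like; `decimalIDs` and `prefixedIDs` are hypothetical helpers, and starting at 1 is a choice made for this example only.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// decimalIDs returns a func yielding "1", "2", "3", and so on.
// The atomic counter keeps it safe if called from several goroutines.
func decimalIDs() func() string {
	var n atomic.Uint64
	return func() string {
		return strconv.FormatUint(n.Add(1), 10)
	}
}

// prefixedIDs wraps the counter with a test-specific prefix, the kind of
// function a test might pass to an option like WithIDFunc.
func prefixedIDs(prefix string) func() string {
	next := decimalIDs()
	return func() string { return prefix + "-" + next() }
}

func main() {
	next := prefixedIDs("msg")
	fmt.Println(next(), next(), next()) // prints "msg-1 msg-2 msg-3"
}
```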

func WithMessages(msgs ...Msg) Option

WithMessages is an Option that pre-queues msgs at construction, equivalent to a subsequent Inlet.Queue.
