source/memsource
    import "github.com/stablekernel/crucible/source/memsource"

Package memsource is an in-memory, deterministic [source.Inlet] for tests. It is the zero-infra substrate the rest of the suite leans on: scripted messages drive a [source.Hopper] with no broker, an injected clock and ID function make every run reproducible, and every settle is recorded so a test can assert exactly which messages were acked, nak’d, termed, or dropped.
The killer feature — drive a statechart from a stream, ack on a durable transition — is therefore unit-testable: feed an Inlet, run a Hopper, and read the Ledger.
- type Counts
- type Entry
- type Harness
- func NewHarness(tb testing.TB, opts []source.Option, msgs ...Msg) *Harness
- func (h *Harness) AssertCounts(want Counts)
- func (h *Harness) AssertSettled(n int)
- func (h *Harness) Hopper() *source.Hopper
- func (h *Harness) Inlet() *Inlet
- func (h *Harness) Ledger() *Ledger
- func (h *Harness) Run(handler source.Handler)
- func (h *Harness) RunFor(timeout time.Duration, handler source.Handler)
- type Inlet
- type Ledger
- type Msg
- type Option
type Counts
Counts is the tally Ledger.Counts returns.

    type Counts struct {
        // Acked is plain successful acks.
        Acked int
        // Dropped is acks classified Drop.
        Dropped int
        // Nak is redelivery requests.
        Nak int
        // Term is permanent rejections (dead-letter).
        Term int
    }

type Entry

Entry is one recorded settlement: the message ID, the [source.Result] the Hopper applied to it, and the time it settled (read from the inlet’s injected clock). Entries are recorded in settle order, which for a single ordered lane is the message’s delivery order.

    type Entry struct {
        // ID is the message's sequential ID (from the inlet's ID func).
        ID string
        // Result is the disposition applied to the message.
        Result source.Result
        // At is the settle time, stamped from the inlet's injected clock.
        At time.Time
    }

type Harness
Harness drives a [source.Hopper] against an in-memory Inlet and exposes assertion helpers over the resulting Ledger. It is the one-call test entry point: queue messages, Run a handler, and assert outcomes, with no broker and no goroutine bookkeeping in the test.
Construct with NewHarness; it registers a t.Cleanup that closes the Hopper, so a test never leaks the consume loop.
    type Harness struct {
        // contains filtered or unexported fields
    }

Example
ExampleHarness shows the zero-infra test pattern: queue messages, run a handler, and assert the settle ledger — no broker, no goroutine bookkeeping.
    package main

    import (
        "context"
        "errors"
        "fmt"

        "github.com/stablekernel/crucible/source"
        "github.com/stablekernel/crucible/source/memsource"
    )

    func main() {
        // A *testing.T would normally come from the test and go to NewHarness;
        // this doc example drives the Inlet and Hopper directly instead.
        in := memsource.New(memsource.WithMessages(
            memsource.Msg{Key: "ok", Value: []byte("good")},
            memsource.Msg{Key: "bad", Value: []byte("poison")},
        ))

        hp := source.New()
        sub, _ := in.Subscribe(context.Background(), source.SubscribeConfig{})
        // Close the subscription so the run drains once the queue empties.
        _ = sub.Close()

        _ = hp.Run(context.Background(), sub, func(_ context.Context, m source.Message) source.Result {
            if string(m.Value()) == "poison" {
                return source.Term(errors.New("undecodable"))
            }
            return source.Ack()
        })

        c := in.Ledger().Counts()
        fmt.Printf("acked=%d term=%d\n", c.Acked, c.Term)
    }

Output

    acked=1 term=1

func NewHarness
    func NewHarness(tb testing.TB, opts []source.Option, msgs ...Msg) *Harness

NewHarness builds a Harness around a fresh Inlet and a [source.Hopper] configured with opts. Pass msgs to pre-queue them. The Hopper is closed via t.Cleanup.

func (*Harness) AssertCounts

    func (h *Harness) AssertCounts(want Counts)

AssertCounts fails the test unless the recorded settlements match want exactly.

func (*Harness) AssertSettled

    func (h *Harness) AssertSettled(n int)

AssertSettled fails the test unless exactly n messages were settled.

func (*Harness) Hopper

    func (h *Harness) Hopper() *source.Hopper

Hopper returns the underlying Hopper, for direct Run/Close control.
func (*Harness) Inlet
    func (h *Harness) Inlet() *Inlet

Inlet returns the underlying in-memory inlet, for queueing more messages mid-run or reading its ledger directly.
func (*Harness) Ledger
    func (h *Harness) Ledger() *Ledger

Ledger returns the settle ledger the run records outcomes on.
func (*Harness) Run
    func (h *Harness) Run(handler source.Handler)

Run drives the queued messages through handler and blocks until every queued message has been settled, then returns. It closes the inlet’s subscription so the Hopper drains cleanly once the queue empties, and fails the test on an unexpected run error.
Run uses a bounded timeout (defaulting to 5s) so a stuck handler fails the test rather than hanging the suite; override it with [RunFor].
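The bounded-timeout idea can be illustrated outside the package. The following is a minimal, self-contained sketch and not the Harness implementation: `runWithTimeout`, `errTimedOut`, and `demo` are hypothetical names, and short durations stand in for the 5s default so the example finishes quickly.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimedOut is a hypothetical sentinel used only by this sketch.
var errTimedOut = errors.New("run timed out")

// runWithTimeout runs fn in its own goroutine and waits at most timeout for
// it to finish. It returns fn's error, or errTimedOut if fn is still running
// when the timer fires.
func runWithTimeout(timeout time.Duration, fn func() error) error {
	// Buffered so a late fn can still send and exit after we stop waiting.
	done := make(chan error, 1)
	go func() { done <- fn() }()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case err := <-done:
		return err
	case <-timer.C:
		return errTimedOut
	}
}

// demo runs one prompt function and one that outlives its timeout.
func demo() (fast, slow error) {
	fast = runWithTimeout(time.Second, func() error { return nil })
	slow = runWithTimeout(20*time.Millisecond, func() error {
		time.Sleep(200 * time.Millisecond) // stands in for a stuck handler
		return nil
	})
	return fast, slow
}

func main() {
	fast, slow := demo()
	fmt.Println(fast, slow)
}
```

In a test, the timed-out branch is where a helper of this kind would fail the test instead of returning an error.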
func (*Harness) RunFor
    func (h *Harness) RunFor(timeout time.Duration, handler source.Handler)

RunFor is [Run] with an explicit timeout.

type Inlet

Inlet is an in-memory [source.Inlet]. Messages queued with Inlet.Queue (or at construction with WithMessages) are delivered in order by the [source.Subscription] it opens; once the queue empties and the subscription is closed, [source.Subscription.Next] returns [source.ErrDrained]. Every settle is recorded on the shared Ledger.
Inlet is safe for concurrent use; a single Subscribe is the expected pattern (the Hopper drives one subscription), and a second Subscribe shares the same queue and ledger.
    type Inlet struct {
        // contains filtered or unexported fields
    }

func New

    func New(opts ...Option) *Inlet

New constructs an Inlet with the given options. With no options it uses time.Now and a monotonic decimal ID counter, and starts with an empty queue.

func (*Inlet) Close

    func (in *Inlet) Close() error

Close releases the Inlet. It is a no-op for the in-memory implementation; the queue and ledger remain readable for post-run assertions.

func (*Inlet) Ledger

    func (in *Inlet) Ledger() *Ledger

Ledger returns the shared settle ledger, the record of every Settle the subscription applied. Read it after a run to assert outcomes.

func (*Inlet) Queue

    func (in *Inlet) Queue(msgs ...Msg)

Queue appends messages to the Inlet’s delivery queue. It is safe to call before or during consumption; messages are delivered in queue order.
func (*Inlet) Subscribe
    func (in *Inlet) Subscribe(_ context.Context, _ source.SubscribeConfig) (source.Subscription, error)

Subscribe opens a [source.Subscription] over the Inlet’s queue. The source.SubscribeConfig argument is accepted for interface conformance but is not otherwise interpreted (a single in-memory stream has no topics or groups to honor).
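The delivery contract described for the Inlet (in-order delivery, then a drained sentinel once the queue is empty and the subscription is closed) can be modelled with a few lines of standard-library Go. This is a sketch with local stand-in types, not the package's code: `queueSub`, `errDrained`, `errEmpty`, and `drain` are hypothetical, and returning `errEmpty` for an open, empty queue is an assumption made to keep the sketch from blocking.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDrained is a local stand-in for the drained sentinel.
var errDrained = errors.New("drained")

// errEmpty reports an empty queue that is still open (an assumption of this
// sketch; a real subscription might wait for more messages instead).
var errEmpty = errors.New("empty but open")

// queueSub is a hypothetical in-memory subscription over a FIFO queue.
type queueSub struct {
	mu     sync.Mutex
	queue  []string
	closed bool
}

// Queue appends messages; delivery follows queue order.
func (s *queueSub) Queue(msgs ...string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.queue = append(s.queue, msgs...)
}

// Close marks the subscription closed. Messages already queued are still
// delivered before the drained sentinel appears.
func (s *queueSub) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
}

// Next pops the oldest message, or returns errDrained once the queue is
// empty and the subscription has been closed.
func (s *queueSub) Next() (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.queue) > 0 {
		m := s.queue[0]
		s.queue = s.queue[1:]
		return m, nil
	}
	if s.closed {
		return "", errDrained
	}
	return "", errEmpty
}

// drain closes the subscription and collects messages until Next fails.
func drain(s *queueSub) []string {
	s.Close()
	var got []string
	for {
		m, err := s.Next()
		if err != nil {
			return got
		}
		got = append(got, m)
	}
}

func main() {
	s := &queueSub{}
	s.Queue("a", "b")
	s.Queue("c")
	fmt.Println(drain(s)) // messages come back in queue order
}
```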
type Ledger
Ledger records every [source.Subscription.Settle] the in-memory subscription applied, the assertion surface a test reads after a run. It is safe for concurrent use: the Hopper settles from per-lane worker goroutines.

    type Ledger struct {
        // contains filtered or unexported fields
    }

func (*Ledger) Counts

    func (l *Ledger) Counts() Counts

Counts tallies the recorded settlements by outcome: acked (a plain ack), dropped (an ack classified Drop), nak’d, and termed. The four are mutually exclusive and sum to Len.
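The invariant that the four tallies are mutually exclusive and sum to Len can be seen in a small model. The sketch below uses stand-in types rather than the package's own: `kind`, `counts`, `ledger`, and `settleConcurrently` are hypothetical, and a plain enum replaces the classification of a source.Result. A mutex guards the entries because settlements arrive from several goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// kind is a hypothetical outcome label standing in for a classified result.
type kind int

const (
	acked kind = iota
	dropped
	nak
	term
)

// counts mirrors the four fields of the documented Counts struct.
type counts struct{ Acked, Dropped, Nak, Term int }

// ledger is a mutex-guarded stand-in for the settle ledger.
type ledger struct {
	mu      sync.Mutex
	entries []kind
}

// record appends one settlement; safe to call from many goroutines.
func (l *ledger) record(k kind) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.entries = append(l.entries, k)
}

// size returns the number of recorded settlements.
func (l *ledger) size() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return len(l.entries)
}

// tally puts every entry in exactly one bucket, so the buckets sum to size.
func (l *ledger) tally() counts {
	l.mu.Lock()
	defer l.mu.Unlock()
	var c counts
	for _, k := range l.entries {
		switch k {
		case acked:
			c.Acked++
		case dropped:
			c.Dropped++
		case nak:
			c.Nak++
		case term:
			c.Term++
		}
	}
	return c
}

// settleConcurrently records each outcome from its own goroutine.
func settleConcurrently(outcomes []kind) *ledger {
	l := &ledger{}
	var wg sync.WaitGroup
	for _, k := range outcomes {
		wg.Add(1)
		go func(k kind) {
			defer wg.Done()
			l.record(k)
		}(k)
	}
	wg.Wait()
	return l
}

func main() {
	l := settleConcurrently([]kind{acked, acked, dropped, nak, term})
	c := l.tally()
	fmt.Printf("%+v sum=%d len=%d\n", c, c.Acked+c.Dropped+c.Nak+c.Term, l.size())
}
```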
func (*Ledger) Entries
    func (l *Ledger) Entries() []Entry

Entries returns a copy of every recorded settlement in settle order. The returned slice is the caller’s to retain.
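Returning a copy is what makes the slice safe for the caller to keep or modify. A minimal sketch of that defensive-copy pattern follows, using hypothetical `entry` and `ledger` stand-ins rather than the package's types.

```go
package main

import "fmt"

// entry and ledger are hypothetical stand-ins for this sketch.
type entry struct{ ID string }

type ledger struct{ entries []entry }

// Entries returns a fresh slice, so changes made by the caller never reach
// the ledger's own backing array.
func (l *ledger) Entries() []entry {
	out := make([]entry, len(l.entries))
	copy(out, l.entries)
	return out
}

// mutateSnapshot edits a returned snapshot and reports what the ledger and
// the snapshot each hold afterwards.
func mutateSnapshot() (inLedger, inSnapshot string) {
	l := &ledger{entries: []entry{{ID: "1"}, {ID: "2"}}}
	snap := l.Entries()
	snap[0].ID = "mutated"
	return l.Entries()[0].ID, snap[0].ID
}

func main() {
	fmt.Println(mutateSnapshot())
}
```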
func (*Ledger) IDs
    func (l *Ledger) IDs() []string

IDs returns the settled message IDs in settle order, the sequence a test asserts to confirm per-key in-order processing. Since IDs are assigned in queue order, an in-order lane settles them in ascending ID order.
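A test that checks ascending order should compare the IDs as numbers, because decimal strings sort differently as text ("10" comes before "2"). The helper below is a hypothetical sketch that assumes the default decimal ID counter; with a custom ID function the parsing step would need to change.

```go
package main

import (
	"fmt"
	"strconv"
)

// ascending reports whether ids, parsed as decimal integers, are strictly
// increasing. It returns an error if an ID is not a decimal number.
func ascending(ids []string) (bool, error) {
	prev := 0
	for i, id := range ids {
		n, err := strconv.Atoi(id)
		if err != nil {
			return false, fmt.Errorf("id %q is not decimal: %w", id, err)
		}
		if i > 0 && n <= prev {
			return false, nil
		}
		prev = n
	}
	return true, nil
}

func main() {
	inOrder, _ := ascending([]string{"1", "2", "10"})
	outOfOrder, _ := ascending([]string{"1", "3", "2"})
	fmt.Println(inOrder, outOfOrder)
}
```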
func (*Ledger) Len
    func (l *Ledger) Len() int

Len returns the number of settlements recorded.

type Msg

Msg is a programmable inbound message: the scripted input a test feeds an Inlet. All fields are optional; an empty Msg is a keyless, headerless, empty-payload message on the default subject.

    type Msg struct {
        // Key is the routing/partition key. The Inlet hashes it into a PartitionKey
        // when PartitionKey is empty, so messages sharing a Key share an ordered lane.
        Key string
        // Value is the raw payload bytes the codec decodes.
        Value []byte
        // Headers are the message's metadata.
        Headers source.Headers
        // Subject is the topic/subject; defaults to "memsource" when empty.
        Subject string
        // PartitionKey overrides the ordering domain; when empty the Inlet derives one
        // from Key (so same-Key messages stay ordered) or leaves it empty.
        PartitionKey string
    }

type Option

Option configures an Inlet.

    type Option func(*Inlet)

func WithClock
    func WithClock(now func() time.Time) Option

WithClock injects the clock the Inlet uses to stamp cursors and record settle times, for deterministic tests. The default is time.Now. A nil clock is ignored.
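The functional-option shape, including the rule that a nil clock leaves the default in place, can be sketched with stand-in types. Nothing below is the package's code: `inlet`, `option`, `withClock`, `newInlet`, `fixedClock`, and `stamped` are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// inlet is a hypothetical stand-in that holds only the injected clock.
type inlet struct{ now func() time.Time }

// option mirrors the documented shape: a function that configures the inlet.
type option func(*inlet)

// withClock installs now as the clock. A nil clock is ignored, so whatever
// clock was set before it stays in place.
func withClock(now func() time.Time) option {
	return func(in *inlet) {
		if now != nil {
			in.now = now
		}
	}
}

// newInlet starts from the time.Now default and applies options in order.
func newInlet(opts ...option) *inlet {
	in := &inlet{now: time.Now}
	for _, o := range opts {
		o(in)
	}
	return in
}

// fixedClock returns a clock that always reports t.
func fixedClock(t time.Time) func() time.Time {
	return func() time.Time { return t }
}

// stamped builds an inlet with a fixed clock followed by a nil clock and
// returns the time it reports, formatted as RFC 3339.
func stamped() string {
	epoch := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)
	in := newInlet(withClock(fixedClock(epoch)), withClock(nil))
	return in.now().Format(time.RFC3339)
}

func main() {
	fmt.Println(stamped())
}
```

With a fixed clock, every recorded settle time is identical from run to run, which is what lets a test compare timestamps exactly.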
func WithIDFunc
    func WithIDFunc(next func() string) Option

WithIDFunc injects the function from which the Inlet assigns sequential message IDs and cursor labels. It is called once per message; the default is a monotonic counter rendered as a decimal string. A nil func is ignored.
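An ID function of the kind described, a monotonic counter rendered as a decimal string, might look like the sketch below. `newCounterIDs` and `firstIDs` are hypothetical helpers, and starting the counter at 1 is an assumption; the text does not say where the package's default begins.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// newCounterIDs returns an ID function backed by an atomic counter, so it is
// safe to call from several goroutines. The first ID is "1" (an assumption).
func newCounterIDs() func() string {
	var n atomic.Int64
	return func() string {
		return strconv.FormatInt(n.Add(1), 10)
	}
}

// firstIDs calls a fresh ID function count times, once per message.
func firstIDs(count int) []string {
	next := newCounterIDs()
	ids := make([]string, 0, count)
	for i := 0; i < count; i++ {
		ids = append(ids, next())
	}
	return ids
}

func main() {
	fmt.Println(firstIDs(3))
}
```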
func WithMessages
    func WithMessages(msgs ...Msg) Option

WithMessages is an Option that pre-queues msgs at construction, equivalent to a subsequent Inlet.Queue.
Generated by gomarkdoc