source/statemachine

import "github.com/stablekernel/crucible/source/statemachine"

Package statemachine binds an inbound [source] message to a crucible/state statechart so that consuming a message *is* firing a transition, and the ack is tied to a durable transition. It is the ingress mirror of crucible/sink’s bridge: a separate module depending on both crucible/source and crucible/state, so neither core imports the other.

The binding is one declared step, not hand-wired plumbing:

decode → route to (instanceKey, event) → load instance → Fire(event) →
hand emitted effects to a sink → persist new state → ack

The ack comes only after a successful durable Store.Save (ack-after-durable-commit), so at-least-once delivery never advances the stream past an unpersisted transition.
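The ordering constraint can be sketched with stand-in hooks. This is a minimal illustration, not the package's implementation: `stage`, `runStep`, `ok`, and `fail` are hypothetical names, and the only point is that a failure at or before the persist stage means the ack is never reached.

```go
package main

import (
	"errors"
	"fmt"
)

// stage is a hypothetical named step in the binding's pipeline.
type stage struct {
	name string
	run  func() error
}

// runStep executes the stages in order and reports whether the message may be
// acked. It also returns the names of the stages that completed, so the
// ordering is observable. Any failure stops the pipeline before the ack.
func runStep(stages []stage) (done []string, acked bool) {
	for _, s := range stages {
		if err := s.run(); err != nil {
			return done, false
		}
		done = append(done, s.name)
	}
	return done, true
}

func ok() error   { return nil }
func fail() error { return errors.New("disk full") }

func main() {
	stages := []stage{
		{"decode", ok}, {"route", ok}, {"load", ok}, {"fire", ok}, {"emit", ok},
		{"persist", fail},
	}
	done, acked := runStep(stages)
	fmt.Println(done, acked) // [decode route load fire emit] false
}
```

Because the ack is the return value of the whole sequence rather than a separate call, there is no code path in this sketch that acks an unpersisted transition.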

What only a state-machine-native ingress binding can own:

  • Exactly-once into the machine. The persisted instance carries a monotonic version, and Drive records the event id of the last applied message. A redelivered (key, eventID) that was already applied returns [source.Skip] (acked, never re-fired), so redelivery is idempotent with no external dedup store: the instance’s own persisted version and last applied event id are the dedup key.
  • State-aware rejection. A [state.Instance.Fire] that fails because the event is illegal for the current state (no declared transition, or a failing guard/[state.Machine.Assay]) returns [source.Reject] (Term, classified InvalidForState) carrying a [*source.GuardRejection] — distinct from a transient Store or infrastructure error, which returns [source.Nak] (Retryable). “Wrong time” and “try again later” are different first-class outcomes.
  • consume → transition → emit. A transition’s emitted effects ([state.FireResult.Effects]) are handed to an injected Sink in the same step, before the ack, so the statechart is the processor and emitted effects are transition outputs.
  • Analyzable consumption. Conformance validates that a router’s event union is exhaustive against the machine’s event alphabet and reports inbound events that no state can ever handle, at build or load time.

Two binding modes share the same outcomes:

  • Durable: Drive loads and saves each instance through a Store, persisting the transition before acking. This is the exactly-once path.
  • Stateless: DriveFunc fires each message against a caller-supplied function with no persistence, for sources that drive a transient or externally-owned machine.

The bridge depends only on the small Store interface, never on a concrete durable backend; the crucible/durable module (or any store) can provide an adapter. NewMemStore is an in-memory Store for tests and single-process use.

Experimental (pre-v1); the API may change until the suite locks v1.0.0.

DefaultEventIDHeader is the header from which DefaultEventID reads a message’s idempotency id, when present.

const DefaultEventIDHeader = "message-id"

ErrConflict reports that a Store.Save was rejected because the persisted version no longer matched the expected version the caller loaded: another writer advanced the same key concurrently. It is a transient, retry-by-reloading condition — Drive surfaces it as [source.Nak] so the message is redelivered and re-applied against the now-current state. Match it with errors.Is.

var ErrConflict = errors.New("statemachine: store version conflict")
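A store adapter will often wrap the sentinel with context, which is why matching should go through errors.Is rather than `==`. The sketch below redeclares ErrConflict locally so it compiles on its own; `saveStale` and `isConflict` are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrConflict is redeclared here only so the sketch is self-contained.
var ErrConflict = errors.New("statemachine: store version conflict")

// saveStale is a hypothetical backend call that wraps the sentinel with
// context, as an adapter might.
func saveStale(key string) error {
	return fmt.Errorf("save %q: %w", key, ErrConflict)
}

// isConflict reports whether err is, or wraps, ErrConflict.
func isConflict(err error) bool {
	return errors.Is(err, ErrConflict)
}

func main() {
	err := saveStale("order-42")
	fmt.Println(isConflict(err))                        // true
	fmt.Println(isConflict(errors.New("network down"))) // false
	fmt.Println(err == ErrConflict)                     // false: wrapped, so == misses it
}
```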

func DefaultEventID(m source.Message) string

DefaultEventID is the EventID used when none is configured: the DefaultEventIDHeader header value if present, else the message’s cursor string. The cursor is a stream-local coordinate, so the fallback is unique within a stream but not across re-published messages — supply WithEventID with a domain id (an order id, a CloudEvents id) for cross-stream dedup.
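The fallback rule can be sketched as follows. `message` is a hypothetical stand-in for source.Message reduced to a header map and a cursor string, and treating an empty header value as absent is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

// message is a hypothetical stand-in for source.Message.
type message struct {
	Headers map[string]string
	Cursor  string
}

const eventIDHeader = "message-id" // same value as DefaultEventIDHeader

// eventID returns the header value when present, else the cursor string.
// Assumption: an empty header value counts as absent.
func eventID(m message) string {
	if id, ok := m.Headers[eventIDHeader]; ok && id != "" {
		return id
	}
	return m.Cursor
}

func main() {
	withHeader := message{Headers: map[string]string{"message-id": "m-7"}, Cursor: "42"}
	cursorOnly := message{Cursor: "42"}
	fmt.Println(eventID(withHeader)) // m-7
	fmt.Println(eventID(cursorOnly)) // 42
}
```

The second case shows why the fallback is only stream-local: a re-published copy of the same logical message would get a new cursor and therefore a new id.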

func Drive[K comparable, E comparable, C any](machine *state.Machine[K, E, C], store Store[K, K, E, C], router Router[K, E], opts ...Option) source.Handler

Drive binds a durable state-machine instance to a [source.Handler]: each message is routed to an instance key and event, the instance is loaded through the Store, the event is fired, the emitted effects are handed to the configured Sink, the new state is persisted, and only then is the message acked (ack-after-durable-commit). The event type E must be the machine’s event type ([state.Machine] is generic over the same E).

Outcomes:

  • Route/decode failure → [source.Term] (poison): unroutable, never retried.
  • Redelivery already applied (the message id equals the persisted LastEventID) → [source.Skip]: acked, never re-fired (exactly-once).
  • Fire rejected as illegal for the current state (no transition, or a failing guard) → [source.Reject] (Term, InvalidForState) carrying a [*source.GuardRejection].
  • Sink emit, persist, or load failure → [source.Nak] (Retryable): a transient infrastructure error, redelivered and re-applied. A persist that lost an optimistic-concurrency race (ErrConflict) is also a nak.
  • Success → [source.Ack] after the transition is durably persisted.
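The outcome list above amounts to a small classification. The following sketch uses hypothetical sentinel errors and plain strings in place of the source module's result types; it illustrates the mapping only and is not how Drive is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinels standing in for the failure kinds listed above.
var (
	errRoute           = errors.New("route: cannot decode")
	errInvalidForState = errors.New("fire: illegal for current state")
	errConflict        = errors.New("store version conflict")
)

// outcome maps one step's result to the action names used in the list.
// alreadyApplied is true when the message id equals the persisted LastEventID.
func outcome(alreadyApplied bool, err error) string {
	switch {
	case errors.Is(err, errRoute):
		return "term" // poison: never retried
	case alreadyApplied:
		return "skip" // acked, never re-fired
	case errors.Is(err, errInvalidForState):
		return "reject"
	case err != nil:
		return "nak" // errConflict and any other transient failure
	default:
		return "ack"
	}
}

func main() {
	fmt.Println(outcome(false, errRoute))           // term
	fmt.Println(outcome(true, nil))                 // skip
	fmt.Println(outcome(false, errInvalidForState)) // reject
	fmt.Println(outcome(false, errConflict))        // nak
	fmt.Println(outcome(false, nil))                // ack
}
```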

Drive is safe for concurrent use; the [source.Hopper] invokes it from per-key ordered lanes, so one key’s load→fire→save never interleaves with itself.

Example

ExampleDrive binds a turnstile statechart to a source.Handler: consuming a “coin” message fires the unlock transition, hands the emitted effect to a sink, persists the new state, and acks only after that durable commit. A redelivery of the same message is a no-op ack — exactly-once into the machine.

machine := buildTurnstile()
store := statemachine.NewMemStore[turnstileState, turnstileState, turnstileEvent, *turnstile]()
// Seed a funded turnstile in the locked state at version 1.
seeded := machine.Cast(&turnstile{Funded: true}, state.WithInitialState[turnstileState](locked))
_ = store.Save(context.Background(), locked,
	statemachine.Record[turnstileState, turnstileEvent, *turnstile]{Snapshot: seeded.Snapshot(), Version: 1}, 0)
// Route every message to the one instance, firing coin.
router := func(source.Message) (turnstileState, turnstileEvent, error) {
	return locked, coin, nil
}
sink := statemachine.SinkFunc(func(_ context.Context, eff any) error {
	fmt.Printf("emit: %v\n", eff)
	return nil
})
h := statemachine.Drive[turnstileState, turnstileEvent, *turnstile](
	machine, store, router, statemachine.WithSink(sink),
)
first := h(context.Background(), msg("coin-1", "cursor-1"))
fmt.Println("first:", first.Action)
// Redeliver the same message id: skipped, acked, not re-applied.
again := h(context.Background(), msg("coin-1", "cursor-1"))
fmt.Println("redelivery:", again.Action, again.Class)
rec, _, _ := store.Load(context.Background(), locked)
fmt.Println("state:", rec.Snapshot.Current, "version:", rec.Version)
// Output:
// emit: {coin}
// first: ack
// redelivery: ack drop
// state: unlocked version: 2

func DriveFunc[K comparable, E comparable](fire FireFunc[K, E], router Router[K, E], opts ...Option) source.Handler

DriveFunc binds a stateless state-machine to a [source.Handler]: each message is routed to a key and event, then fired through a caller-supplied FireFunc with no persistence. It is the mode for a transient or externally-owned machine where there is no durable Store to commit against.

The emit hand-off, state-aware rejection, and the ack outcome match Drive, minus the load/save and minus version idempotency (a stateless binding has no persisted version to dedup against, so redelivery re-fires; supply a [Deduper] or use Drive for exactly-once):

  • Route/decode failure → [source.Term] (poison).
  • Fire rejected as illegal for the current state → [source.Reject] (InvalidForState) carrying a [*source.GuardRejection].
  • FireFunc resolution error, or sink emit failure → [source.Nak] (Retryable).
  • Success → [source.Ack].

func EventAlphabet[K comparable, E comparable, C any](machine *state.Machine[K, E, C]) []E

EventAlphabet returns every event that appears on a declared transition of machine — across all states, nested substates, and parallel regions — sorted by its string form for stable output. It is the introspection CheckEvents builds on, exported so a caller can enumerate “what events can this machine ever act on” directly.

It derives the alphabet from the machine’s serialized IR, the public definition surface, so it never reaches into kernel internals.

Conformance is the result of checking a router’s (or codec’s) accepted event union against a machine’s event alphabet: the analyzable-consumption answer to “what does consuming into this machine actually accept, and what can it never trigger?”. It is a value a caller asserts on in a build-time test or logs at load time.

type Conformance[E comparable] struct {
	// Alphabet is the machine's event alphabet: every event that appears on a
	// declared transition, across all states, substates, and parallel regions,
	// sorted for stable reporting.
	Alphabet []E
	// Accepted is the event union the router/codec declares it can produce, as
	// supplied to [CheckEvents].
	Accepted []E
	// Missing lists alphabet events the accepted union does NOT cover: events the
	// machine can act on but this consumer will never deliver. A non-empty Missing
	// is usually a gap: a transition no inbound message can ever drive.
	Missing []E
	// Unreachable lists accepted events that are NOT in the machine's alphabet: events
	// the consumer can deliver but no state can ever handle, so they would always be
	// rejected as invalid-for-state. A non-empty Unreachable is usually dead routing.
	Unreachable []E
}

func CheckEvents[K comparable, E comparable, C any](machine *state.Machine[K, E, C], accepted []E) Conformance[E]

CheckEvents validates that accepted — the event union a router or codec can produce — is exhaustive against machine’s event alphabet, and reports both the alphabet events the union misses and the accepted events the machine can never handle (which would always be rejected as invalid-for-state). It reads the alphabet from the machine’s serialized definition through the state module’s IR (no private state access), so it works on any built or loaded [state.Machine].

A machine whose definition cannot be serialized (an impossible state for a Quenched machine) yields an empty alphabet and reports every accepted event as unreachable, surfacing the problem rather than masking it.
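Conceptually the check is two set differences between the alphabet and the accepted union. The sketch below uses plain strings for events and hypothetical helper names (`diff`, `check`); it does not read a real machine definition.

```go
package main

import (
	"fmt"
	"sort"
)

// diff returns the elements of a that are not in b, sorted.
func diff(a, b []string) []string {
	in := make(map[string]bool, len(b))
	for _, e := range b {
		in[e] = true
	}
	var out []string
	for _, e := range a {
		if !in[e] {
			out = append(out, e)
		}
	}
	sort.Strings(out)
	return out
}

// check computes the two gap lists for an alphabet and an accepted union.
func check(alphabet, accepted []string) (missing, unreachable []string) {
	return diff(alphabet, accepted), diff(accepted, alphabet)
}

func main() {
	alphabet := []string{"coin", "push"}
	missing, unreachable := check(alphabet, []string{"coin", "maintenance"})
	fmt.Println("missing:", missing)         // missing: [push]
	fmt.Println("unreachable:", unreachable) // unreachable: [maintenance]
	fmt.Println("exhaustive:", len(missing) == 0 && len(unreachable) == 0)
}
```

With an empty alphabet, `check` reports every accepted event as unreachable, which matches the unserializable-definition case described above.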

Example

ExampleCheckEvents validates that a consumer’s accepted event union is exhaustive against the machine’s event alphabet, reporting an event the machine can never handle (which would always be rejected as invalid-for-state).

machine := buildTurnstile()
c := statemachine.CheckEvents(machine, []turnstileEvent{coin, push, maintenance})
fmt.Println("exhaustive:", c.Exhaustive())
fmt.Println("unreachable:", c.Unreachable)
// Output:
// exhaustive: false
// unreachable: [maintenance]

func (c Conformance[E]) Err() error

Err returns a non-nil error describing the gaps when the result is not exhaustive (see Conformance.Exhaustive), so a conformance test can fail with one check:

if err := statemachine.CheckEvents(m, accepted).Err(); err != nil {
	t.Fatal(err)
}

It returns nil when exhaustive.

func (c Conformance[E]) Exhaustive() bool

Exhaustive reports whether the accepted union exactly covers the machine’s alphabet with nothing unreachable: every event the machine handles is deliverable, and every deliverable event is handled.

EventID extracts the idempotency id of an inbound message: the stable per-message identifier the exactly-once dedup keys on. A redelivery of the same logical message must yield the same id. The default reads the “message-id” header (DefaultEventIDHeader) and falls back to the message [source.Cursor] string; override it with WithEventID to read a different header or derive an id from the decoded value.

type EventID func(m source.Message) string

FireFunc fires a routed event against a caller-owned instance and returns the transition result. It is the stateless mode’s escape from the Store: the host owns instance lifecycle (a transient instance, an externally-persisted one, or one resolved from the message itself) and only the transition outcome flows back to the binding.

A nil error and a result whose Err is nil is a successful transition; a result carrying a [state.ErrInvalidTransition] or [state.ErrGuardFailed] is the state-aware rejection. A non-nil returned error (as opposed to FireResult.Err) means the instance could not be resolved; it is a transient failure, and the message is a nak.

type FireFunc[K comparable, E comparable] func(ctx context.Context, key K, event E) (state.FireResult[K], error)
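The two error channels of a FireFunc can be told apart as in the sketch below. `fireResult` and the two sentinels are hypothetical stand-ins for the state module's types, and treating any other result error as transient is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for state.ErrInvalidTransition and state.ErrGuardFailed.
var (
	errInvalidTransition = errors.New("invalid transition")
	errGuardFailed       = errors.New("guard failed")
)

// fireResult is a stand-in for state.FireResult, keeping only the Err field.
type fireResult struct{ Err error }

// classify applies the contract described above to one FireFunc return.
func classify(res fireResult, err error) string {
	switch {
	case err != nil:
		return "nak" // the instance could not be resolved: transient
	case errors.Is(res.Err, errInvalidTransition), errors.Is(res.Err, errGuardFailed):
		return "reject" // wrong time for this event
	case res.Err == nil:
		return "ack"
	default:
		return "nak" // assumption: other result errors are treated as transient
	}
}

func main() {
	fmt.Println(classify(fireResult{}, nil))                          // ack
	fmt.Println(classify(fireResult{Err: errGuardFailed}, nil))       // reject
	fmt.Println(classify(fireResult{}, errors.New("lookup timeout"))) // nak
}
```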

MemStore is an in-memory Store for tests and single-process use. It enforces the same optimistic-concurrency contract as a durable backend, returning ErrConflict when a Save’s expected version does not match the stored version, so the exactly-once and conflict paths are exercised without infrastructure. It is safe for concurrent use.

type MemStore[K comparable, S comparable, E comparable, C any] struct {
	// contains filtered or unexported fields
}

func NewMemStore[K comparable, S comparable, E comparable, C any]() *MemStore[K, S, E, C]

NewMemStore returns an empty in-memory Store.

func (s *MemStore[K, S, E, C]) Load(_ context.Context, key K) (Record[S, E, C], bool, error)

Load returns the stored record for key and whether one exists.

func (s *MemStore[K, S, E, C]) Save(_ context.Context, key K, rec Record[S, E, C], expectedVersion int64) error

Save persists rec for key, rejecting a stale write with ErrConflict when the stored version no longer matches expectedVersion.
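The optimistic-concurrency contract can be illustrated with a much smaller store. Everything below is a hypothetical miniature (string keys, a reduced `record`, no context argument), not MemStore itself; it assumes a missing key counts as version zero, in line with the "zero for a first write" rule in the Store documentation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errConflict = errors.New("store version conflict") // stand-in for ErrConflict

// record is a reduced stand-in for Record.
type record struct {
	State   string
	Version int64
}

// memStore is a hypothetical miniature of an in-memory versioned store.
type memStore struct {
	mu   sync.Mutex
	data map[string]record
}

func newMemStore() *memStore { return &memStore{data: map[string]record{}} }

// Load returns the stored record for key and whether one exists.
func (s *memStore) Load(key string) (record, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	rec, ok := s.data[key]
	return rec, ok
}

// Save writes rec only if the stored version still equals expected.
// A missing key counts as version zero.
func (s *memStore) Save(key string, rec record, expected int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.data[key].Version != expected {
		return errConflict
	}
	s.data[key] = rec
	return nil
}

func main() {
	s := newMemStore()
	fmt.Println(s.Save("t1", record{"locked", 1}, 0))   // <nil>
	fmt.Println(s.Save("t1", record{"unlocked", 2}, 1)) // <nil>
	// A writer that loaded version 1 is now stale.
	fmt.Println(s.Save("t1", record{"locked", 2}, 1)) // store version conflict
}
```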

Option configures a Drive or DriveFunc binding. Options are additive: a new capability arrives as a new option, never a changed signature.

type Option func(*config)

func WithEventID(fn EventID) Option

WithEventID sets the EventID extractor used for exactly-once dedup. The default reads the DefaultEventIDHeader header and falls back to the cursor; supply a domain id for dedup across re-published streams. A nil extractor is ignored.

func WithSink(s Sink) Option

WithSink sets the Sink a transition’s emitted effects are handed to, in the same step, before the ack. The default discards effects. A nil sink is ignored.

func WithSpanName(name string) Option

WithSpanName overrides the per-message span name (default “statemachine.drive”).

func WithTracer(t telemetry.Tracer) Option

WithTracer sets the tracer the binding starts a per-message span on (decode, route, fire, emit, persist). The default is [telemetry.NopTracer]. Wire the same tracer the Sink uses so emit spans nest under the drive span. A nil tracer is ignored.
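The options above follow the functional-options shape. A minimal sketch of that shape, including the "nil is ignored" rule, might look like this; `config`, `newConfig`, and the lowercase option names are hypothetical and do not reflect the package's unexported fields.

```go
package main

import "fmt"

// config is a hypothetical settings struct.
type config struct {
	spanName string
	eventID  func(headers map[string]string) string
}

type option func(*config)

// withSpanName overrides the span name.
func withSpanName(name string) option {
	return func(c *config) { c.spanName = name }
}

// withEventID ignores a nil extractor, leaving the default in place.
func withEventID(fn func(map[string]string) string) option {
	return func(c *config) {
		if fn != nil {
			c.eventID = fn
		}
	}
}

// newConfig starts from defaults and applies each option in order.
func newConfig(opts ...option) config {
	c := config{
		spanName: "statemachine.drive",
		eventID:  func(h map[string]string) string { return h["message-id"] },
	}
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	c := newConfig(withSpanName("orders.drive"), withEventID(nil))
	fmt.Println(c.spanName)                                        // orders.drive
	fmt.Println(c.eventID(map[string]string{"message-id": "m-1"})) // m-1
}
```

Adding a capability in this style means adding another `with...` function, so existing call sites keep compiling.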

Record is the durable form of a state-machine instance the Store persists: a [state.Snapshot] (the lossless instance state), the monotonic Version that advances by one on every applied transition, and LastEventID, the id of the most recently applied message. Version and LastEventID together are the exactly-once dedup key — a redelivered message whose id equals LastEventID was already folded into Version, so it is skipped rather than re-fired.

A Record round-trips through JSON whenever its [state.Snapshot] does (a JSON-marshalable context C, or a context codec on the store side), so a Store may persist it verbatim.

type Record[S comparable, E comparable, C any] struct {
	// Snapshot is the instance's persisted state, restorable with
	// [state.Machine.Restore].
	Snapshot state.Snapshot[S, E, C] `json:"snapshot"`
	// Version is the monotonic sequence advanced once per applied transition,
	// starting at zero for a never-fired instance. It is the optimistic-concurrency
	// token a [Store.Save] checks against the expected version.
	Version int64 `json:"version"`
	// LastEventID is the id of the most recently applied inbound message (empty for
	// a never-fired instance). A redelivery carrying this id is a no-op ack.
	LastEventID string `json:"lastEventId,omitempty"`
}
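How Version and LastEventID interact on redelivery can be sketched with a reduced record. `apply` is a hypothetical helper, and refusing to dedup on an empty id is an assumption made here so that a never-fired instance (empty LastEventID) cannot swallow an id-less message.

```go
package main

import "fmt"

// record is a reduced stand-in for Record.
type record struct {
	State       string
	Version     int64
	LastEventID string
}

// apply folds one message into rec. It returns the next record and whether the
// transition was actually fired. A redelivery (same id as LastEventID) leaves
// the record untouched.
func apply(rec record, eventID, nextState string) (record, bool) {
	if eventID != "" && eventID == rec.LastEventID {
		return rec, false // already folded into Version: skip
	}
	return record{State: nextState, Version: rec.Version + 1, LastEventID: eventID}, true
}

func main() {
	rec := record{State: "locked", Version: 1}
	rec, fired := apply(rec, "coin-1", "unlocked")
	fmt.Println(rec.State, rec.Version, fired) // unlocked 2 true
	rec, fired = apply(rec, "coin-1", "unlocked")
	fmt.Println(rec.State, rec.Version, fired) // unlocked 2 false
}
```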

Router resolves an inbound message to the instance key it targets and the event to fire. A decode/route failure returns a non-nil error, which the binding treats as poison ([source.Term]): a message that cannot be routed cannot be retried into legibility. The decoded event is the typed event the machine fires on.

A Router typically delegates to a [source.Registry] (DecodeTyped) to recover the domain value, then projects it to a key and event.

type Router[K comparable, E any] func(m source.Message) (key K, event E, err error)
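A router in this style could decode a payload and project it to a key and an event, as sketched below. The `orderMsg` payload, its field names, and the use of raw JSON bytes instead of source.Message and a [source.Registry] are all assumptions made to keep the example self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderMsg is a hypothetical payload; the field names are illustrative.
type orderMsg struct {
	OrderID string `json:"orderId"`
	Type    string `json:"type"`
}

// route decodes a raw payload and projects it to (key, event). Any decode or
// validation failure is returned as an error, which a binding would treat as
// poison rather than retry.
func route(payload []byte) (string, string, error) {
	var m orderMsg
	if err := json.Unmarshal(payload, &m); err != nil {
		return "", "", fmt.Errorf("decode: %w", err)
	}
	if m.OrderID == "" || m.Type == "" {
		return "", "", fmt.Errorf("route: missing orderId or type")
	}
	return m.OrderID, m.Type, nil
}

func main() {
	key, event, err := route([]byte(`{"orderId":"o-1","type":"paid"}`))
	fmt.Println(key, event, err) // o-1 paid <nil>
	_, _, err = route([]byte(`not json`))
	fmt.Println(err != nil) // true
}
```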

Sink receives the effects a transition emitted, in emission order, in the same step that fired the transition and before the message is acked. It is the consume→transition→emit hand-off, declared as a tiny interface so this module never hard-depends on crucible/sink: a crucible/sink Manifold, a publisher, or any effect handler can be adapted to it. The default is a no-op ([discardSink]); wire one with WithSink.

Emit is called once per emitted effect. A non-nil error fails the step before the ack, so the message is redelivered ([source.Nak]); an emit that must not block the transition should swallow its own errors.

type Sink interface {
	// Emit hands one transition effect to the sink. The effect's concrete type is a
	// crucible/state effect value (for example state.SendTo); a handler type-switches
	// on it. Returning an error naks the message.
	Emit(ctx context.Context, effect any) error
}

SinkFunc adapts a plain function to a Sink.

type SinkFunc func(ctx context.Context, effect any) error

func (f SinkFunc) Emit(ctx context.Context, effect any) error

Emit calls the underlying function.
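The function-to-interface adapter is the same pattern as http.HandlerFunc in the standard library. The sketch below redeclares a lowercase `sink` and `sinkFunc` locally and adds two hypothetical helpers, `emitAll` and `emitCollected`, to show effects being handed over in order with the first error stopping the step.

```go
package main

import (
	"context"
	"fmt"
)

// sink and sinkFunc mirror the shape of Sink and SinkFunc for this sketch.
type sink interface {
	Emit(ctx context.Context, effect any) error
}

type sinkFunc func(ctx context.Context, effect any) error

// Emit calls the underlying function.
func (f sinkFunc) Emit(ctx context.Context, effect any) error { return f(ctx, effect) }

// emitAll hands effects to s in order and stops at the first error.
func emitAll(ctx context.Context, s sink, effects []any) error {
	for _, eff := range effects {
		if err := s.Emit(ctx, eff); err != nil {
			return err
		}
	}
	return nil
}

// emitCollected runs emitAll against a sink that records each effect's
// string form, and returns what was recorded.
func emitCollected(effects ...any) ([]string, error) {
	var got []string
	s := sinkFunc(func(_ context.Context, eff any) error {
		got = append(got, fmt.Sprint(eff))
		return nil
	})
	err := emitAll(context.Background(), s, effects)
	return got, err
}

func main() {
	got, err := emitCollected("unlock", 42)
	fmt.Println(got, err) // [unlock 42] <nil>
}
```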

Store is the durable seam Drive depends on to load and persist a state-machine instance, keyed by an instance key K. It is deliberately small so the bridge never hard-depends on a concrete backend: the crucible/durable module, a SQL store, or any key/value store can supply an adapter, and NewMemStore provides an in-memory implementation for tests.

Implementations must be safe for concurrent use across keys; Drive serializes load→fire→save per key through the [source.Hopper]’s ordered lanes, but different keys are driven in parallel.

type Store[K comparable, S comparable, E comparable, C any] interface {
	// Load returns the persisted [Record] for key and whether one exists. A missing
	// key returns ok=false and a nil error so [Drive] can fire a fresh instance from
	// its initial state; a backend failure returns a non-nil error, which [Drive]
	// treats as transient (nak).
	Load(ctx context.Context, key K) (rec Record[S, E, C], ok bool, err error)
	// Save persists rec for key under optimistic concurrency: expectedVersion is the
	// version the caller loaded (zero for a first write). A Save whose persisted
	// version no longer matches expectedVersion must return an error matching
	// [ErrConflict]; any other failure is a transient backend error. On success the
	// persisted version becomes rec.Version.
	Save(ctx context.Context, key K, rec Record[S, E, C], expectedVersion int64) error
}

Generated by gomarkdoc