source/statemachine
import "github.com/stablekernel/crucible/source/statemachine"Package statemachine binds an inbound [source] message to a crucible/state statechart so that consuming a message *is* firing a transition, and the ack is tied to a durable transition. It is the ingress mirror of crucible/sink’s bridge: a separate module depending on both crucible/source and crucible/state, so neither core imports the other.
The binding is one declared step, not hand-wired plumbing:
decode → route to (instanceKey, event) → load instance → Fire(event) → hand emitted effects to a sink → persist new state → ackThe ack comes only after a successful durable Store.Save (ack-after-durable-commit), so at-least-once delivery never advances the stream past an unpersisted transition.
Differentiators
Section titled “Differentiators”What only a state-machine-native ingress binding can own:
- Exactly-once into the machine. The persisted instance carries a monotonic version, and Drive records the event id of the last applied message. A redelivered (key, eventID) that was already applied returns [source.Skip] — acked, never re-fired — so redelivery is provably idempotent with no external dedup store. The machine’s own state version is the dedup key.
- State-aware rejection. A [state.Instance.Fire] that fails because the event is illegal for the current state (no declared transition, or a failing guard/[state.Machine.Assay]) returns [source.Reject] (Term, classified InvalidForState) carrying a [*source.GuardRejection] — distinct from a transient Store or infrastructure error, which returns [source.Nak] (Retryable). “Wrong time” and “try again later” are different first-class outcomes.
- consume → transition → emit. A transition’s emitted effects ([state.FireResult.Effects]) are handed to an injected Sink in the same step, before the ack, so the statechart is the processor and emitted effects are transition outputs.
- Analyzable consumption. Conformance validates that a router’s event union is exhaustive against the machine’s event alphabet and reports inbound events that no state can ever handle, at build or load time.
Two binding modes share the same outcomes:
- Durable: Drive loads and saves each instance through a Store, persisting the transition before acking. This is the exactly-once path.
- Stateless: DriveFunc fires each message against a caller-supplied function with no persistence, for sources that drive a transient or externally-owned machine.
Store coupling
Section titled “Store coupling”The bridge depends only on the small Store interface, never on a concrete durable backend; the crucible/durable module (or any store) can provide an adapter. NewMemStore is an in-memory Store for tests and single-process use.
Stability
Section titled “Stability”Experimental (pre-v1); the API may change until the suite locks v1.0.0.
- Constants
- Variables
- func DefaultEventID(m source.Message) string
- func Drive[K comparable, E comparable, C any](machine *state.Machine[K, E, C], store Store[K, K, E, C], router Router[K, E], opts …Option) source.Handler
- func DriveFunc[K comparable, E comparable](fire FireFunc[K, E], router Router[K, E], opts …Option) source.Handler
- func EventAlphabet[K comparable, E comparable, C any](machine *state.Machine[K, E, C]) []E
- type Conformance
- type EventID
- type FireFunc
- type MemStore
- type Option
- type Record
- type Router
- type Sink
- type SinkFunc
- type Store
Constants
Section titled “Constants”DefaultEventIDHeader is the header DefaultEventID reads a message’s idempotency id from when present.
const DefaultEventIDHeader = "message-id"Variables
Section titled “Variables”ErrConflict reports that a Store.Save was rejected because the persisted version no longer matched the expected version the caller loaded: another writer advanced the same key concurrently. It is a transient, retry-by-reloading condition — Drive surfaces it as [source.Nak] so the message is redelivered and re-applied against the now-current state. Match it with errors.Is.
var ErrConflict = errors.New("statemachine: store version conflict")func DefaultEventID
Section titled “func DefaultEventID”func DefaultEventID(m source.Message) stringDefaultEventID is the EventID used when none is configured: the DefaultEventIDHeader header value if present, else the message’s cursor string. The cursor is a stream-local coordinate, so the fallback is unique within a stream but not across re-published messages — supply WithEventID with a domain id (an order id, a CloudEvents id) for cross-stream dedup.
func Drive
Section titled “func Drive”func Drive[K comparable, E comparable, C any](machine *state.Machine[K, E, C], store Store[K, K, E, C], router Router[K, E], opts ...Option) source.HandlerDrive binds a durable state-machine instance to a [source.Handler]: each message is routed to an instance key and event, the instance is loaded through the Store, the event is fired, the emitted effects are handed to the configured Sink, the new state is persisted, and only then is the message acked (ack-after-durable-commit). The event type E must be the machine’s event type ([state.Machine] is generic over the same E).
Outcomes:
- Route/decode failure → [source.Term] (poison): unroutable, never retried.
- Redelivery already applied (the message id equals the persisted LastEventID) → [source.Skip]: acked, never re-fired (exactly-once).
- Fire rejected as illegal for the current state (no transition, or a failing guard) → [source.Reject] (Term, InvalidForState) carrying a [*source.GuardRejection].
- Sink emit, persist, or load failure → [source.Nak] (Retryable): a transient infrastructure error, redelivered and re-applied. A persist that lost an optimistic-concurrency race (ErrConflict) is also a nak.
- Success → [source.Ack] after the transition is durably persisted.
Drive is safe for concurrent use; the [source.Hopper] invokes it from per-key ordered lanes, so one key’s load→fire→save never interleaves with itself.
Example
ExampleDrive binds a turnstile statechart to a source.Handler: consuming a “coin” message fires the unlock transition, hands the emitted effect to a sink, persists the new state, and acks only after that durable commit. A redelivery of the same message is a no-op ack — exactly-once into the machine.
machine := buildTurnstile()store := statemachine.NewMemStore[turnstileState, turnstileState, turnstileEvent, *turnstile]()
// Seed a funded turnstile in the locked state at version 1.seeded := machine.Cast(&turnstile{Funded: true}, state.WithInitialState[turnstileState](locked))_ = store.Save(context.Background(), locked, statemachine.Record[turnstileState, turnstileEvent, *turnstile]{Snapshot: seeded.Snapshot(), Version: 1}, 0)
// Route every message to the one instance, firing coin.router := func(source.Message) (turnstileState, turnstileEvent, error) { return locked, coin, nil}sink := statemachine.SinkFunc(func(_ context.Context, eff any) error { fmt.Printf("emit: %v\n", eff) return nil})
h := statemachine.Drive[turnstileState, turnstileEvent, *turnstile]( machine, store, router, statemachine.WithSink(sink),)
first := h(context.Background(), msg("coin-1", "cursor-1"))fmt.Println("first:", first.Action)
// Redeliver the same message id: skipped, acked, not re-applied.again := h(context.Background(), msg("coin-1", "cursor-1"))fmt.Println("redelivery:", again.Action, again.Class)
rec, _, _ := store.Load(context.Background(), locked)fmt.Println("state:", rec.Snapshot.Current, "version:", rec.Version)
// Output:// emit: {coin}// first: ack// redelivery: ack drop// state: unlocked version: 2Output
Section titled “Output”emit: {coin}first: ackredelivery: ack dropstate: unlocked version: 2func DriveFunc
Section titled “func DriveFunc”func DriveFunc[K comparable, E comparable](fire FireFunc[K, E], router Router[K, E], opts ...Option) source.HandlerDriveFunc binds a stateless state-machine to a [source.Handler]: each message is routed to a key and event, then fired through a caller-supplied FireFunc with no persistence. It is the mode for a transient or externally-owned machine where there is no durable Store to commit against.
The emit hand-off, state-aware rejection, and the ack outcome match Drive, minus the load/save and minus version idempotency (a stateless binding has no persisted version to dedup against, so redelivery re-fires; supply a [Deduper] or use Drive for exactly-once):
- Route/decode failure → [source.Term] (poison).
- Fire rejected as illegal for the current state → [source.Reject] (InvalidForState) carrying a [*source.GuardRejection].
- FireFunc resolution error, or sink emit failure → [source.Nak] (Retryable).
- Success → [source.Ack].
func EventAlphabet
Section titled “func EventAlphabet”func EventAlphabet[K comparable, E comparable, C any](machine *state.Machine[K, E, C]) []EEventAlphabet returns every event that appears on a declared transition of machine — across all states, nested substates, and parallel regions — sorted by its string form for stable output. It is the introspection CheckEvents builds on, exported so a caller can enumerate “what events can this machine ever act on” directly.
It derives the alphabet from the machine’s serialized IR, the public definition surface, so it never reaches into kernel internals.
type Conformance
Section titled “type Conformance”Conformance is the result of checking a router’s (or codec’s) accepted event union against a machine’s event alphabet: the analyzable-consumption answer to “what does consuming into this machine actually accept, and what can it never trigger?”. It is a value a caller asserts on in a build-time test or logs at load time.
type Conformance[E comparable] struct { // Alphabet is the machine's event alphabet: every event that appears on a // declared transition, across all states, substates, and parallel regions, // sorted for stable reporting. Alphabet []E // Accepted is the event union the router/codec declares it can produce, as // supplied to [CheckEvents]. Accepted []E // Missing lists alphabet events the accepted union does NOT cover: events the // machine can act on but this consumer will never deliver. A non-empty Missing // is usually a gap — a transition no inbound message can ever drive. Missing []E // Unreachable lists accepted events that are NOT in the machine's alphabet: events // the consumer can deliver but no state can ever handle, so they would always be // rejected as invalid-for-state. A non-empty Unreachable is usually dead routing. Unreachable []E}func CheckEvents
Section titled “func CheckEvents”func CheckEvents[K comparable, E comparable, C any](machine *state.Machine[K, E, C], accepted []E) Conformance[E]CheckEvents validates that accepted — the event union a router or codec can produce — is exhaustive against machine’s event alphabet, and reports both the alphabet events the union misses and the accepted events the machine can never handle (which would always be rejected as invalid-for-state). It reads the alphabet from the machine’s serialized definition through the state module’s IR (no private state access), so it works on any built or loaded [state.Machine].
A machine whose definition cannot be serialized (an impossible state for a Quenched machine) yields an empty alphabet and reports every accepted event as unreachable, surfacing the problem rather than masking it.
Example
ExampleCheckEvents validates that a consumer’s accepted event union is exhaustive against the machine’s event alphabet, reporting an event the machine can never handle (which would always be rejected as invalid-for-state).
machine := buildTurnstile()
c := statemachine.CheckEvents(machine, []turnstileEvent{coin, push, maintenance})fmt.Println("exhaustive:", c.Exhaustive())fmt.Println("unreachable:", c.Unreachable)
// Output:// exhaustive: false// unreachable: [maintenance]Output
Section titled “Output”exhaustive: falseunreachable: [maintenance]func (Conformance[E]) Err
Section titled “func (Conformance[E]) Err”func (c Conformance[E]) Err() errorErr returns a non-nil error describing the gaps when the consumption is not Conformance.Exhaustive, so a conformance test can fail with one check:
if err := statemachine.CheckEvents(m, accepted).Err(); err != nil { t.Fatal(err)}It returns nil when exhaustive.
func (Conformance[E]) Exhaustive
Section titled “func (Conformance[E]) Exhaustive”func (c Conformance[E]) Exhaustive() boolExhaustive reports whether the accepted union exactly covers the machine’s alphabet with nothing unreachable: every event the machine handles is deliverable, and every deliverable event is handled.
type EventID
Section titled “type EventID”EventID extracts the idempotency id of an inbound message: the stable per-message identifier the exactly-once dedup keys on. A redelivery of the same logical message must yield the same id. The default reads the “message-id” header (DefaultEventIDHeader) and falls back to the message [source.Cursor] string; override it with WithEventID to read a different header or derive an id from the decoded value.
type EventID func(m source.Message) stringtype FireFunc
Section titled “type FireFunc”FireFunc fires a routed event against a caller-owned instance and returns the transition result. It is the stateless mode’s escape from the Store: the host owns instance lifecycle (a transient instance, an externally-persisted one, or one resolved from the message itself) and only the transition outcome flows back to the binding.
A nil error and a result whose Err is nil is a successful transition; a result carrying a [state.ErrInvalidTransition] or [state.ErrGuardFailed] is the state-aware rejection. Returning a non-nil error (not the FireResult.Err) is a transient failure resolving the instance — a nak.
type FireFunc[K comparable, E comparable] func(ctx context.Context, key K, event E) (state.FireResult[K], error)type MemStore
Section titled “type MemStore”MemStore is an in-memory Store for tests and single-process use. It enforces the same optimistic-concurrency contract as a durable backend, returning ErrConflict when a Save’s expected version does not match the stored version, so the exactly-once and conflict paths are exercised without infrastructure. It is safe for concurrent use.
type MemStore[K comparable, S comparable, E comparable, C any] struct { // contains filtered or unexported fields}func NewMemStore
Section titled “func NewMemStore”func NewMemStore[K comparable, S comparable, E comparable, C any]() *MemStore[K, S, E, C]NewMemStore returns an empty in-memory Store.
func (*MemStore[K, S, E, C]) Load
Section titled “func (*MemStore[K, S, E, C]) Load”func (s *MemStore[K, S, E, C]) Load(_ context.Context, key K) (Record[S, E, C], bool, error)Load returns the stored record for key and whether one exists.
func (*MemStore[K, S, E, C]) Save
Section titled “func (*MemStore[K, S, E, C]) Save”func (s *MemStore[K, S, E, C]) Save(_ context.Context, key K, rec Record[S, E, C], expectedVersion int64) errorSave persists rec for key, rejecting a stale write with ErrConflict when the stored version no longer matches expectedVersion.
type Option
Section titled “type Option”Option configures a Drive or DriveFunc binding. Options are additive: a new capability arrives as a new option, never a changed signature.
type Option func(*config)func WithEventID
Section titled “func WithEventID”func WithEventID(fn EventID) OptionWithEventID sets the EventID extractor used for exactly-once dedup. The default reads the DefaultEventIDHeader header and falls back to the cursor; supply a domain id for dedup across re-published streams. A nil extractor is ignored.
func WithSink
Section titled “func WithSink”func WithSink(s Sink) OptionWithSink sets the Sink a transition’s emitted effects are handed to, in the same step, before the ack. The default discards effects. A nil sink is ignored.
func WithSpanName
Section titled “func WithSpanName”func WithSpanName(name string) OptionWithSpanName overrides the per-message span name (default “statemachine.drive”).
func WithTracer
Section titled “func WithTracer”func WithTracer(t telemetry.Tracer) OptionWithTracer sets the tracer the binding starts a per-message span on (decode, route, fire, emit, persist). The default is [telemetry.NopTracer]. Wire the same tracer the Sink uses so emit spans nest under the drive span. A nil tracer is ignored.
type Record
Section titled “type Record”Record is the durable form of a state-machine instance the Store persists: a [state.Snapshot] (the lossless instance state), the monotonic Version that advances by one on every applied transition, and LastEventID, the id of the most recently applied message. Version and LastEventID together are the exactly-once dedup key — a redelivered message whose id equals LastEventID was already folded into Version, so it is skipped rather than re-fired.
A Record round-trips through JSON whenever its [state.Snapshot] does (a JSON-marshalable context C, or a context codec on the store side), so a Store may persist it verbatim.
type Record[S comparable, E comparable, C any] struct { // Snapshot is the instance's persisted state, restorable with // [state.Machine.Restore]. Snapshot state.Snapshot[S, E, C] `json:"snapshot"` // Version is the monotonic sequence advanced once per applied transition, // starting at zero for a never-fired instance. It is the optimistic-concurrency // token a [Store.Save] checks against the expected version. Version int64 `json:"version"` // LastEventID is the id of the most recently applied inbound message (empty for // a never-fired instance). A redelivery carrying this id is a no-op ack. LastEventID string `json:"lastEventId,omitempty"`}type Router
Section titled “type Router”Router resolves an inbound message to the instance key it targets and the event to fire. A decode/route failure returns a non-nil error, which the binding treats as poison ([source.Term]): a message that cannot be routed cannot be retried into legibility. The decoded event is the typed event the machine fires on.
A Router typically delegates to a [source.Registry] (DecodeTyped) to recover the domain value, then projects it to a key and event.
type Router[K comparable, E any] func(m source.Message) (key K, event E, err error)type Sink
Section titled “type Sink”Sink receives the effects a transition emitted, in emission order, in the same step that fired the transition and before the message is acked. It is the consume→transition→emit hand-off, declared as a tiny interface so this module never hard-depends on crucible/sink: a crucible/sink Manifold, a publisher, or any effect handler can be adapted to it. The default is a no-op ([discardSink]); wire one with WithSink.
Emit is called once per emitted effect. A non-nil error fails the step before the ack, so the message is redelivered ([source.Nak]); an emit that must not block the transition should swallow its own errors.
type Sink interface { // Emit hands one transition effect to the sink. The effect's concrete type is a // crucible/state effect value (for example state.SendTo); a handler type-switches // on it. Returning an error nak's the message. Emit(ctx context.Context, effect any) error}type SinkFunc
Section titled “type SinkFunc”SinkFunc adapts a plain function to a Sink.
type SinkFunc func(ctx context.Context, effect any) errorfunc (SinkFunc) Emit
Section titled “func (SinkFunc) Emit”func (f SinkFunc) Emit(ctx context.Context, effect any) errorEmit calls the underlying function.
type Store
Section titled “type Store”Store is the durable seam Drive depends on to load and persist a state-machine instance, keyed by an instance key K. It is deliberately small so the bridge never hard-depends on a concrete backend: the crucible/durable module, a SQL store, or any key/value store can supply an adapter, and NewMemStore provides an in-memory implementation for tests.
Implementations must be safe for concurrent use across keys; Drive serializes load→fire→save per key through the [source.Hopper]‘s ordered lanes, but different keys are driven in parallel.
type Store[K comparable, S comparable, E comparable, C any] interface { // Load returns the persisted [Record] for key and whether one exists. A missing // key returns ok=false and a nil error so [Drive] can fire a fresh instance from // its initial state; a backend failure returns a non-nil error, which [Drive] // treats as transient (nak). Load(ctx context.Context, key K) (rec Record[S, E, C], ok bool, err error) // Save persists rec for key under optimistic concurrency: expectedVersion is the // version the caller loaded (zero for a first write). A Save whose persisted // version no longer matches expectedVersion must return an error matching // [ErrConflict]; any other failure is a transient backend error. On success the // persisted version becomes rec.Version. Save(ctx context.Context, key K, rec Record[S, E, C], expectedVersion int64) error}Generated by gomarkdoc