source/statemachine
import "github.com/stablekernel/crucible/source/statemachine"Package statemachine binds an inbound [source] message to a crucible/state statechart so that consuming a message *is* firing a transition, and the ack is tied to a durable transition. It is the ingress mirror of crucible/sink’s bridge: a separate module depending on both crucible/source and crucible/state, so neither core imports the other.
The binding is one declared step, not hand-wired plumbing:
decode → route to (instanceKey, event) → load instance → Fire(event) → hand emitted effects to a sink → persist new state → ackThe ack comes only after a successful durable Store.Save (ack-after-durable-commit), so at-least-once delivery never advances the stream past an unpersisted transition.
Differentiators
Section titled “Differentiators”What only a state-machine-native ingress binding can own:
- Exactly-once into the machine. The persisted instance carries a monotonic version, and Drive records the event id of the last applied message. A redelivered (key, eventID) that was already applied returns [source.Skip] — acked, never re-fired — so redelivery is provably idempotent with no external dedup store. The machine’s own state version is the dedup key.
- State-aware rejection. A [state.Instance.Fire] that fails because the event is illegal for the current state (no declared transition, or a failing guard/[state.Machine.Verify]) returns [source.Reject] (Term, classified InvalidForState) carrying a [*source.GuardRejection] — distinct from a transient Store or infrastructure error, which returns [source.Nak] (Retryable). “Wrong time” and “try again later” are different first-class outcomes.
- consume → transition → emit. A transition’s emitted effects ([state.FireResult.Effects]) are handed to an injected Sink in the same step, before the ack, so the statechart is the processor and emitted effects are transition outputs.
- Analyzable consumption. Conformance validates that a router’s event union is exhaustive against the machine’s event alphabet and reports inbound events that no state can ever handle, at build or load time.
Three binding modes share the same outcomes:
- Durable: Drive loads and saves each instance through a Store, persisting the transition before acking. Redelivery is deduplicated by the persisted event id (exactly-once into the machine, at-least-once delivery).
- Transactional: DriveTx runs the durable path inside a [source.Transactional] consume-process-produce transaction (Kafka EOS), so the records a transition emits and the consumed offset commit as one atomic unit. It is the exactly-once-into-a-sink path; use it only on a backend that satisfies [source.Transactional].
- Stateless: DriveFunc fires each message against a caller-supplied function with no persistence, for sources that drive a transient or externally-owned machine.
Store coupling
Section titled “Store coupling”The bridge depends only on the small Store interface, never on a concrete durable backend; the crucible/durable module (or any store) can provide an adapter. NewMemStore is an in-memory Store for tests and single-process use.
Stability
Section titled “Stability”Experimental (pre-v1); the API may change until the suite locks v1.0.0.
- Constants
- Variables
- func DefaultEventID(m source.Message) string
- func Drive[K comparable, E comparable, C any](machine *state.Machine[K, E, C], store Store[K, K, E, C], router Router[K, E], opts …Option) source.Handler
- func DriveFunc[K comparable, E comparable](fire FireFunc[K, E], router Router[K, E], opts …Option) source.Handler
- func DriveTx[K comparable, E comparable, C any](machine *state.Machine[K, E, C], store Store[K, K, E, C], router Router[K, E], tx source.Transactional, sink TxSink, opts …Option) source.Handler
- func EventAlphabet[K comparable, E comparable, C any](machine *state.Machine[K, E, C]) []E
- type Conformance
- type EventID
- type FireFunc
- type MemStore
- type Option
- type Record
- type Router
- type Sink
- type SinkFunc
- type Store
- type TxSink
- type TxSinkFunc
Constants
Section titled “Constants”DefaultEventIDHeader is the header DefaultEventID reads a message’s idempotency id from when present.
const DefaultEventIDHeader = "message-id"Variables
Section titled “Variables”ErrConflict reports that a Store.Save was rejected because the persisted version no longer matched the expected version the caller loaded: another writer advanced the same key concurrently. It is a transient, retry-by-reloading condition — Drive surfaces it as [source.Nak] so the message is redelivered and re-applied against the now-current state. Match it with errors.Is.
var ErrConflict = errors.New("statemachine: store version conflict")ErrNotTransactional reports that DriveTx was given a [source.Transactional] that is nil, or a TxSink that is nil. It is a wiring error surfaced at the first message rather than a runtime data condition. Match it with errors.Is.
var ErrNotTransactional = errors.New("statemachine: DriveTx requires a non-nil Transactional and TxSink")func DefaultEventID
Section titled “func DefaultEventID”func DefaultEventID(m source.Message) stringDefaultEventID is the EventID used when none is configured: the DefaultEventIDHeader header value if present, else the message’s cursor string. The cursor is a stream-local coordinate, so the fallback is unique within a stream but not across re-published messages — supply WithEventID with a domain id (an order id, a CloudEvents id) for cross-stream dedup.
func Drive
Section titled “func Drive”func Drive[K comparable, E comparable, C any](machine *state.Machine[K, E, C], store Store[K, K, E, C], router Router[K, E], opts ...Option) source.HandlerDrive binds a durable state-machine instance to a [source.Handler]: each message is routed to an instance key and event, the instance is loaded through the Store, the event is fired, the emitted effects are handed to the configured Sink, the new state is persisted, and only then is the message acked (ack-after-durable-commit). The event type E must be the machine’s event type ([state.Machine] is generic over the same E).
Outcomes:
- Route/decode failure → [source.Term] (poison): unroutable, never retried.
- Redelivery already applied (the message id equals the persisted LastEventID) → [source.Skip]: acked, never re-fired (exactly-once).
- Fire rejected as illegal for the current state (no transition, or a failing guard) → [source.Reject] (Term, InvalidForState) carrying a [*source.GuardRejection].
- Sink emit, persist, or load failure → [source.Nak] (Retryable): a transient infrastructure error, redelivered and re-applied. A persist that lost an optimistic-concurrency race (ErrConflict) is also a nak.
- Success → [source.Ack] after the transition is durably persisted.
Drive is safe for concurrent use; the [source.Hopper] invokes it from per-key ordered lanes, so one key’s load→fire→save never interleaves with itself.
Example
ExampleDrive binds a turnstile statechart to a source.Handler: consuming a “coin” message fires the unlock transition, hands the emitted effect to a sink, persists the new state, and acks only after that durable commit. A redelivery of the same message is a no-op ack — exactly-once into the machine.
machine := buildTurnstile()store := statemachine.NewMemStore[turnstileState, turnstileState, turnstileEvent, *turnstile]()
// Seed a funded turnstile in the locked state at version 1.seeded := machine.Cast(&turnstile{Funded: true}, state.WithInitialState[turnstileState](locked))_ = store.Save(context.Background(), locked, statemachine.Record[turnstileState, turnstileEvent, *turnstile]{Snapshot: seeded.Snapshot(), Version: 1}, 0)
// Route every message to the one instance, firing coin.router := func(source.Message) (turnstileState, turnstileEvent, error) { return locked, coin, nil}sink := statemachine.SinkFunc(func(_ context.Context, eff any) error { fmt.Printf("emit: %v\n", eff) return nil})
h := statemachine.Drive[turnstileState, turnstileEvent, *turnstile]( machine, store, router, statemachine.WithSink(sink),)
first := h(context.Background(), msg("coin-1", "cursor-1"))fmt.Println("first:", first.Action)
// Redeliver the same message id: skipped, acked, not re-applied.again := h(context.Background(), msg("coin-1", "cursor-1"))fmt.Println("redelivery:", again.Action, again.Class)
rec, _, _ := store.Load(context.Background(), locked)fmt.Println("state:", rec.Snapshot.Current, "version:", rec.Version)
// Output:// emit: {coin}// first: ack// redelivery: ack drop// state: unlocked version: 2Output
Section titled “Output”emit: {coin}first: ackredelivery: ack dropstate: unlocked version: 2Example (Cdc)
ExampleDrive_cdc shows the intended change-data-capture pattern: a Debezium topic carries row changes for a turnstile table, a cdc.Codec decodes each change event, and a Router projects the decoded row into the instance key and the event to fire. The Hopper then drives the statechart per primary key, acking only after the transition is durably persisted.
Here a single update row (the turnstile becomes funded) decodes to a coin event, unlocking the machine. No broker is involved; the message stands in for one Debezium record off the topic.
package main
import ( "context" "fmt"
"github.com/stablekernel/crucible/source" "github.com/stablekernel/crucible/source/cdc" "github.com/stablekernel/crucible/source/statemachine" "github.com/stablekernel/crucible/state")
// ExampleDrive_cdc shows the intended change-data-capture pattern: a Debezium// topic carries row changes for a turnstile table, a cdc.Codec decodes each// change event, and a Router projects the decoded row into the instance key and// the event to fire. The Hopper then drives the statechart per primary key,// acking only after the transition is durably persisted.//// Here a single update row (the turnstile becomes funded) decodes to a coin// event, unlocking the machine. No broker is involved; the message stands in for// one Debezium record off the topic.func main() { machine := buildTurnstile() store := statemachine.NewMemStore[turnstileState, turnstileState, turnstileEvent, *turnstile]()
// Seed a funded turnstile in the locked state at version 1. seeded := machine.Cast(&turnstile{Funded: true}, state.WithInitialState[turnstileState](locked)) _ = store.Save(context.Background(), locked, statemachine.Record[turnstileState, turnstileEvent, *turnstile]{Snapshot: seeded.Snapshot(), Version: 1}, 0)
// One codec, registered as the registry default for the CDC topic. registry := source.NewRegistry().SetDefault(cdc.New())
// The Router decodes the change event and projects its after-image into a // (key, event): a create or update on a funded row drives the coin event. router := func(m source.Message) (turnstileState, turnstileEvent, error) { event, err := cdc.DecodeEvent(registry, m) if err != nil { return 0, 0, err } row, err := cdc.AfterAs[turnstile](event) if err != nil { return 0, 0, fmt.Errorf("cdc example: project after-image: %w", err) } if !row.Funded { return 0, 0, fmt.Errorf("cdc example: row not funded") } return locked, coin, nil }
sink := statemachine.SinkFunc(func(_ context.Context, eff any) error { fmt.Printf("emit: %v\n", eff) return nil }) h := statemachine.Drive[turnstileState, turnstileEvent, *turnstile]( machine, store, router, statemachine.WithSink(sink), )
// A Debezium update envelope: the row's funded column flips true. change := cdcMessage(`{ "op":"u", "before":{"funded":false}, "after":{"funded":true}, "source":{"connector":"postgresql","db":"gate","schema":"public","table":"turnstile"}, "ts_ms":1700000000000 }`, "lsn-100")
res := h(context.Background(), change) fmt.Println("result:", res.Action)
rec, _, _ := store.Load(context.Background(), locked) fmt.Println("state:", rec.Snapshot.Current, "version:", rec.Version)
}
// cdcMessage builds a fakeMessage carrying a Debezium JSON change event on a// CDC topic, with the cursor standing in for the source log position.func cdcMessage(payload, cursor string) fakeMessage { return fakeMessage{ value: []byte(payload), subject: "gate.public.turnstile", cursor: fakeCursor(cursor), }}Output
Section titled “Output”emit: {coin}result: ackstate: unlocked version: 2func DriveFunc
Section titled “func DriveFunc”func DriveFunc[K comparable, E comparable](fire FireFunc[K, E], router Router[K, E], opts ...Option) source.HandlerDriveFunc binds a stateless state-machine to a [source.Handler]: each message is routed to a key and event, then fired through a caller-supplied FireFunc with no persistence. It is the mode for a transient or externally-owned machine where there is no durable Store to commit against.
The emit hand-off, state-aware rejection, and the ack outcome match Drive, minus the load/save and minus version idempotency: a stateless binding has no persisted version to dedup against, so a redelivery re-fires. For idempotent redelivery either use Drive, whose persisted version skips an already-applied event id, or add the source/idempotency middleware to the Hopper to suppress duplicates before they reach this handler.
- Route/decode failure → [source.Term] (poison).
- Fire rejected as illegal for the current state → [source.Reject] (InvalidForState) carrying a [*source.GuardRejection].
- FireFunc resolution error, or sink emit failure → [source.Nak] (Retryable).
- Success → [source.Ack].
func DriveTx
Section titled “func DriveTx”func DriveTx[K comparable, E comparable, C any](machine *state.Machine[K, E, C], store Store[K, K, E, C], router Router[K, E], tx source.Transactional, sink TxSink, opts ...Option) source.HandlerDriveTx binds a durable state-machine instance to a [source.Handler] with exactly-once consume-process-produce on a [source.Transactional] subscription (Kafka EOS): the records a transition emits and the consumed offset of the inbound message are committed in one atomic unit, so neither the emitted output nor the ack can survive without the other.
It is the transactional counterpart of Drive. The route, load, fire, version-idempotency, and state-aware-rejection semantics are identical; the difference is the commit boundary. On a successful fire, DriveTx opens a transaction around the message with [source.Transactional.Begin] and, inside it, produces the emitted effects through the TxSink, persists the new state through the Store, and lets Begin commit the produced records together with the consumed offset. It then returns [source.Manual]: the [source.Hopper] takes no further settle action because Begin already committed the offset.
Outcomes:
- Route/decode failure → [source.Term] (poison), settled by the Hopper (no transaction is opened).
- Redelivery already applied (message id equals the persisted LastEventID) → [source.Skip] (acked, never re-fired). The Hopper settles it; the skip is a plain offset advance, not a transaction.
- Fire rejected as illegal for the current state → [source.Reject] (Term, InvalidForState), settled by the Hopper.
- Begin/emit/persist failure, or a broker abort (a rebalance fences the producer) → [source.Nak] (Retryable): the transaction aborted, the offset was not committed, and the message is redelivered.
- Success → [source.Manual] after Begin commits the emitted records and the consumed offset atomically.
A note on persist ordering: the Store.Save runs inside the transaction, so a broker abort after a successful Save leaves the instance advanced but the offset uncommitted. The redelivery is then deduplicated by version (LastEventID), so it acks as a no-op rather than double-applying — the exactly-once-into-the-machine guarantee covers the gap the offset abort opens.
Use DriveTx only with a subscription that satisfies [source.Transactional] (Kafka built with kafka.WithTransactional). On any other backend the capability is absent; use Drive for the at-least-once path. The Hopper must run the handler with [source.WithConcurrency](1) per key so one key’s transactions do not interleave (its default per-key ordering already provides this).
Example
ExampleDriveTx binds a turnstile statechart to a source.Handler with exactly-once consume-process-produce: firing the unlock transition produces the emitted effect AND commits the consumed offset in one transaction, so the handler returns Manual (the engine takes no further settle action).
machine := buildTurnstile()store := statemachine.NewMemStore[turnstileState, turnstileState, turnstileEvent, *turnstile]()
seeded := machine.Cast(&turnstile{Funded: true}, state.WithInitialState[turnstileState](locked))_ = store.Save(context.Background(), locked, statemachine.Record[turnstileState, turnstileEvent, *turnstile]{Snapshot: seeded.Snapshot(), Version: 1}, 0)
router := func(source.Message) (turnstileState, turnstileEvent, error) { return locked, coin, nil}// The TxSink turns each emitted effect into a record produced inside the// transaction.txSink := statemachine.TxSinkFunc(func(ctx context.Context, tx source.Tx, eff any) error { if oe, ok := eff.(openedEffect); ok { return tx.Produce(ctx, source.ProducedRecord{Topic: "turnstile.out", Value: []byte("opened by " + oe.By)}) } return nil})
h := statemachine.DriveTx[turnstileState, turnstileEvent, *turnstile]( machine, store, router, exampleTransactional{}, txSink,)
res := h(context.Background(), msg("coin-1", "cursor-1"))fmt.Println("action:", res.Action)
rec, _, _ := store.Load(context.Background(), locked)fmt.Println("state:", rec.Snapshot.Current, "version:", rec.Version)
// Output:// produce: turnstile.out -> opened by coin// action: manual// state: unlocked version: 2Output
Section titled “Output”produce: turnstile.out -> opened by coinaction: manualstate: unlocked version: 2func EventAlphabet
Section titled “func EventAlphabet”func EventAlphabet[K comparable, E comparable, C any](machine *state.Machine[K, E, C]) []EEventAlphabet returns every event that appears on a declared transition of machine — across all states, nested substates, and parallel regions — sorted by its string form for stable output. It is the introspection CheckEvents builds on, exported so a caller can enumerate “what events can this machine ever act on” directly.
It derives the alphabet from the machine’s serialized IR, the public definition surface, so it never reaches into kernel internals.
type Conformance
Section titled “type Conformance”Conformance is the result of checking a router’s (or codec’s) accepted event union against a machine’s event alphabet: the analyzable-consumption answer to “what does consuming into this machine actually accept, and what can it never trigger?”. It is a value a caller asserts on in a build-time test or logs at load time.
type Conformance[E comparable] struct { // Alphabet is the machine's event alphabet: every event that appears on a // declared transition, across all states, substates, and parallel regions, // sorted for stable reporting. Alphabet []E // Accepted is the event union the router/codec declares it can produce, as // supplied to [CheckEvents]. Accepted []E // Missing lists alphabet events the accepted union does NOT cover: events the // machine can act on but this consumer will never deliver. A non-empty Missing // is usually a gap — a transition no inbound message can ever drive. Missing []E // Unreachable lists accepted events that are NOT in the machine's alphabet: events // the consumer can deliver but no state can ever handle, so they would always be // rejected as invalid-for-state. A non-empty Unreachable is usually dead routing. Unreachable []E}func CheckEvents
Section titled “func CheckEvents”func CheckEvents[K comparable, E comparable, C any](machine *state.Machine[K, E, C], accepted []E) Conformance[E]CheckEvents validates that accepted — the event union a router or codec can produce — is exhaustive against machine’s event alphabet, and reports both the alphabet events the union misses and the accepted events the machine can never handle (which would always be rejected as invalid-for-state). It reads the alphabet from the machine’s serialized definition through the state module’s IR (no private state access), so it works on any built or loaded [state.Machine].
A machine whose definition cannot be serialized (an impossible state for a Quenched machine) yields an empty alphabet and reports every accepted event as unreachable, surfacing the problem rather than masking it.
Example
ExampleCheckEvents validates that a consumer’s accepted event union is exhaustive against the machine’s event alphabet, reporting an event the machine can never handle (which would always be rejected as invalid-for-state).
machine := buildTurnstile()
c := statemachine.CheckEvents(machine, []turnstileEvent{coin, push, maintenance})fmt.Println("exhaustive:", c.Exhaustive())fmt.Println("unreachable:", c.Unreachable)
// Output:// exhaustive: false// unreachable: [maintenance]Output
Section titled “Output”exhaustive: falseunreachable: [maintenance]func (Conformance[E]) Err
Section titled “func (Conformance[E]) Err”func (c Conformance[E]) Err() errorErr returns a non-nil error describing the gaps when the consumption is not Conformance.Exhaustive, so a conformance test can fail with one check:
if err := statemachine.CheckEvents(m, accepted).Err(); err != nil { t.Fatal(err)}It returns nil when exhaustive.
func (Conformance[E]) Exhaustive
Section titled “func (Conformance[E]) Exhaustive”func (c Conformance[E]) Exhaustive() boolExhaustive reports whether the accepted union exactly covers the machine’s alphabet with nothing unreachable: every event the machine handles is deliverable, and every deliverable event is handled.
type EventID
Section titled “type EventID”EventID extracts the idempotency id of an inbound message: the stable per-message identifier the exactly-once dedup keys on. A redelivery of the same logical message must yield the same id. The default reads the “message-id” header (DefaultEventIDHeader) and falls back to the message [source.Cursor] string; override it with WithEventID to read a different header or derive an id from the decoded value.
An empty id disables dedup for that message: with no id to compare against the persisted LastEventID, a redelivery re-fires the transition. The bindings surface this on their span (the “statemachine.exactly_once” attribute is false) so a message stream that yields no id is visible in traces rather than silently losing the guarantee. Return a non-empty stable id to keep exactly-once.
type EventID func(m source.Message) stringtype FireFunc
Section titled “type FireFunc”FireFunc fires a routed event against a caller-owned instance and returns the transition result. It is the stateless mode’s escape from the Store: the host owns instance lifecycle (a transient instance, an externally-persisted one, or one resolved from the message itself) and only the transition outcome flows back to the binding.
A nil error and a result whose Err is nil is a successful transition; a result carrying a [state.InvalidTransitionError] or [state.GuardFailedError] is the state-aware rejection. Returning a non-nil error (not the FireResult.Err) is a transient failure resolving the instance — a nak.
type FireFunc[K comparable, E comparable] func(ctx context.Context, key K, event E) (state.FireResult[K], error)type MemStore
Section titled “type MemStore”MemStore is an in-memory Store for tests and single-process use. It enforces the same optimistic-concurrency contract as a durable backend, returning ErrConflict when a Save’s expected version does not match the stored version, so the exactly-once and conflict paths are exercised without infrastructure. It is safe for concurrent use.
type MemStore[K comparable, S comparable, E comparable, C any] struct { // contains filtered or unexported fields}func NewMemStore
Section titled “func NewMemStore”func NewMemStore[K comparable, S comparable, E comparable, C any]() *MemStore[K, S, E, C]NewMemStore returns an empty in-memory Store.
func (*MemStore[K, S, E, C]) Load
Section titled “func (*MemStore[K, S, E, C]) Load”func (s *MemStore[K, S, E, C]) Load(_ context.Context, key K) (Record[S, E, C], bool, error)Load returns the stored record for key and whether one exists.
func (*MemStore[K, S, E, C]) Save
Section titled “func (*MemStore[K, S, E, C]) Save”func (s *MemStore[K, S, E, C]) Save(_ context.Context, key K, rec Record[S, E, C], expectedVersion int64) errorSave persists rec for key, rejecting a stale write with ErrConflict when the stored version no longer matches expectedVersion.
type Option
Section titled “type Option”Option configures a Drive or DriveFunc binding. Options are additive: a new capability arrives as a new option, never a changed signature.
type Option func(*config)func WithEventID
Section titled “func WithEventID”func WithEventID(fn EventID) OptionWithEventID sets the EventID extractor used for exactly-once dedup. The default reads the DefaultEventIDHeader header and falls back to the cursor; supply a domain id for dedup across re-published streams. A nil extractor is ignored.
func WithSink
Section titled “func WithSink”func WithSink(s Sink) OptionWithSink sets the Sink a transition’s emitted effects are handed to, in the same step, before the ack. The default discards effects. A nil sink is ignored.
func WithSpanName
Section titled “func WithSpanName”func WithSpanName(name string) OptionWithSpanName overrides the per-message span name (default “statemachine.drive”).
func WithTracer
Section titled “func WithTracer”func WithTracer(t telemetry.Tracer) OptionWithTracer sets the tracer the binding starts a per-message span on (decode, route, fire, emit, persist). The default is [telemetry.NopTracer]. Wire the same tracer the Sink uses so emit spans nest under the drive span. A nil tracer is ignored.
type Record
Section titled “type Record”Record is the durable form of a state-machine instance the Store persists: a [state.Snapshot] (the lossless instance state), the monotonic Version that advances by one on every applied transition, and LastEventID, the id of the most recently applied message. Version and LastEventID together are the exactly-once dedup key — a redelivered message whose id equals LastEventID was already folded into Version, so it is skipped rather than re-fired.
A Record round-trips through JSON whenever its [state.Snapshot] does (a JSON-marshalable context C, or a context codec on the store side), so a Store may persist it verbatim.
type Record[S comparable, E comparable, C any] struct { // Snapshot is the instance's persisted state, restorable with // [state.Machine.Restore]. Snapshot state.Snapshot[S, E, C] `json:"snapshot"` // Version is the monotonic sequence advanced once per applied transition, // starting at zero for a never-fired instance. It is the optimistic-concurrency // token a [Store.Save] checks against the expected version. Version int64 `json:"version"` // LastEventID is the id of the most recently applied inbound message (empty for // a never-fired instance). A redelivery carrying this id is a no-op ack. LastEventID string `json:"lastEventId,omitempty"`}type Router
Section titled “type Router”Router resolves an inbound message to the instance key it targets and the event to fire. A decode/route failure returns a non-nil error, which the binding treats as poison ([source.Term]): a message that cannot be routed cannot be retried into legibility. The decoded event is the typed event the machine fires on.
A Router typically delegates to a [source.Registry] (DecodeTyped) to recover the domain value, then projects it to a key and event.
type Router[K comparable, E any] func(m source.Message) (key K, event E, err error)type Sink
Section titled “type Sink”Sink receives the effects a transition emitted, in emission order, in the same step that fired the transition and before the message is acked. It is the consume→transition→emit hand-off, declared as a tiny interface so this module never hard-depends on crucible/sink: a crucible/sink Manifold, a publisher, or any effect handler can be adapted to it. The default is a no-op ([discardSink]); wire one with WithSink.
Emit is called once per emitted effect. A non-nil error fails the step before the ack, so the message is redelivered ([source.Nak]); an emit that must not block the transition should swallow its own errors.
type Sink interface { // Emit hands one transition effect to the sink. The effect's concrete type is a // crucible/state effect value (for example state.SendTo); a handler type-switches // on it. Returning an error nak's the message. Emit(ctx context.Context, effect any) error}type SinkFunc
Section titled “type SinkFunc”SinkFunc adapts a plain function to a Sink.
type SinkFunc func(ctx context.Context, effect any) errorfunc (SinkFunc) Emit
Section titled “func (SinkFunc) Emit”func (f SinkFunc) Emit(ctx context.Context, effect any) errorEmit calls the underlying function.
type Store
Section titled “type Store”Store is the durable seam Drive depends on to load and persist a state-machine instance, keyed by an instance key K. It is deliberately small so the bridge never hard-depends on a concrete backend: the crucible/durable module, a SQL store, or any key/value store can supply an adapter, and NewMemStore provides an in-memory implementation for tests.
Implementations must be safe for concurrent use across keys; Drive serializes load→fire→save per key through the [source.Hopper]‘s ordered lanes, but different keys are driven in parallel.
type Store[K comparable, S comparable, E comparable, C any] interface { // Load returns the persisted [Record] for key and whether one exists. A missing // key returns ok=false and a nil error so [Drive] can fire a fresh instance from // its initial state; a backend failure returns a non-nil error, which [Drive] // treats as transient (nak). Load(ctx context.Context, key K) (rec Record[S, E, C], ok bool, err error) // Save persists rec for key under optimistic concurrency: expectedVersion is the // version the caller loaded (zero for a first write). A Save whose persisted // version no longer matches expectedVersion must return an error matching // [ErrConflict]; any other failure is a transient backend error. On success the // persisted version becomes rec.Version. Save(ctx context.Context, key K, rec Record[S, E, C], expectedVersion int64) error}type TxSink
Section titled “type TxSink”TxSink translates a transition’s emitted effects into records produced on a transactional [source.Tx], so the emitted records and the consumed offset are committed exactly once. It is the transactional mirror of Sink: where Sink fires-and-forgets an effect at an external destination, a TxSink produces the effect into the open transaction the consume-process-produce cycle is fenced by, via [source.Tx.Produce].
EmitTx is called once per emitted effect, in order, inside the transaction. A non-nil error aborts the transaction (so the consumed offset is not committed and the input is redelivered) and is returned by the produce side of DriveTx. An effect a TxSink does not recognize should return an error rather than silently dropping it, so a missing mapping fails loudly rather than losing an output.
type TxSink interface { // EmitTx produces the records that realize effect into tx. The effect's // concrete type is a crucible/state effect value; a TxSink type-switches on it // and calls tx.Produce with the resulting [source.ProducedRecord]s. Returning // an error aborts the transaction. EmitTx(ctx context.Context, tx source.Tx, effect any) error}type TxSinkFunc
Section titled “type TxSinkFunc”TxSinkFunc adapts a plain function to a TxSink.
type TxSinkFunc func(ctx context.Context, tx source.Tx, effect any) errorfunc (TxSinkFunc) EmitTx
Section titled “func (TxSinkFunc) EmitTx”func (f TxSinkFunc) EmitTx(ctx context.Context, tx source.Tx, effect any) errorEmitTx calls the underlying function.
Generated by gomarkdoc