durable

    import "github.com/stablekernel/crucible/durable"

Package durable is the host-side durable-execution runtime for the Crucible state kernel. It records the nondeterministic results a running instance consumes and persists them, so an instance can be checkpointed, survive a crash, and resume by replaying the recorded values back through the kernel’s pure transition function rather than re-invoking their external sources.
The package is additive over the state kernel: it consumes the kernel’s already-reserved persistence seams — Snapshot.Journal ([]state.JournalEntry), the EffectEnvelope.EffectID correlation slot, and the injectable Clock, ServiceRunner, and ActorSystem drivers — without requiring any change to the kernel, which stays pure and stdlib-only.
Guarantees
Deterministic replay: an instance recovered from its Store reaches exactly the same configuration, context, and history as a run that never crashed, because recovery replays the exact recorded driving events and nondeterministic results through the same pure transition function.
Exactly-once effects: a domain effect emitted by a transition is applied exactly once over the instance’s lifetime — the live run plus any number of recoveries — even though the runtime’s replay loop is at-least-once. Each effect is stamped with a deterministic EffectID (step, ordinal, kind) and deduplicated through the Store’s dispatch set.
Durability across restart: every Fire step is written to the Store before it is acknowledged (write-ahead append), so a crash after a successful Fire never loses the step. A periodic Snapshot checkpoint bounds the tail that recovery must replay, so recovery cost is proportional to the tail length, not the whole run.
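The write-ahead and checkpoint guarantees above can be illustrated with a toy model. This is not the durable API. `journalStore`, `transition`, and the checkpoint interval are hypothetical, and the "journal" is an in-memory slice rather than stable storage. The model shows that recovering from a checkpoint plus its tail reaches the same state as the live run, while replaying only the events recorded after the last checkpoint.

```go
package main

import "fmt"

// transition is a hypothetical pure step function standing in for the
// kernel: the same state and event always yield the same next state.
func transition(state, event int) int { return state + event }

// journalStore is a hypothetical in-memory stand-in for a durable Store.
type journalStore struct {
	checkpoint     int   // full state as of checkpointStep
	checkpointStep int   // last step folded into checkpoint (-1 = none yet)
	tail           []int // events recorded after checkpointStep, in order
}

// fire appends the event before the new state is observed (write-ahead),
// then checkpoints and compacts the tail every `every` steps.
func (s *journalStore) fire(live *int, step, event, every int) {
	s.tail = append(s.tail, event)
	*live = transition(*live, event)
	if (step+1)%every == 0 {
		s.checkpoint, s.checkpointStep, s.tail = *live, step, nil
	}
}

// recoverState restores the checkpoint and replays only the tail after it,
// returning the recovered state and how many events had to be replayed.
func (s *journalStore) recoverState() (int, int) {
	state := s.checkpoint
	for _, ev := range s.tail {
		state = transition(state, ev)
	}
	return state, len(s.tail)
}

func main() {
	s := &journalStore{checkpointStep: -1}
	live := 0
	for step, ev := range []int{5, 3, 7, 2, 9, 4, 1} {
		s.fire(&live, step, ev, 3)
	}
	recovered, replayed := s.recoverState()
	fmt.Println(live, recovered, replayed) // 31 31 1
}
```

With a checkpoint every 3 steps, seven fires leave a one-event tail. Recovery therefore replays one event instead of seven.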
Caveats
Payloads are serialized at the journal boundary. Event, service done-data, actor done-data, and actor messages are recorded as their JSON form; a parent reducer that type-asserts a non-JSON Go type from AssignCtx.Event observes the JSON-decoded shape on the replayed onDone. A typed-codec option to carry the concrete Go value across the boundary is reserved for a later, additive change.
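The JSON-boundary caveat can be reproduced with encoding/json alone. `Shipment` is a hypothetical done-data type. `roundTrip` relies on the default encoding/json behavior, which decodes a JSON object into `map[string]any` when the target is an untyped `any`. The durable runtime's exact decoding path may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Shipment is a hypothetical Go type carried as actor or service done-data.
type Shipment struct {
	Tracking string `json:"tracking"`
}

// roundTrip mimics the journal boundary: the value is recorded as JSON and
// decoded back into an untyped value, as a replayed onDone would see it.
func roundTrip(v any) (any, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var decoded any
	err = json.Unmarshal(raw, &decoded)
	return decoded, err
}

func main() {
	live := any(Shipment{Tracking: "TRK-1"})
	replayed, err := roundTrip(live)
	if err != nil {
		panic(err)
	}
	_, liveOK := live.(Shipment)       // the live path sees the concrete type
	_, replayOK := replayed.(Shipment) // the replayed path does not
	m, isMap := replayed.(map[string]any)
	fmt.Println(liveOK, replayOK, isMap, m["tracking"]) // true false true TRK-1
}
```

A reducer that must behave the same on the live and replayed paths can accept either shape, or decode the map back into the concrete type before using it.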
The kernel stays pure and dependency-free. The durable module is host-side: database drivers, cloud SDKs, and I/O libraries live in the caller or in out-of-tree Store backends, never in this module. The in-tree MemStore and FileStore are stdlib-only reference implementations.
Store is the persistence seam. A durable instance is an ordered log of Records (one per Fire step) layered over periodic full-Snapshot checkpoints. Load reconstructs an instance from its latest checkpoint plus the journal and effect tail recorded after it.
MemStore is the in-memory reference implementation: thread-safe, stdlib-only, not durable across process restarts. Use it for tests, examples, and single-process development.
FileStore is the on-disk reference implementation: a directory of per-instance subdirectories, each holding an append-only journal, an atomic checkpoint, an idempotency ledger, and a dispatched-effect log. Each Append flushes to stable storage; each Checkpoint uses write-temp+rename for crash-safe atomicity. Use FileStore when you need durability across restarts without a database.
Persistent database backends (PostgreSQL, DynamoDB, and the like) implement the Store interface out of tree, so heavy drivers never burden this module’s dependency or vulnerability surface.
Runner / Handle / Recover
Runner is the durable wrapper around a state.Machine. Wire one with a Store, then call Start to create a fresh instance (persisting a baseline checkpoint) or Fire to drive events statelessly (loading, replaying, and re-recording for each call). For a hot path that fires many events in sequence, obtain a Handle from Start or Recover and call Handle.Fire directly, avoiding a Store round-trip between steps.
Recover reconstructs a durable instance purely from the Store: it loads the latest checkpoint Snapshot and the tail of Records after it, restores the snapshot under a replay clock (firing nothing), and replays the tail’s recorded driving events through the kernel to reach the live tip. The returned Handle continues recording subsequent fires.
The three nondeterministic seams
A state machine is a pure function of its inputs. The durable runtime isolates each source of nondeterminism behind a seam, records the result the first time, and replays it verbatim on every subsequent recovery — so the kernel’s transition function is never re-invoked for a value it has already consumed.
Clock (WithRunnerClock): a running instance reads time only through its host scheduler, which arms and ticks delayed `after` transitions. The Runner wraps the real clock in a recording clock on the live path so every Now() reading the scheduler consumes is journaled as a JournalClockRead in the step’s Record. On recovery a replay clock returns those readings in order, making timer-driven transitions wall-clock-independent: the same timers fire at the same recorded instants regardless of the wall clock at recovery time. A timer also survives checkpoint compaction: each checkpoint persists the absolute deadlines of the timers armed at that instant; on recovery a timer whose arming ScheduleAfter was compacted out of the tail is re-armed from its persisted deadline rather than lost.
Invoked services (WithServiceRegistry): a service (`invoke`) runs exactly once on the live path; its result is journaled as a JournalServiceResult. On recovery the recorded result is replayed back through the same kernel settle seam — the service is never re-invoked and the same onDone or onError event re-fires with the same data.
Child-machine actors (WithActorPalette): an actor’s behavior runs exactly once on the live path; each parent transition the delivery drives is journaled as a JournalActorMessage. On recovery the recorded transition is re-fired directly through the parent with the recorded done-data — the actor behavior is never re-instantiated.
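The service seam follows a run-once, record, replay shape. The sketch below uses hypothetical names (`outcome`, `settle`) and a plain map, keyed by correlation id, in place of the journal. The same shape applies to actor done-data, but only the service case is shown.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// outcome is a hypothetical journaled result: which event it routes and its data.
type outcome struct {
	Event string          // "onDone" or "onError"
	Data  json.RawMessage // JSON-encoded done-data or error text
}

// settle runs svc on the live path and records its outcome under id; on the
// replay path it returns the recorded outcome and never calls svc.
func settle(journal map[string]outcome, id string, replaying bool, svc func() (any, error)) (outcome, error) {
	if replaying {
		o, ok := journal[id]
		if !ok {
			return outcome{}, fmt.Errorf("no recorded result for %q", id)
		}
		return o, nil
	}
	result, svcErr := svc()
	o := outcome{Event: "onDone"}
	payload := result
	if svcErr != nil {
		o.Event, payload = "onError", svcErr.Error()
	}
	raw, err := json.Marshal(payload)
	if err != nil {
		return outcome{}, err
	}
	o.Data = raw
	journal[id] = o // a durable runtime would persist this before routing the event
	return o, nil
}

func main() {
	calls := 0
	price := func() (any, error) { calls++; return fmt.Sprintf("quote-%d", calls), nil }
	journal := map[string]outcome{}

	live, err := settle(journal, "price", false, price)
	if err != nil {
		panic(err)
	}
	replayed, err := settle(journal, "price", true, price)
	if err != nil {
		panic(err)
	}
	fmt.Println(live.Event, string(live.Data), string(replayed.Data), calls)
	// onDone "quote-1" "quote-1" 1
}
```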
Idempotent effect dispatch
A transition may emit a domain effect — sending an email, charging a card, publishing a message — as an Effect value the kernel routes out through FireResult.Effects without applying. The Runner applies it through the caller-supplied EffectHandler (WithEffectHandler), exactly once over the instance’s lifetime.
Each emitted effect is stamped with a deterministic EffectID derived from its step, its emission ordinal within that step, and its kind — every component a pure function of the recorded run, so the same effect carries the same id on the live path and on every recovery. The Runner write-ahead appends the step Record (carrying those ids) before dispatching, then applies each effect whose id is not already in the Store’s dispatched set and marks it as it succeeds. A crash between append and dispatch leaves the effect recorded but un-marked, so recovery redispatches it; a crash after dispatch finds the id marked and skips it.
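Deterministic effect ids and a dispatched set can be modeled as follows. The assumptions are these. The `step/ordinal/kind` id format is invented for illustration, since the real encoding is not documented here. The dispatched set is an in-memory map. A "crash" is simulated by the handler returning an error before it applies the second effect.

```go
package main

import (
	"errors"
	"fmt"
)

// effectID derives an id purely from recorded facts. The format is hypothetical.
func effectID(step, ordinal int, kind string) string {
	return fmt.Sprintf("%d/%d/%s", step, ordinal, kind)
}

type effect struct{ Kind string }

// dispatch applies each recorded effect whose id is not yet in dispatched,
// marking an id only after its handler succeeds.
func dispatch(step int, effects []effect, dispatched map[string]bool, apply func(id string) error) error {
	for ord, e := range effects {
		id := effectID(step, ord, e.Kind)
		if dispatched[id] {
			continue // already landed on an earlier run
		}
		if err := apply(id); err != nil {
			return err // left un-marked, so a later recovery retries it
		}
		dispatched[id] = true
	}
	return nil
}

var errCrash = errors.New("simulated crash")

func main() {
	effects := []effect{{Kind: "email"}, {Kind: "charge"}} // recorded write-ahead
	dispatched := map[string]bool{}
	applied := map[string]int{}

	// Live run: the email lands, then the run "crashes" before the charge.
	err := dispatch(3, effects, dispatched, func(id string) error {
		if id == effectID(3, 1, "charge") {
			return errCrash
		}
		applied[id]++
		return nil
	})
	fmt.Println(errors.Is(err, errCrash)) // true

	// Recovery recomputes the same ids, so only the un-marked charge is applied.
	_ = dispatch(3, effects, dispatched, func(id string) error { applied[id]++; return nil })
	fmt.Println(applied["3/0/email"], applied["3/1/charge"]) // 1 1
}
```

This model narrows, but does not close, the window between applying an effect and marking it. A persistent backend closes it by marking the id in the same transaction that records the side effect's acknowledgement.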
Kernel driver effects (services, timers, actors) are absorbed by their host drivers and never reach the handler.
Time-travel reader
StateAt reconstructs a durable instance’s state as of any recorded step, read-only: it restores the start baseline and replays recorded events forward up to the requested step, running no service, re-instantiating no actor, reading no wall clock, and dispatching no domain effect. The Store is consulted only through reads; neither the recorded log nor the dispatched set is touched.
Time-travel requires the full Record history through the target step. A Store that compacts its journal at each checkpoint can no longer reach a compacted step. A Store opts into full history by implementing HistoryStore; the in-tree MemStore does so under WithHistory. StateAt falls back to Load (latest checkpoint plus tail) when the Store does not implement HistoryStore.
Example
Example shows the Store round-trip a durable instance relies on: append the per-step journal as an instance advances, checkpoint a full Snapshot to compact the tail, then load the checkpoint plus the post-checkpoint tail to reconstruct the instance.

    package main

    import (
        "context"
        "encoding/json"
        "fmt"

        "github.com/stablekernel/crucible/durable"
        "github.com/stablekernel/crucible/state"
    )

    func main() {
        ctx := context.Background()
        store := durable.NewMemStore()
        const id = durable.InstanceID("order-42")

        // Record two steps of nondeterministic results as the instance fires.
        for step := range 2 {
            _, err := store.Append(ctx, id, durable.Record{
                Step: step,
                Entries: []state.JournalEntry{{
                    Step:          step,
                    Kind:          state.JournalServiceResult,
                    CorrelationID: fmt.Sprintf("charge-%d", step),
                    Payload:       json.RawMessage(`{"ok":true}`),
                }},
            })
            if err != nil {
                panic(err)
            }
        }

        // Checkpoint a full snapshot through step 0, compacting that step's tail.
        snap, err := state.MarshalSnapshot(state.Snapshot[string, string, map[string]any]{
            Machine:       "order",
            Current:       "charging",
            Configuration: []string{"charging"},
            Context:       map[string]any{"orderID": "42"},
        })
        if err != nil {
            panic(err)
        }
        if cpErr := store.Checkpoint(ctx, id, snap, 0); cpErr != nil {
            panic(cpErr)
        }

        // Load reconstructs from the checkpoint plus the post-checkpoint tail.
        loaded, tail, err := store.Load(ctx, id)
        if err != nil {
            panic(err)
        }
        restored, err := state.UnmarshalSnapshot[string, string, map[string]any](loaded)
        if err != nil {
            panic(err)
        }

        fmt.Println("checkpoint state:", restored.Current)
        fmt.Println("tail steps:", len(tail))
        fmt.Println("first tail step:", tail[0].Step)
    }

Output

    checkpoint state: charging
    tail steps: 1
    first tail step: 1

- Constants
- Variables
- func JournalPath(root string, id InstanceID) string
- func Steps(ctx context.Context, st Store, id InstanceID) ([]int, error)
- type AppendOption
- type CheckpointOption
- type DispatchStore
- type EffectHandler
- type EventCodec
- type FileStore
- func NewFileStore(dir string, opts ...FileStoreOption) (*FileStore, error)
- func (s *FileStore) Append(_ context.Context, id InstanceID, rec Record, opts ...AppendOption) (int64, error)
- func (s *FileStore) Checkpoint(_ context.Context, id InstanceID, snapshot []byte, throughStep int, opts ...CheckpointOption) error
- func (s *FileStore) Dispatched(_ context.Context, id InstanceID) (map[string]bool, error)
- func (s *FileStore) Load(_ context.Context, id InstanceID) ([]byte, []Record, error)
- func (s *FileStore) MarkDispatched(_ context.Context, id InstanceID, effectIDs ...string) error
- type FileStoreOption
- type Handle
- func Recover[S comparable, E comparable, C any](ctx context.Context, m *state.Machine[S, E, C], st Store, id InstanceID, opts ...Option[S, E, C]) (*Handle[S, E, C], error)
- func (h *Handle[S, E, C]) ActorRef(id string) (state.ActorRef, bool)
- func (h *Handle[S, E, C]) DeliverToActor(ctx context.Context, ref state.ActorRef, event any) (bool, error)
- func (h *Handle[S, E, C]) DeliverToActorByID(ctx context.Context, id string, event any) (bool, error)
- func (h *Handle[S, E, C]) Fire(ctx context.Context, event E, opts ...state.FireOption) (state.FireResult[S], error)
- func (h *Handle[S, E, C]) ID() InstanceID
- func (h *Handle[S, E, C]) Instance() *state.Instance[S, E, C]
- func (h *Handle[S, E, C]) RunService(ctx context.Context, id string) (state.FireResult[S], bool, error)
- func (h *Handle[S, E, C]) Tick(ctx context.Context) ([]state.FireResult[S], error)
- type HistoryStore
- type InstanceID
- type MemStore
- func NewMemStore(opts ...MemStoreOption) *MemStore
- func (s *MemStore) Append(_ context.Context, id InstanceID, rec Record, opts ...AppendOption) (int64, error)
- func (s *MemStore) Checkpoint(_ context.Context, id InstanceID, snapshot []byte, throughStep int, opts ...CheckpointOption) error
- func (s *MemStore) Dispatched(_ context.Context, id InstanceID) (map[string]bool, error)
- func (s *MemStore) History(_ context.Context, id InstanceID) ([]byte, []Record, error)
- func (s *MemStore) Load(_ context.Context, id InstanceID) ([]byte, []Record, error)
- func (s *MemStore) MarkDispatched(_ context.Context, id InstanceID, effectIDs ...string) error
- type MemStoreOption
- type Option
- func WithActorPalette[S comparable, E comparable, C any](palette map[string]state.ActorBehavior) Option[S, E, C]
- func WithCheckpointEvery[S comparable, E comparable, C any](n int) Option[S, E, C]
- func WithEffectHandler[S comparable, E comparable, C any](h EffectHandler) Option[S, E, C]
- func WithEventCodec[S comparable, E comparable, C any](codec EventCodec[E]) Option[S, E, C]
- func WithRunnerClock[S comparable, E comparable, C any](c state.Clock) Option[S, E, C]
- func WithServiceRegistry[S comparable, E comparable, C any](reg *state.Registry[C]) Option[S, E, C]
- type Record
- type Runner
- func NewRunner[S comparable, E comparable, C any](m *state.Machine[S, E, C], st Store, opts ...Option[S, E, C]) *Runner[S, E, C]
- func (r *Runner[S, E, C]) Fire(ctx context.Context, id InstanceID, event E, opts ...state.FireOption) (state.FireResult[S], error)
- func (r *Runner[S, E, C]) Start(ctx context.Context, id InstanceID, input C, opts ...state.CastOption[S]) (*Handle[S, E, C], error)
- type Store
- type TimeTravelView
- func StateAt[S comparable, E comparable, C any](ctx context.Context, m *state.Machine[S, E, C], st Store, id InstanceID, step int, opts ...Option[S, E, C]) (*TimeTravelView[S, E, C], error)
- func (v *TimeTravelView[S, E, C]) Instance() *state.Instance[S, E, C]
- func (v *TimeTravelView[S, E, C]) Snapshot() state.Snapshot[S, E, C]
- func (v *TimeTravelView[S, E, C]) Step() int
Constants
BaselineStep is the step that addresses an instance’s start baseline: its state immediately after Start, before any event fired. StateAt(ctx, m, st, id, BaselineStep) reconstructs the freshly cast instance.

    const BaselineStep = baselineStep

Variables

Sentinel errors reported by Store implementations and by the Runner. Callers match them with errors.Is.

    var (
        // ErrInstanceNotFound is reported by Store.Load for an instance that has
        // never been written.
        ErrInstanceNotFound = errors.New("crucible/durable: instance not found")

        // ErrStepOutOfOrder is reported by Store.Append when a Record's Step does not
        // strictly follow the instance's highest recorded Step (and is not an
        // idempotent re-append of an already-recorded Step).
        ErrStepOutOfOrder = errors.New("crucible/durable: record step out of order")

        // ErrCheckpointNotAdvancing is reported by Store.Checkpoint when throughStep
        // does not advance beyond the instance's current checkpoint.
        ErrCheckpointNotAdvancing = errors.New("crucible/durable: checkpoint does not advance")

        // ErrInstanceExists is reported by Runner.Start when the InstanceID is already
        // present in the Store, so a fresh Start does not clobber a recorded instance.
        ErrInstanceExists = errors.New("crucible/durable: instance already exists")

        // ErrEffectDispatch wraps a failure returned by the caller-supplied effect
        // handler (WithEffectHandler) while applying an emitted effect. The effect was
        // persisted (write-ahead) but not marked dispatched, so a later recovery
        // retries it; callers match this with errors.Is to distinguish a dispatch
        // failure from a kernel transition or persistence error.
        ErrEffectDispatch = errors.New("crucible/durable: effect dispatch failed")
    )

ErrStepOutOfRange is reported by StateAt when the requested step is below the start baseline or beyond the instance’s last recorded step. Callers match it with errors.Is.

    var ErrStepOutOfRange = errors.New("crucible/durable: time-travel step out of range")

func JournalPath

    func JournalPath(root string, id InstanceID) string

JournalPath returns the append-only journal file path for an instance under a FileStore rooted at root. It is exported so tests and tooling can inspect or fault-inject the on-disk journal directly.
func Steps

    func Steps(ctx context.Context, st Store, id InstanceID) ([]int, error)

Steps enumerates the recorded step ordinals of an instance, in order, so a caller can drive StateAt across the run (for example, to render an audit timeline). It reports only externally fired and service/actor steps (the ordinals a StateAt target may name) and is read-only. An unknown instance reports ErrInstanceNotFound.
type AppendOption
AppendOption configures a single Store.Append call. It is the additive extension point for per-append behavior, such as an explicit idempotency key distinct from the Record’s Step (see WithIdempotencyKey).

    type AppendOption func(*appendConfig)

func WithIdempotencyKey

    func WithIdempotencyKey(key string) AppendOption

WithIdempotencyKey sets an explicit idempotency key for an Append, overriding the default of deduplicating on the Record’s Step alone. Two Appends carrying the same key for the same instance collapse to one. An empty key is ignored (the Step-based default applies).
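The Append dedup rule can be modeled as follows: an explicit key takes precedence, and an empty key falls back to the Step. `appendLog` and `appendRec` are hypothetical. Treating the int64 result as a 1-based sequence number is an assumption made for this model. Step-ordering checks (ErrStepOutOfOrder) are omitted.

```go
package main

import "fmt"

// appendLog is a hypothetical store illustrating idempotent Append semantics.
type appendLog struct {
	seen    map[string]int64 // dedup key -> sequence assigned on first write
	records []string
}

// appendRec writes payload unless its dedup key was already seen. The key is
// the explicit idempotency key when one is given, else the Record's Step.
func (l *appendLog) appendRec(step int, payload, key string) int64 {
	dedup := "key:" + key
	if key == "" {
		dedup = fmt.Sprintf("step:%d", step) // Step-based default
	}
	if seq, ok := l.seen[dedup]; ok {
		return seq // duplicate: collapses onto the first append
	}
	l.records = append(l.records, payload)
	seq := int64(len(l.records))
	l.seen[dedup] = seq
	return seq
}

func main() {
	l := &appendLog{seen: map[string]int64{}}
	a := l.appendRec(0, "charge", "order-42/charge")
	b := l.appendRec(0, "charge (retry)", "order-42/charge") // same key: collapses
	c := l.appendRec(1, "ship", "")                          // Step-based default
	fmt.Println(a, b, c, len(l.records))                     // 1 1 2 2
}
```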
type CheckpointOption
CheckpointOption configures a single Store.Checkpoint call. It is the additive extension point for per-checkpoint policy a backend may layer on. No option is defined yet: the seam reserves a stable signature so per-checkpoint policy can arrive additively without breaking the Store interface. Time-travel retention, which an earlier per-checkpoint flag covered, is now a store-level capability (the HistoryStore seam, MemStore’s WithHistory).

    type CheckpointOption func(*checkpointConfig)

type DispatchStore

DispatchStore is the dedup seam for idempotent effect dispatch: the Store tracks which effect ids an instance has applied, so a (re)dispatch skips the ones that already landed. It is an additive, optional capability. The core Store interface is unchanged; a backend opts in by implementing these two methods, and the Runner dispatches effects only against a Store that does (both in-tree stores, MemStore and FileStore, do). A persistent backend marks an id in the same transaction that records its side-effect acknowledgement, to keep the guarantee tight.

    type DispatchStore interface {
        // MarkDispatched records that the effects named by effectIDs have been applied
        // for the instance. It is idempotent: re-marking an id is a no-op.
        MarkDispatched(ctx context.Context, id InstanceID, effectIDs ...string) error
        // Dispatched returns the set of effect ids already applied for the instance as
        // a membership map. An instance with none reports an empty (non-nil) map.
        Dispatched(ctx context.Context, id InstanceID) (map[string]bool, error)
    }

type EffectHandler

EffectHandler applies one emitted domain effect, identified by its stamped EffectID. The Runner calls it exactly once per EffectID over the lifetime of an instance (see WithEffectHandler). A non-nil error leaves the effect un-marked for a later retry and is surfaced to the caller wrapped in ErrEffectDispatch.

    type EffectHandler func(ctx context.Context, effectID string, effect state.Effect) error

type EventCodec

EventCodec decodes an event value E from its recorded structured JSON form, the inverse of the kernel’s Trace.EventPayload marshaling. It is the seam by which Recover reconstructs the exact event to re-Fire. The default codec uses encoding/json; supply a custom one with WithEventCodec for events the default cannot round-trip.
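The following is a custom codec for an event whose recorded form encoding/json would not map back to the right Go value. It is written against a local copy of the single-method interface above. `OrderEvent`, its constants, and the assumption that the event is recorded as its JSON string name are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EventCodec is a local copy of the interface shape documented above.
type EventCodec[E comparable] interface {
	Decode(payload []byte) (E, error)
}

// OrderEvent is a hypothetical enum-style event recorded as its string name.
type OrderEvent int

const (
	NoEvent OrderEvent = iota
	Checkout
	Paid
)

var byName = map[string]OrderEvent{"checkout": Checkout, "paid": Paid}

// orderCodec maps a recorded JSON string back to its OrderEvent constant.
type orderCodec struct{}

func (orderCodec) Decode(payload []byte) (OrderEvent, error) {
	if len(payload) == 0 {
		return NoEvent, nil // an empty payload decodes to the zero event
	}
	var name string
	if err := json.Unmarshal(payload, &name); err != nil {
		return NoEvent, err
	}
	ev, ok := byName[name]
	if !ok {
		return NoEvent, fmt.Errorf("unknown order event %q", name)
	}
	return ev, nil
}

// Compile-time check that orderCodec satisfies the interface.
var _ EventCodec[OrderEvent] = orderCodec{}

func main() {
	var codec EventCodec[OrderEvent] = orderCodec{}
	ev, err := codec.Decode([]byte(`"paid"`))
	fmt.Println(ev == Paid, err) // true <nil>
}
```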

    type EventCodec[E comparable] interface {
        // Decode reconstructs the event value from its recorded payload. An empty
        // payload decodes to the zero event.
        Decode(payload []byte) (E, error)
    }

type FileStore

FileStore is an on-disk Store: a directory of per-instance subdirectories, each holding an append-only journal of Records, the instance’s latest Snapshot checkpoint, an append-only idempotency ledger, and an append-only set of dispatched effect ids. It is stdlib-only and durable across process restarts: a fresh FileStore opened over the same directory reconstructs every instance from disk, with no carried in-memory state. It is the persistent reference backend; MemStore is its volatile, in-memory counterpart.
On-disk layout
Under the store’s root directory, each instance lives in its own subdirectory named by a filesystem-safe encoding of its InstanceID:

    <root>/<encoded-id>/journal.log        append-only, one JSON Record per line
    <root>/<encoded-id>/checkpoint.snap    latest checkpoint Snapshot bytes
    <root>/<encoded-id>/checkpoint.meta    latest checkpoint throughStep
    <root>/<encoded-id>/applied.log        append-only idempotency ledger (key\tseq)
    <root>/<encoded-id>/dispatched.log     append-only dispatched effect ids

The journal is the source of truth for the post-checkpoint Record tail; the checkpoint files hold the compacted prefix’s Snapshot; the applied ledger preserves an idempotency key even after its Record is compacted out of the journal; the dispatched log backs the DispatchStore dedup set.
Atomicity and crash-safety
An Append writes one complete newline-terminated JSON line to the journal and flushes it to stable storage before returning, so a successful Append is durable. A crash mid-write can leave a torn (newline-less or unparseable) trailing line; on reopen the loader reads complete lines and stops at the first torn trailing record, discarding it without corrupting the intact prefix. A Checkpoint writes the Snapshot and its throughStep through a write-temp+rename, which is atomic on POSIX filesystems, then rewrites the journal to only the post-checkpoint tail (also via temp+rename), so a concurrent reopen never observes a checkpoint torn against its tail.
All methods are safe for concurrent use; a per-instance mutex serializes writes to one instance’s files while distinct instances proceed independently.

    type FileStore struct {
        // contains filtered or unexported fields
    }

func NewFileStore

    func NewFileStore(dir string, opts ...FileStoreOption) (*FileStore, error)

NewFileStore opens (creating if absent) a FileStore rooted at dir. Existing instance subdirectories under dir are discovered lazily on first access and reconstructed from their files, so reopening over a populated directory carries every recorded instance forward. Construction is configured through functional options; with none supplied it returns a ready store.
func (*FileStore) Append

    func (s *FileStore) Append(_ context.Context, id InstanceID, rec Record, opts ...AppendOption) (int64, error)

Append implements Store. It is atomic and idempotent per (id, key), durably writing a complete journal line before returning.
func (*FileStore) Checkpoint

    func (s *FileStore) Checkpoint(_ context.Context, id InstanceID, snapshot []byte, throughStep int, opts ...CheckpointOption) error

Checkpoint implements Store. It atomically persists snapshot at throughStep (temp+rename) and compacts the on-disk journal to only the post-checkpoint tail. throughStep must advance beyond the current checkpoint.
func (*FileStore) Dispatched

    func (s *FileStore) Dispatched(_ context.Context, id InstanceID) (map[string]bool, error)

Dispatched returns the set of effect ids already applied for the instance as a membership map. An instance never written reports an empty (non-nil) set. It satisfies the DispatchStore seam.
func (*FileStore) Load

    func (s *FileStore) Load(_ context.Context, id InstanceID) ([]byte, []Record, error)

Load implements Store. It returns the latest checkpoint plus the post-checkpoint Record tail in Step order, or ErrInstanceNotFound.
func (*FileStore) MarkDispatched

    func (s *FileStore) MarkDispatched(_ context.Context, id InstanceID, effectIDs ...string) error

MarkDispatched records that the effects named by effectIDs have been applied for the instance, appending each new id durably. It is idempotent and satisfies the DispatchStore seam.
type FileStoreOption
FileStoreOption configures FileStore construction. It keeps NewFileStore extensible — new construction-time knobs (on-disk encoding, sync policy, retention) arrive as additive options rather than new positional parameters.

    type FileStoreOption func(*fileStoreConfig)

type Handle

Handle is a live durable instance: the recovered or freshly started kernel Instance bound to its Runner and InstanceID, so subsequent Fires continue to record. It owns the instance’s delayed-transition scheduler and the recording clock that journals every reading the scheduler consumes, so timer-driven transitions are durable and replay wall-clock-independent. Obtain a Handle from Runner.Start or Recover.

    type Handle[S comparable, E comparable, C any] struct {
        // contains filtered or unexported fields
    }

func Recover

    func Recover[S comparable, E comparable, C any](ctx context.Context, m *state.Machine[S, E, C], st Store, id InstanceID, opts ...Option[S, E, C]) (*Handle[S, E, C], error)

Recover reconstructs a durable instance purely from the Store: it loads the latest checkpoint Snapshot and the journal/effect tail after it, restores the snapshot (firing nothing and performing no I/O), and replays the tail’s recorded driving events through the kernel to reach the instance’s live state. The returned Handle continues recording subsequent Fires. Recover reports ErrInstanceNotFound for an instance that was never started.
func (*Handle[S, E, C]) ActorRef

    func (h *Handle[S, E, C]) ActorRef(id string) (state.ActorRef, bool)

ActorRef returns the ActorRef for the running actor under id, and whether such an actor is running, so a host can address a spawned child for DeliverToActor. It is a thin pass-through to the underlying ActorSystem; a handle with no actor palette wired runs no actors and always reports false.
func (*Handle[S, E, C]) DeliverToActor

    func (h *Handle[S, E, C]) DeliverToActor(ctx context.Context, ref state.ActorRef, event any) (bool, error)

DeliverToActor routes event into the running actor identified by ref exactly once, runs the actor (settling it when it reaches its final state), records each parent transition the delivery drove, and persists the produced step. The actor behavior is resolved and run against the palette supplied with WithActorPalette; the done-data, error, or parent-driving message it routes is journaled as a JournalActorMessage correlated by the actor id, so recovery replays it without re-instantiating the actor.
It returns whether the actor was found running (false for a ref that names no live actor — already settled, stopped, or never spawned), and a persistence failure as the error. The kernel transitions the delivery drove do not themselves fail.
Example
ExampleHandle_DeliverToActor shows the actor record/replay seam: a child shipping actor runs exactly once on the live path (its done-data recorded), and on recovery the recorded done-data is replayed back through the same parent onDone without re-running the actor — so the recovered instance reaches the same state and context, and the actor’s run count never advances past one.

    package main

    import (
        "context"
        "fmt"

        "github.com/stablekernel/crucible/durable"
        "github.com/stablekernel/crucible/state"
    )

    // fulfillmentCtx is a small JSON-marshalable context for the actor example.
    type fulfillmentCtx struct {
        Tracking string `json:"tracking"`
    }

    // fulfillmentMachine supervises a child shipping actor: it spawns the actor on
    // entering supervising and folds the actor's done-data (a tracking number) into
    // context on its onDone.
    func fulfillmentMachine() *state.Machine[string, string, *fulfillmentCtx] {
        return state.ForgeFor[*fulfillmentCtx]("fulfillment").
            Reducer("track", func(in state.AssignCtx[*fulfillmentCtx]) *fulfillmentCtx {
                c := in.Entity
                if tr, ok := in.Event.(string); ok {
                    c.Tracking = tr
                }
                return c
            }).
            Actor("ship").
            State("supervising").InvokeActor("ship",
                state.WithInvokeOnDone("shipped"),
                state.WithInvokeOnError("failed")).
            State("complete").Final().
            State("aborted").Final().
            Initial("supervising").
            Transition("supervising").On("shipped").GoTo("complete").Assign("track").
            Transition("supervising").On("failed").GoTo("aborted").
            Quench()
    }

    func main() {
        ctx := context.Background()
        var runs int
        palette := map[string]state.ActorBehavior{
            "ship": func(map[string]any) (state.ActorInstance, error) {
                child := state.ForgeFor[*fulfillmentCtx]("shipper").
                    State("packing").
                    State("shipped").Final().
                    Initial("packing").
                    Transition("packing").On("dispatch").GoTo("shipped").
                    Quench()
                inst := child.Cast(&fulfillmentCtx{}, state.WithInitialState[string]("packing"))
                return state.NewActor(inst, func(*state.Instance[string, string, *fulfillmentCtx]) any {
                    runs++
                    return fmt.Sprintf("TRK-%d", runs)
                }), nil
            },
        }
        m := fulfillmentMachine()
        store := durable.NewMemStore()
        const id = durable.InstanceID("ship-1")

        runner := durable.NewRunner(m, store,
            durable.WithActorPalette[string, string, *fulfillmentCtx](palette))
        h, err := runner.Start(ctx, id, &fulfillmentCtx{}, state.WithInitialState("supervising"))
        if err != nil {
            panic(err)
        }
        // Drive the shipping actor to completion: it runs once and routes its tracking
        // number through the parent onDone.
        ref, ok := h.ActorRef(state.ActorID("fulfillment", "supervising", 0))
        if !ok {
            panic("no actor ref")
        }
        if _, err = h.DeliverToActor(ctx, ref, "dispatch"); err != nil {
            panic(err)
        }

        // Recover: the recorded done-data is replayed; the actor is not run again.
        recovered, err := durable.Recover(ctx, m, store, id,
            durable.WithActorPalette[string, string, *fulfillmentCtx](palette))
        if err != nil {
            panic(err)
        }
        snap := recovered.Instance().Snapshot()
        fmt.Println("recovered state:", snap.Current)
        fmt.Println("recovered tracking:", snap.Context.Tracking)
        fmt.Println("actor runs:", runs)
    }

Output

    recovered state: complete
    recovered tracking: TRK-1
    actor runs: 1

func (*Handle[S, E, C]) DeliverToActorByID

    func (h *Handle[S, E, C]) DeliverToActorByID(ctx context.Context, id string, event any) (bool, error)

DeliverToActorByID is DeliverToActor keyed by raw actor id, for a host that tracks ids rather than refs.
func (*Handle[S, E, C]) Fire

    func (h *Handle[S, E, C]) Fire(ctx context.Context, event E, opts ...state.FireOption) (state.FireResult[S], error)

Fire drives one event through the Handle’s live instance and records the step: it Fires the kernel, appends a Record carrying the driving event at the produced Trace ordinal (write-ahead, before returning), and, when the checkpoint policy is due, persists a full Snapshot and compacts the tail. A kernel transition error is recorded as a no-op (no step was produced) and returned to the caller.
func (*Handle[S, E, C]) ID

    func (h *Handle[S, E, C]) ID() InstanceID

ID returns the InstanceID the Handle records under.
func (*Handle[S, E, C]) Instance

    func (h *Handle[S, E, C]) Instance() *state.Instance[S, E, C]

Instance returns the underlying kernel Instance the Handle wraps, for reads such as Configuration, Snapshot, or Current. Drive it through the Handle’s Fire (or the Runner’s) so steps continue to be recorded; firing the bare Instance bypasses durability.
func (*Handle[S, E, C]) RunService

    func (h *Handle[S, E, C]) RunService(ctx context.Context, id string) (state.FireResult[S], bool, error)

RunService runs the in-flight invoked service identified by id exactly once, records its outcome, and settles it, firing the invocation’s onDone (on success) or onError (on failure) event through the durable instance and recording the produced step. The service is resolved and executed against the registry supplied with WithServiceRegistry; its raw result or error is journaled as a JournalServiceResult correlated by id, so recovery replays it without re-invoking the service.
It returns the routed FireResult and true, or the zero result and false when id names no in-flight service (already settled, stopped, or never started). A persistence failure is returned as the error; the kernel transition itself does not fail (a settled service always routes onDone or onError).
Example
ExampleHandle_RunService shows the invoked-service record/replay seam: a pricing service runs exactly once on the live path (its returned quote recorded), and on recovery the recorded quote is replayed back through the same settle seam without re-invoking the service — so the recovered instance reaches the same state and context, and the service’s call count never advances past one.
package main

import (
	"context"
	"fmt"

	"github.com/stablekernel/crucible/durable"
	"github.com/stablekernel/crucible/state"
)

// quoteCtx is a small JSON-marshalable context for the invoked-service example.
type quoteCtx struct {
	Quote string `json:"quote"`
}

// quoteMachine invokes a pricing service on entering quoting, folding the returned
// quote into context on its onDone.
func quoteMachine(fn state.ServiceFn[*quoteCtx]) *state.Machine[string, string, *quoteCtx] {
	return state.ForgeFor[*quoteCtx]("quote").
		Service("price", fn).
		Reducer("save", func(in state.AssignCtx[*quoteCtx]) *quoteCtx {
			c := in.Entity
			if q, ok := in.Event.(string); ok {
				c.Quote = q
			}
			return c
		}).
		State("cart").
		State("quoting").Invoke("price", state.WithInvokeOnDone("priced"), state.WithInvokeOnError("failed")).
		State("quoted").Final().
		State("rejected").Final().
		Initial("cart").
		Transition("cart").On("checkout").GoTo("quoting").
		Transition("quoting").On("priced").GoTo("quoted").Assign("save").
		Transition("quoting").On("failed").GoTo("rejected").
		Quench()
}

func main() {
	ctx := context.Background()
	var calls int
	fn := func(context.Context, state.ServiceCtx[*quoteCtx]) (any, error) {
		calls++
		return fmt.Sprintf("quote-%d", calls), nil
	}
	reg := state.NewRegistry[*quoteCtx]().Service("price", fn)
	m := quoteMachine(fn)
	store := durable.NewMemStore()
	const id = durable.InstanceID("quote-1")

	runner := durable.NewRunner(m, store, durable.WithServiceRegistry[string, string, *quoteCtx](reg))
	h, err := runner.Start(ctx, id, &quoteCtx{}, state.WithInitialState("cart"))
	if err != nil {
		panic(err)
	}
	if _, err = h.Fire(ctx, "checkout"); err != nil {
		panic(err)
	}
	// Run the pricing service: it executes once and routes its result through onDone.
	if _, _, err = h.RunService(ctx, state.InvokeID("quote", "quoting", 0)); err != nil {
		panic(err)
	}

	// Recover: the recorded quote is replayed; the service is not run again.
	recovered, err := durable.Recover(ctx, m, store, id, durable.WithServiceRegistry[string, string, *quoteCtx](reg))
	if err != nil {
		panic(err)
	}
	snap := recovered.Instance().Snapshot()
	fmt.Println("recovered state:", snap.Current)
	fmt.Println("recovered quote:", snap.Context.Quote)
	fmt.Println("service calls:", calls)
}

Output
recovered state: quoted
recovered quote: quote-1
service calls: 1

func (*Handle[S, E, C]) Tick
func (h *Handle[S, E, C]) Tick(ctx context.Context) ([]state.FireResult[S], error)

Tick advances the Handle’s delayed-transition scheduler. It fires every timer whose recorded deadline is at or before the recording clock’s current time, in due order, re-firing the delayed events through the durable instance and recording the clock readings the tick consumed. A host calls it from its own timer loop (with a real clock); a test calls it after advancing a fake clock. It records one tick barrier, carrying the consumed clock readings and the count of timers fired, so recovery re-derives the same timers at their recorded instants. It returns the FireResults of the timers it fired, in order.
type HistoryStore
HistoryStore is the optional time-travel seam: a Store that retains an instance’s start baseline snapshot and its full ordered Record log, so a reader can reconstruct the state as of any recorded step even after checkpoint compaction discarded that step from the live tail. Like DispatchStore, it is additive: the core Store interface is unchanged. A backend opts in by implementing this method, and StateAt uses it when present, falling back to Load otherwise. The in-tree MemStore implements it; construct the MemStore WithHistory to retain compacted Records.
type HistoryStore interface {
	// History returns the instance's start baseline snapshot bytes (the BaselineStep
	// checkpoint, nil if the instance was never started through a Runner) together
	// with the full ordered Record log: every Record ever appended, in Step order,
	// including those a checkpoint later compacted out of the live tail. It reports
	// ErrInstanceNotFound for an instance that was never written.
	History(ctx context.Context, id InstanceID) (baseline []byte, records []Record, err error)
}

type InstanceID
InstanceID is the stable identity of one durable instance: the key under which a Store records and reconstructs that instance’s checkpoint and journal/effect tail. It is host-assigned and opaque to the Store.
type InstanceID string

type MemStore
MemStore satisfies Store and is safe for concurrent use by multiple goroutines.
type MemStore struct {
	// contains filtered or unexported fields
}

func NewMemStore
func NewMemStore(opts ...MemStoreOption) *MemStore

NewMemStore returns an in-memory Store. Construction is configured through functional options (see WithInitialCapacity and WithHistory); with none supplied it returns a ready, empty store.
func (*MemStore) Append
func (s *MemStore) Append(_ context.Context, id InstanceID, rec Record, opts ...AppendOption) (int64, error)

Append implements Store. It is atomic and idempotent per (id, key), where key is the Record Step by default or the WithIdempotencyKey value when supplied.
func (*MemStore) Checkpoint
func (s *MemStore) Checkpoint(_ context.Context, id InstanceID, snapshot []byte, throughStep int, opts ...CheckpointOption) error

Checkpoint implements Store. It persists snapshot as the instance’s checkpoint at throughStep and compacts the tail through that step. throughStep must advance beyond the current checkpoint. Time-travel retention is a store-level capability (NewMemStore with WithHistory), so the per-checkpoint CheckpointOptions do not change what Load returns here.
func (*MemStore) Dispatched
func (s *MemStore) Dispatched(_ context.Context, id InstanceID) (map[string]bool, error)

Dispatched returns the set of effect ids already applied for the instance, as a membership map. An instance never written reports an empty (non-nil) set rather than an error: the dedup query is a pure read of “what has landed”, orthogonal to whether any Record exists yet. It satisfies the DispatchStore seam.
func (*MemStore) History
func (s *MemStore) History(_ context.Context, id InstanceID) ([]byte, []Record, error)

History implements HistoryStore for a MemStore constructed WithHistory: it returns the instance’s start baseline snapshot and its full ordered Record log, including Records a checkpoint compacted out of the live tail, so a time-travel read can reconstruct any recorded step. Without WithHistory the baseline is nil and the returned log is the live (compacted) tail, sufficient only for steps at or after the latest checkpoint. It reports ErrInstanceNotFound for an unknown instance.
func (*MemStore) Load
func (s *MemStore) Load(_ context.Context, id InstanceID) ([]byte, []Record, error)

Load implements Store. It returns the latest checkpoint plus the post-checkpoint Record tail in Step order, or ErrInstanceNotFound.
func (*MemStore) MarkDispatched
func (s *MemStore) MarkDispatched(_ context.Context, id InstanceID, effectIDs ...string) error

MarkDispatched records that the effects named by effectIDs have been applied for the instance, so a subsequent (re)dispatch skips them. It is atomic and idempotent: re-marking an already-marked id is a no-op, and a partially marked batch is completed without error. It satisfies the DispatchStore seam.
type MemStoreOption
MemStoreOption configures MemStore construction. It keeps NewMemStore extensible: new construction-time knobs arrive as additive options rather than new positional parameters.
type MemStoreOption func(*memStoreConfig)

func WithHistory
func WithHistory() MemStoreOption

WithHistory makes a MemStore retain each instance’s start baseline snapshot and full ordered Record log, even across checkpoint compaction. The time-travel reader (StateAt) can then reconstruct the instance’s state as of any recorded step through the HistoryStore seam. By default a MemStore discards compacted Records to bound memory; enable this option for audit, debugging, or replay-inspection workloads.
func WithInitialCapacity
func WithInitialCapacity(steps int) MemStoreOption

WithInitialCapacity pre-sizes each instance’s Record buffer to the given number of steps, trading a little memory for fewer reallocations on instances with a known step count. A non-positive value is ignored.
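The functional-option shape behind MemStoreOption can be sketched in isolation. Real callers would write `durable.NewMemStore(durable.WithHistory(), durable.WithInitialCapacity(64))`. The sketch below uses hypothetical lowercase names and a hypothetical `storeConfig`, because memStoreConfig's fields are unexported and not documented. It shows the pattern only: options apply in order over zero-value defaults, and a non-positive capacity is ignored.

```go
package main

import "fmt"

// storeConfig is a hypothetical stand-in for the unexported memStoreConfig.
type storeConfig struct {
	history  bool
	capacity int
}

// storeOption mirrors the MemStoreOption shape: a function over the config.
type storeOption func(*storeConfig)

func withHistory() storeOption { return func(c *storeConfig) { c.history = true } }

// withInitialCapacity ignores a non-positive value, as documented above.
func withInitialCapacity(steps int) storeOption {
	return func(c *storeConfig) {
		if steps > 0 {
			c.capacity = steps
		}
	}
}

// newConfig applies options in order over the zero-value defaults.
func newConfig(opts ...storeOption) storeConfig {
	var c storeConfig
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(withHistory(), withInitialCapacity(64), withInitialCapacity(-1)))
}
```

Adding a new knob means adding a new option function, so existing `NewMemStore()` call sites keep compiling unchanged.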
type Option
Option configures a Runner (and, through Recover, a one-shot reconstruction). It is generic over the machine’s state, event, and context types so an event codec can be type-safe.
type Option[S comparable, E comparable, C any] func(*runnerConfig[S, E, C])

func WithActorPalette
func WithActorPalette[S comparable, E comparable, C any](palette map[string]state.ActorBehavior) Option[S, E, C]

WithActorPalette binds the child-machine actor behaviors the Runner spawns and runs on the live path and resolves on recovery. The palette is keyed by the actor src name (the name passed to InvokeActor or the Spawn built-in). A durable actor’s behavior runs exactly once, on the live path, and its done-data, error, or parent-driving message is recorded. On recovery the recorded result is replayed back through the same parent transition and the behavior is never re-instantiated, so the palette is consulted only to run an actor the first time. Supply it for any machine that spawns child actors; a machine that spawns none needs none. Each behavior is the actor-model analog of a ServiceFn, registered by the same src name the machine declares.
func WithCheckpointEvery
func WithCheckpointEvery[S comparable, E comparable, C any](n int) Option[S, E, C]

WithCheckpointEvery sets the checkpoint policy: the Runner persists a full Snapshot and compacts the journal tail every n fired steps, bounding both the stored tail and the replay length on recovery. A non-positive n disables periodic checkpointing (the default), so recovery replays the whole run from the start baseline.
func WithEffectHandler
func WithEffectHandler[S comparable, E comparable, C any](h EffectHandler) Option[S, E, C]

WithEffectHandler binds the seam the Runner calls to apply each emitted domain effect (an email, a charge, a published message): any at-most-once side effect a transition emits as a domain Effect value. A durable effect is applied exactly once over the whole lifetime of an instance (the live run plus any number of recoveries), deduplicated by a deterministic EffectID. The Runner stamps each emitted effect and write-ahead persists the step Record carrying those ids. It then invokes the handler for every id not already marked dispatched, marking each as it succeeds. A handler error is surfaced (wrapped in ErrEffectDispatch) and the failing effect is left un-marked, so a later recovery retries it: at-least-once until it succeeds, exactly-once once it does.
The handler receives the stamped EffectID and the live effect value. It is nil by default: without it the Runner records effect ids but dispatches nothing, so an event-driven user is unaffected. Kernel driver effects (services, timers, actors) are NOT routed here — they are absorbed by the ServiceRunner, Scheduler, and ActorSystem; only domain effects reach the handler.
func WithEventCodec
func WithEventCodec[S comparable, E comparable, C any](codec EventCodec[E]) Option[S, E, C]

WithEventCodec overrides the event codec the Runner uses to reconstruct a recorded event for replay, for events the default encoding/json codec cannot round-trip. The default codec decodes through encoding/json, the inverse of the kernel’s Trace.EventPayload marshaling.
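The JSON-boundary behavior that motivates a custom codec can be shown with encoding/json alone. This is an illustrative sketch. The `priced` type, the `roundTripAny` helper, and the `jsonCodec` type with its `Encode`/`Decode` methods are hypothetical. They are not the package's EventCodec interface, whose method set is not shown here. An untyped decode yields a map, while a decode into a concrete type recovers the Go value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// priced is a hypothetical Go type carried as an event or done-data payload.
type priced struct {
	Quote string `json:"quote"`
	Cents int    `json:"cents"`
}

// roundTripAny mimics journaling a payload as JSON and decoding it untyped,
// the shape a reducer would see on replay without a typed codec.
func roundTripAny(v any) (any, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var out any
	err = json.Unmarshal(b, &out)
	return out, err
}

// jsonCodec is a hypothetical typed codec that decodes into a concrete E.
type jsonCodec[E any] struct{}

func (c jsonCodec[E]) Encode(e E) ([]byte, error) { return json.Marshal(e) }

func (c jsonCodec[E]) Decode(b []byte) (E, error) {
	var e E
	err := json.Unmarshal(b, &e)
	return e, err
}

func main() {
	got, _ := roundTripAny(priced{Quote: "q-1", Cents: 1299})
	_, isPriced := got.(priced)
	fmt.Printf("%T %v\n", got, isPriced) // map[string]interface {} false

	var c jsonCodec[priced]
	b, _ := c.Encode(priced{Quote: "q-1", Cents: 1299})
	back, _ := c.Decode(b)
	fmt.Printf("%T %+v\n", back, back) // main.priced {Quote:q-1 Cents:1299}
}
```

The first line of output is the mismatch a type-asserting reducer would hit on replay. The second shows why decoding into a known type avoids it.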
func WithRunnerClock
func WithRunnerClock[S comparable, E comparable, C any](c state.Clock) Option[S, E, C]

WithRunnerClock injects the real time seam the Runner records on the live path and replays on recovery. A durable instance reads time only through its host scheduler, which arms and ticks delayed `after` transitions. The Runner wraps this clock in a recording clock so every reading is journaled. On recovery it substitutes a replay clock, so timers fire at their recorded instants independent of the wall clock at recovery time. It defaults to state.SystemClock(); supply a deterministic fake (state.NewFakeClock) in a test.
func WithServiceRegistry
func WithServiceRegistry[S comparable, E comparable, C any](reg *state.Registry[C]) Option[S, E, C]

WithServiceRegistry binds the invoked-service implementations the Runner runs on the live path and resolves on recovery. A durable service runs exactly once, on the live path, and its result is recorded. On recovery the recorded result is replayed through the same settle seam and the service is never re-invoked, so the registry is consulted only to execute a service the first time. Supply it for any machine that declares `invoke` states; a purely event-driven or timer-driven machine needs none. The registry binds the same ServiceFns the machine declares, by name (state.NewRegistry().Service(name, fn)).
type Record
Record is one durable append: the unit a Store persists for a single Fire step. It carries the nondeterministic results recorded during the step (Entries), the effects the step emitted (Effects, each carrying a stamped EffectID for idempotent dispatch), and optionally a full marshaled Snapshot checkpoint taken at this step. A Record is identified within its instance by its Step ordinal; appending the same Step twice is a no-op (see Store.Append).
type Record struct {
	// Step is the Fire ordinal this Record was produced at, indexing the
	// instance's recorded Traces. It is the per-instance idempotency key: a
	// Store treats a second Append at the same Step as already-applied.
	Step int

	// Event is the structured, JSON form of the driving event the Fire at this
	// Step consumed: the kernel's Trace.EventPayload, captured verbatim. Replay
	// reconstructs the exact event from this payload and re-Fires it to advance
	// the restored instance one step, so the recorded run reaches byte-identical
	// state without re-deriving the event from its human-readable label. It is
	// empty for a Record that carries only a checkpoint or only nondeterministic
	// Entries (an event with no JSON form is omitted, matching the kernel's
	// additive EventPayload contract).
	Event []byte

	// Entries are the nondeterministic results recorded during this step (the
	// invoked-service done/error payloads, actor messages, clock reads, and
	// randomness draws the kernel consumed), in the order they resolved. Replay
	// returns these verbatim instead of re-invoking their sources.
	Entries []state.JournalEntry

	// Effects are the effects the step emitted, each with its deterministically
	// stamped EffectID, persisted before dispatch so a crash between persist and
	// dispatch is recoverable: Resume re-dispatches, deduped by EffectID.
	Effects []state.EffectEnvelope

	// Snapshot, when non-empty, is a full marshaled state.Snapshot captured at
	// this step: a checkpoint the instance can be reconstructed from without
	// replaying the whole journal from the start. It is produced by
	// state.MarshalSnapshot and consumed by state.UnmarshalSnapshot; the Store
	// treats it as opaque bytes.
	Snapshot []byte

	// Tick marks a scheduler-tick barrier rather than an externally fired step: a
	// Record produced when the durable Handle ticked its delayed-transition
	// scheduler, carrying the clock readings that tick consumed (in Entries) and
	// the count of timer steps it produced (TickSteps). Replay re-derives the
	// timers by re-ticking the recovered scheduler against those recorded
	// readings rather than re-firing the events directly, so each timer fires at
	// its recorded instant. It is false for externally fired steps, which carry a
	// driving Event instead.
	Tick bool

	// TickSteps is the number of timer transitions the tick at this barrier fired
	// (zero for a tick that found nothing due). It lets replay advance its step
	// accounting across a re-derived tick without a recorded Event per timer.
	TickSteps int
}

type Runner
Runner is the durable wrapper around a state.Machine. It drives the kernel’s pure transition function while recording each step to a Store, so an instance can be checkpointed, survive a crash, and resume by replaying the recorded driving events rather than re-deriving them.
A Runner is created with NewRunner and bound to one machine and one Store; it is safe to drive many instances (distinguished by InstanceID) through a single Runner. The recording model is write-ahead: every Fire persists its Record before returning, so a crash after a successful Fire never loses the step.
Record / replay model
For an event-driven machine every transition is a pure function of (configuration, context, event payload, machine definition). The only input a Runner must record to reproduce a run is therefore the driving event: each Fire appends a Record{Step, Event}, where Step is the produced Trace ordinal and Event is the kernel’s structured Trace.EventPayload. Periodically, as governed by the checkpoint policy (WithCheckpointEvery), the Runner also persists a full marshaled Snapshot and compacts the journal tail through that step. Recovery then replays only the tail after the latest checkpoint rather than the whole run.
The first nondeterministic source recorded as Record.Entries is the clock: a Runner owns each instance’s delayed-transition scheduler and wraps the clock (WithRunnerClock) so every reading the scheduler consumes — arming a timer’s deadline or testing dueness — is journaled and returned verbatim on recovery, making timer-driven transitions durable and replay wall-clock-independent.
Invoked services are the second recorded seam: a service runs exactly once on the live path (Handle.RunService, against the registry supplied with WithServiceRegistry) and its result is journaled as a JournalServiceResult; on recovery the recorded result is replayed back through the kernel’s settle seam, so the service is never re-invoked and the same onDone / onError event re-fires with the same data.
Child-machine actors are the third recorded seam: an actor’s behavior runs exactly once on the live path (Handle.DeliverToActor, against the palette supplied with WithActorPalette) and each parent transition the delivery drives — the actor’s onDone / onError, or a message it sends — is journaled as a JournalActorMessage; on recovery the recorded transition is replayed by re-firing the parent directly with the recorded done-data, so the actor behavior is never re-instantiated. A purely event-driven machine records no Entries.
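The replay argument above (a pure transition plus recorded events reproduces the run) can be checked on a toy order machine. This is a sketch, not the kernel. The `order` type, the pure `step` function, and the `recoverFrom` helper are hypothetical. The sketch shows that restoring a checkpoint and replaying the tail yields the same state as the uninterrupted live run.

```go
package main

import "fmt"

// order is a hypothetical machine state: current node plus a counter.
type order struct {
	Current string
	Charges int
}

// step is a pure transition: the result depends only on (state, event).
func step(o order, ev string) order {
	switch {
	case o.Current == "pending" && ev == "pay":
		return order{Current: "charged", Charges: o.Charges + 1}
	case o.Current == "charged" && ev == "ship":
		return order{Current: "done", Charges: o.Charges}
	}
	return o // unhandled events leave the state unchanged
}

// recoverFrom restores a checkpoint and replays the recorded tail of events.
func recoverFrom(checkpoint order, tail []string) order {
	o := checkpoint
	for _, ev := range tail {
		o = step(o, ev)
	}
	return o
}

func main() {
	events := []string{"pay", "ship"}
	live := order{Current: "pending"}
	var checkpoint order
	for i, ev := range events {
		live = step(live, ev)
		if i == 0 {
			checkpoint = live // a checkpoint taken after the first step
		}
	}
	recovered := recoverFrom(checkpoint, events[1:])
	fmt.Println(recovered == live, recovered)
}
```

Purity is what makes this sound: if `step` read a clock or called a service directly, the replay could diverge. Those inputs are exactly the ones the Runner journals as Entries.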
type Runner[S comparable, E comparable, C any] struct {
	// contains filtered or unexported fields
}

Example
ExampleRunner shows the durable record/replay loop: start an instance, fire a sequence of events through the Runner (each step recorded write-ahead), then recover a fresh instance purely from the Store and observe it reaches the same state and context the live run did.
package main

import (
	"context"
	"fmt"

	"github.com/stablekernel/crucible/durable"
	"github.com/stablekernel/crucible/state"
)

// orderCtx is a small JSON-marshalable context for the Runner example.
type orderCtx struct {
	Charges int `json:"charges"`
}

// orderMachine is a flat event-driven order machine: pending -> charged -> done.
func orderMachine() *state.Machine[string, string, *orderCtx] {
	return state.ForgeFor[*orderCtx]("order").
		Action("charge", func(c state.ActionCtx[*orderCtx]) (state.Effect, error) {
			c.Entity.Charges++
			return nil, nil
		}).
		State("pending").
		State("charged").
		State("done").Final().
		Initial("pending").
		Transition("pending").On("pay").GoTo("charged").Do("charge").
		Transition("charged").On("ship").GoTo("done").
		Quench()
}

func main() {
	ctx := context.Background()
	m := orderMachine()
	store := durable.NewMemStore()
	const id = durable.InstanceID("order-7")

	// Start records a baseline checkpoint; each Fire appends the driving event,
	// and the checkpoint policy compacts the tail every two steps.
	runner := durable.NewRunner(m, store, durable.WithCheckpointEvery[string, string, *orderCtx](2))
	if _, err := runner.Start(ctx, id, &orderCtx{}, state.WithInitialState("pending")); err != nil {
		panic(err)
	}
	for _, ev := range []string{"pay", "ship"} {
		if _, err := runner.Fire(ctx, id, ev); err != nil {
			panic(err)
		}
	}

	// Recover reconstructs the instance from the Store alone: Load the checkpoint,
	// Restore it, replay the recorded tail.
	recovered, err := durable.Recover(ctx, m, store, id)
	if err != nil {
		panic(err)
	}
	snap := recovered.Instance().Snapshot()

	fmt.Println("recovered state:", snap.Current)
	fmt.Println("recovered charges:", snap.Context.Charges)
}

Output
recovered state: done
recovered charges: 1

Example (Durable Timer)
ExampleRunner_durableTimer shows a time-dependent machine recorded and replayed through the clock seam: the live run arms a one-hour reminder and fires it by advancing a fake clock; recovery on a different wall-clock baseline replays the recorded clock readings, so the reminder fires at its recorded instant and the recovered instance matches the live one — wall-clock-independent.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/stablekernel/crucible/durable"
	"github.com/stablekernel/crucible/state"
)

// timerOrderCtx is a small JSON-marshalable context for the durable-timer example.
type timerOrderCtx struct {
	Reminded bool `json:"reminded"`
}

// reminderMachine sends a reminder a fixed delay after an order is placed, driven
// by a delayed (`after`) transition.
func reminderMachine() *state.Machine[string, string, *timerOrderCtx] {
	return state.ForgeFor[*timerOrderCtx]("reminder").
		Action("remind", func(c state.ActionCtx[*timerOrderCtx]) (state.Effect, error) {
			c.Entity.Reminded = true
			return nil, nil
		}).
		State("new").
		State("waiting").
		State("reminded").Final().
		Initial("new").
		Transition("new").On("place").GoTo("waiting").
		Transition("waiting").After(time.Hour).On("due").GoTo("reminded").Do("remind").
		Quench()
}

func main() {
	ctx := context.Background()
	m := reminderMachine()
	store := durable.NewMemStore()
	const id = durable.InstanceID("order-9")

	start := time.Date(2025, 6, 1, 9, 0, 0, 0, time.UTC)
	clk := state.NewFakeClock(start)
	runner := durable.NewRunner(m, store, durable.WithRunnerClock[string, string, *timerOrderCtx](clk))
	h, err := runner.Start(ctx, id, &timerOrderCtx{}, state.WithInitialState("new"))
	if err != nil {
		panic(err)
	}
	if _, err = h.Fire(ctx, "place"); err != nil {
		panic(err)
	}
	clk.Advance(2 * time.Hour) // past the one-hour reminder deadline
	if _, err = h.Tick(ctx); err != nil {
		panic(err)
	}

	// Recover on a wall clock days later: the reminder still fired at its recorded
	// instant, so the recovered instance reaches the same state.
	recovered, err := durable.Recover(ctx, m, store, id,
		durable.WithRunnerClock[string, string, *timerOrderCtx](state.NewFakeClock(start.Add(72*time.Hour))))
	if err != nil {
		panic(err)
	}
	snap := recovered.Instance().Snapshot()
	fmt.Println("recovered state:", snap.Current)
	fmt.Println("reminded:", snap.Context.Reminded)
}

Output
recovered state: reminded
reminded: true

func NewRunner
func NewRunner[S comparable, E comparable, C any](m *state.Machine[S, E, C], st Store, opts ...Option[S, E, C]) *Runner[S, E, C]

NewRunner binds a machine and a Store into a durable Runner. Behavior is tuned with functional options: the checkpoint policy (WithCheckpointEvery), the event codec (WithEventCodec), the clock (WithRunnerClock), the service registry (WithServiceRegistry), the actor palette (WithActorPalette), and the effect handler (WithEffectHandler). Each option is additive and defaults to a working baseline (for example, no periodic checkpoint and JSON event encoding).
func (*Runner[S, E, C]) Fire
func (r *Runner[S, E, C]) Fire(ctx context.Context, id InstanceID, event E, opts ...state.FireOption) (state.FireResult[S], error)

Fire drives one event through the durable instance identified by id: it first loads and replays the instance from the Store, then fires the event and records the step. It is the stateless entry point (no Handle required). On a hot path that fires many events in sequence, hold a Handle from Start or Recover and use Handle.Fire to avoid reloading between steps.
func (*Runner[S, E, C]) Start
func (r *Runner[S, E, C]) Start(ctx context.Context, id InstanceID, input C, opts ...state.CastOption[S]) (*Handle[S, E, C], error)

Start creates and registers a fresh durable instance: it casts the machine on input, persists a baseline checkpoint so the instance is loadable from the first step, and returns a live Handle. Cast options (for example state.WithInitialState) configure the initial configuration. Starting an InstanceID that already exists in the Store reports ErrInstanceExists rather than clobbering its recorded baseline.
type Store
Store is the durable-execution persistence seam. It records an instance’s per-step Records and periodic Snapshot checkpoints, and reconstructs the instance from the latest checkpoint plus the tail of Records appended after it. The in-memory MemStore is the reference implementation; persistent backends implement the same contract.
Contract
Ordering: Records for an instance are totally ordered by Step. Append accepts Records in increasing Step order; Load returns the post-checkpoint tail in that same Step order. A Store preserves the relative order of Entries and Effects within each Record verbatim.
Idempotency: Append is idempotent per (InstanceID, Step). Appending a Step that is already present is a no-op — the stored Record is retained unchanged and the original append sequence is returned — so an at-least-once caller (a crash-and-retry between persist and acknowledge) never double-applies a step. The first writer of a Step wins. The idempotency record for a Step survives a Checkpoint that compacts that Step away, so a delayed retry of an already-checkpointed Step is still recognized as a no-op rather than rejected as out of order.
Atomicity: each Append and each Checkpoint is atomic. A concurrent Load never observes a partially written Record or a checkpoint torn against its tail; it observes either the state before the call or the state fully after it.
Consistency: Load returns the most recent checkpoint Snapshot together with every Record whose Step is strictly greater than the checkpoint’s throughStep — the exact journal/effect tail needed to bring that checkpoint up to date. For an instance with no checkpoint, Snapshot is nil and the tail is the full Record history. For an unknown instance, Load reports ErrInstanceNotFound.
All methods are safe for concurrent use by multiple goroutines.
type Store interface {
	// Append persists rec for the instance and returns its monotonic per-instance
	// append sequence. Appending a Step already recorded for the instance is a
	// no-op that returns the existing sequence (idempotency). Records must be
	// appended in increasing Step order; an out-of-order Step is rejected with
	// ErrStepOutOfOrder.
	Append(ctx context.Context, id InstanceID, rec Record, opts ...AppendOption) (seq int64, err error)

	// Load returns the instance's latest checkpoint Snapshot bytes (nil if the
	// instance has been appended to but never checkpointed) together with the
	// tail of Records appended after that checkpoint, in Step order. It reports
	// ErrInstanceNotFound for an instance that has never been written.
	Load(ctx context.Context, id InstanceID) (snapshot []byte, tail []Record, err error)

	// Checkpoint persists snapshot as the instance's checkpoint at throughStep and
	// compacts the journal/effect tail through that step, so a subsequent Load
	// returns this Snapshot plus only the Records appended after throughStep. A
	// Checkpoint that does not advance throughStep beyond the current checkpoint
	// is rejected with ErrCheckpointNotAdvancing.
	Checkpoint(ctx context.Context, id InstanceID, snapshot []byte, throughStep int, opts ...CheckpointOption) error
}

type TimeTravelView
TimeTravelView is the read-only result of a StateAt reconstruction: the kernel Instance restored and replayed to the requested step, and the step it was reconstructed at. The Instance is detached: it shares no state with any live Handle and is safe to read (Snapshot, Current, Configuration) but not to drive (firing it records nothing and is not durable). Obtain one from StateAt.
type TimeTravelView[S comparable, E comparable, C any] struct {
	// contains filtered or unexported fields
}

func StateAt
func StateAt[S comparable, E comparable, C any](ctx context.Context, m *state.Machine[S, E, C], st Store, id InstanceID, step int, opts ...Option[S, E, C]) (*TimeTravelView[S, E, C], error)

StateAt reconstructs a durable instance’s state AS OF the recorded step, read-only. It restores the start baseline and replays the recorded driving events, service settlements, actor transitions, and clock reads forward up to step, returning a detached view of the instance at that point. It runs no service, re-instantiates no actor, reads no wall clock, and dispatches no domain effect, because every result is a pure replay of recorded values. It also mutates neither the live instance nor the Store. It accepts the same functional options a Runner takes: the event codec, the service registry and actor palette needed to settle recorded results, and the clock. A supplied effect handler is intentionally never invoked, since a historical read applies no side effect.
step must be in [BaselineStep, lastRecordedStep]; a step outside that range reports ErrStepOutOfRange, and an unknown instance reports ErrInstanceNotFound.
Example
ExampleStateAt reconstructs an instance’s state as of an earlier recorded step, read-only, without re-running any work or mutating the live instance. A history-retaining MemStore (WithHistory) keeps every Record so any step is reachable; Steps enumerates the recorded ordinals to read across.
package main

import (
	"context"
	"fmt"

	"github.com/stablekernel/crucible/durable"
	"github.com/stablekernel/crucible/state"
)

func main() {
	ctx := context.Background()
	m := state.ForgeFor[*auditCtx]("audit").
		Action("bump", func(c state.ActionCtx[*auditCtx]) (state.Effect, error) {
			c.Entity.Count++
			return nil, nil
		}).
		State("open").
		State("closed").Final().
		Initial("open").
		Transition("open").On("tick").GoTo("open").Do("bump").
		Transition("open").On("close").GoTo("closed").Do("bump").
		Quench()

	store := durable.NewMemStore(durable.WithHistory())
	id := durable.InstanceID("ledger-1")
	runner := durable.NewRunner(m, store)
	h, err := runner.Start(ctx, id, &auditCtx{}, state.WithInitialState("open"))
	if err != nil {
		panic(err)
	}
	for _, ev := range []string{"tick", "tick", "close"} {
		if _, ferr := h.Fire(ctx, ev); ferr != nil {
			panic(ferr)
		}
	}

	// Enumerate the recorded steps, then reconstruct the count at each one.
	steps, err := durable.Steps(ctx, store, id)
	if err != nil {
		panic(err)
	}
	for _, step := range steps {
		view, verr := durable.StateAt(ctx, m, store, id, step)
		if verr != nil {
			panic(verr)
		}
		fmt.Printf("step %d: state=%s count=%d\n", step, view.Snapshot().Current, view.Snapshot().Context.Count)
	}
}

// auditCtx is a JSON-marshalable context counting the bumps a time-travel read
// reconstructs at each step.
type auditCtx struct {
	Count int `json:"count"`
}

Output
step 0: state=open count=1
step 1: state=open count=2
step 2: state=closed count=3

func (*TimeTravelView[S, E, C]) Instance
func (v *TimeTravelView[S, E, C]) Instance() *state.Instance[S, E, C]

Instance returns the reconstructed kernel Instance, for reads such as Snapshot, Current, or Configuration. It is a detached read-only view; to drive a live instance, use a Handle from Recover instead.
func (*TimeTravelView[S, E, C]) Snapshot
func (v *TimeTravelView[S, E, C]) Snapshot() state.Snapshot[S, E, C]

Snapshot returns the reconstructed instance’s kernel Snapshot at the target step, a convenience over Instance().Snapshot() for the common audit read.
func (*TimeTravelView[S, E, C]) Step
func (v *TimeTravelView[S, E, C]) Step() int

Step returns the recorded step the view was reconstructed at (BaselineStep for the start baseline).
Generated by gomarkdoc