Ingest, drive, emit
The getting started walkthrough fires a
machine by hand. In a real service the events arrive from a stream and the
transition’s effects leave for the outside world. This quickstart wires the three
seams into one loop: source consumes a message,
the message drives a state transition, and a
sink emits the effect. None of the three cores
imports another; the source/statemachine bridge
is the only thing that depends on all of them.
Install
Section titled “Install”go get github.com/stablekernel/crucible/sourcego get github.com/stablekernel/crucible/source/statemachinego get github.com/stablekernel/crucible/sinkgo get github.com/stablekernel/crucible/stateThe machine
Section titled “The machine”A toy turnstile that emits an Opened effect when it unlocks. The effect is pure
data; the machine performs no IO.
type Turnstile struct{ Coins int } // C
type Opened struct{ Coins int }
// State and event identifiers are plain strings, so ForgeFor fixes S and E to// string and leaves only the context type to spell.machine := state.ForgeFor[Turnstile]("turnstile"). // An action returns an effect (pure data) for the transition to emit. Action("announceOpen", func(a state.ActionCtx[Turnstile]) (state.Effect, error) { return Opened{Coins: a.Entity.Coins}, nil }). Initial("Locked"). Transition("Locked").On("Coin").GoTo("Unlocked").Do("announceOpen"). Quench()Wire the loop
Section titled “Wire the loop”statemachine.Drive binds the consume loop to the machine. A Router resolves
each message to an instance key and the event to fire; the bridge loads the
instance through a Store, fires the event, hands the emitted effects to the
configured Sink, persists the new state, and only then acks. Here the Sink is
a sink.Manifold fanning the effect out to its outlets.
// Egress: a Manifold that fans each emitted effect out to its outlets.manifold := sink.NewManifold(sink.WithOutlets(sink.OutletFunc( func(ctx context.Context, payload any) error { log.Printf("opened: %+v", payload) // a real outlet writes SQL, Dynamo, a webhook return nil },)))
// Durable instance state for the bridge (in-memory for a single process).store := statemachine.NewMemStore[string, string, string, Turnstile]()
// Route a message to its instance key and the event to fire.router := func(m source.Message) (string, string, error) { return m.Headers().Get("turnstile-id"), "Coin", nil}
handler := statemachine.Drive(machine, store, router, statemachine.WithSink(statemachine.SinkFunc( func(ctx context.Context, effect any) error { manifold.Sink(ctx, effect) // fire-and-forget fan-out return nil }, )),)Run it
Section titled “Run it”Open a subscription on any inlet (Kafka,
JetStream, or the in-memory test source) and hand the bridge handler to
Receive. The engine runs the consume loop until the context is cancelled.
sub, err := inlet.Subscribe(ctx, source.SubscribeConfig{Topic: "turnstile"})if err != nil { return err}// consume -> route -> Fire(Coin) -> emit Opened -> persist -> ackreturn sub.Receive(ctx, handler)That is the whole loop:
flowchart LR
K[(stream)] --> S{{source}}
S --> R[route by key]
R --> F["Fire(Coin)"]
F --> P[persist new state]
P --> A[ack]
F -.Opened effect.-> M{{sink Manifold}}
M --> O1[outlet]
M --> O2[outlet]
The ack is tied to the durable transition, so an at-least-once stream never applies an event twice and never acks an event it failed to persist. A redelivery of an already-applied message is a no-op ack, keyed on the machine’s own state version with no external dedup store. An event that is illegal for the current state is a guard rejection, classified as poison and routed to the DLQ, distinct from a transient error that retries.
- Driving a statechart from a stream: the bridge in full, including exactly-once consume-process-produce on Kafka.
- Fire-and-forget fan-out: the Manifold’s emit semantics, errors, and batching.
- Effects and purity: why the machine emits effects as data instead of performing IO.