
Ingest, drive, emit

The getting started walkthrough fires a machine by hand. In a real service, events arrive from a stream and each transition’s effects leave for the outside world. This quickstart wires the three seams into one loop: a source consumes a message, the message drives a state transition, and a sink emits the resulting effect. None of the three cores imports another; the source/statemachine bridge is the only package that depends on all of them.

go get github.com/stablekernel/crucible/source
go get github.com/stablekernel/crucible/source/statemachine
go get github.com/stablekernel/crucible/sink
go get github.com/stablekernel/crucible/state

Start with a toy turnstile that emits an Opened effect when it unlocks. The effect is pure data; the machine performs no IO.

type Turnstile struct{ Coins int } // C
type Opened struct{ Coins int }
// State and event identifiers are plain strings, so ForgeFor fixes S and E to
// string and leaves only the context type to spell.
machine := state.ForgeFor[Turnstile]("turnstile").
	// An action returns an effect (pure data) for the transition to emit.
	Action("announceOpen", func(a state.ActionCtx[Turnstile]) (state.Effect, error) {
		return Opened{Coins: a.Entity.Coins}, nil
	}).
	Initial("Locked").
	Transition("Locked").On("Coin").GoTo("Unlocked").Do("announceOpen").
	Quench()

statemachine.Drive binds the consume loop to the machine. A Router resolves each message to an instance key and the event to fire; the bridge loads the instance through a Store, fires the event, hands the emitted effects to the configured Sink, persists the new state, and only then acks. Here the Sink is a sink.Manifold fanning the effect out to its outlets.

// Egress: a Manifold that fans each emitted effect out to its outlets.
manifold := sink.NewManifold(sink.WithOutlets(sink.OutletFunc(
	func(ctx context.Context, payload any) error {
		log.Printf("opened: %+v", payload) // a real outlet writes SQL, Dynamo, a webhook
		return nil
	},
)))
// Durable instance state for the bridge (in-memory for a single process).
store := statemachine.NewMemStore[string, string, string, Turnstile]()
// Route a message to its instance key and the event to fire.
router := func(m source.Message) (string, string, error) {
	return m.Headers().Get("turnstile-id"), "Coin", nil
}
handler := statemachine.Drive(machine, store, router,
	statemachine.WithSink(statemachine.SinkFunc(
		func(ctx context.Context, effect any) error {
			manifold.Sink(ctx, effect) // fire-and-forget fan-out
			return nil
		},
	)),
)

Open a subscription on any inlet (Kafka, JetStream, or the in-memory test source) and hand the bridge handler to Receive. The engine runs the consume loop until the context is cancelled.

sub, err := inlet.Subscribe(ctx, source.SubscribeConfig{Topic: "turnstile"})
if err != nil {
	return err
}
// consume -> route -> Fire(Coin) -> emit Opened -> persist -> ack
return sub.Receive(ctx, handler)

That is the whole loop:

flowchart LR
    K[(stream)] --> S{{source}}
    S --> R[route by key]
    R --> F["Fire(Coin)"]
    F --> P[persist new state]
    P --> A[ack]
    F -.Opened effect.-> M{{sink Manifold}}
    M --> O1[outlet]
    M --> O2[outlet]

The ack is tied to the durable transition, so under at-least-once delivery an event is never applied twice and never acked unless its transition was persisted. A redelivery of an already-applied message is a no-op ack, keyed on the machine’s own state version with no external dedup store. An event that is illegal for the current state is a guard rejection: it is classified as poison and routed to the DLQ, unlike a transient error, which is retried.
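The three outcomes described above (no-op ack on redelivery, dead-letter on an illegal event, retry on a transient failure) can be sketched as follows. The page does not say how the state version is derived, so this sketch assumes each message carries a per-key sequence number (`Seq`) and the stored instance records the last applied one as its `Version`. Every name here is hypothetical, and effect emission is left out to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
)

type instance struct {
	State   string
	Version int // assumption: sequence number of the last applied message
}

type message struct {
	Key   string
	Event string
	Seq   int // assumption: increases per key, starting at 1
}

type outcome string

const (
	ack        outcome = "ack"
	retry      outcome = "retry"
	deadLetter outcome = "dead-letter"
)

var errIllegalEvent = errors.New("event not legal in current state")

type store struct {
	rows map[string]instance
	fail bool // simulates a transient persistence failure
}

func (s *store) save(key string, in instance) error {
	if s.fail {
		return errors.New("store unavailable")
	}
	s.rows[key] = in
	return nil
}

// fire is the toy turnstile: only Locked + Coin is legal.
func fire(state, event string) (string, error) {
	if state == "Locked" && event == "Coin" {
		return "Unlocked", nil
	}
	return state, errIllegalEvent
}

// handle decides the fate of one message. It acks only after the new state
// and version have been saved together.
func handle(s *store, m message) outcome {
	cur, ok := s.rows[m.Key]
	if !ok {
		cur = instance{State: "Locked"}
	}
	if m.Seq <= cur.Version {
		return ack // already applied: redelivery is a no-op
	}
	next, err := fire(cur.State, m.Event)
	if errors.Is(err, errIllegalEvent) {
		return deadLetter // poison: retrying cannot make it legal
	}
	if err := s.save(m.Key, instance{State: next, Version: m.Seq}); err != nil {
		return retry // transient: nothing persisted, so do not ack
	}
	return ack
}

func main() {
	s := &store{rows: map[string]instance{}}
	coin := message{Key: "t-1", Event: "Coin", Seq: 1}
	fmt.Println(handle(s, coin))                                       // first delivery
	fmt.Println(handle(s, coin))                                       // redelivery of the same message
	fmt.Println(handle(s, message{Key: "t-1", Event: "Coin", Seq: 2})) // Coin while Unlocked
}
```

Storing the version in the same record as the state is what removes the need for a separate dedup store: the duplicate check and the state update succeed or fail together.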