
cluster

import "github.com/stablekernel/crucible/cluster"

Package cluster is the host-side distribution runtime for the Crucible state kernel. It lets a parent machine on one node address and drive a child-machine actor running on another node, supervises actor failures with restart and backoff strategies, and migrates a running instance between nodes — all over a pluggable transport, with the kernel left pure and stdlib-only.

The package is additive over the state kernel. It consumes the kernel’s already-reserved seams — the opaque ActorRef (whose Node locator names the owning host), the injectable ActorSystem host-driver, the Snapshot / Restore pair, and the typed ActorEscalation / EscalationHandler — without requiring any change to the kernel beyond the additive ActorRef.Node locator.

System wraps a local state.ActorSystem and a node identity into a distributed actor system. Delivery to a ref the local node owns (an empty Node, or a Node equal to this node) is delegated straight to the wrapped ActorSystem; delivery to a ref another node owns is routed over the Transport. A System with no Transport configured serves local actors transparently and reports ErrNoTransport for a remote ref, so the in-process projection of the actor model keeps working unchanged.

Transport is the seam that moves an actor operation (deliver, spawn) to the node that owns the target actor. It is host-supplied, so the kernel and this package’s core carry no network dependency: the in-tree InMemoryTransport drives multi-node tests and single-process development, and a real network transport (gRPC) lives behind the same interface.

Supervisor turns the kernel’s typed ActorEscalation into a per-source supervision policy. Each failed actor is routed to a Decision by the src it was spawned from: Escalate forwards the failure to a sink up the hierarchy, Stop contains it, Restart re-spawns the actor through a Respawner within a per-source budget, and Backoff defers the re-spawn by an exponentially growing delay, with the host applying due re-spawns through Tick. It plugs into the kernel’s escalation seam through ActorSystem.WithEscalationHandler.

Capture snapshots a running instance, its actor tree, and its machine definition into a wire-shippable Checkpoint; Restore rebuilds it on another node, resuming in place. Restore gates the move on schema compatibility — it diffs the source and target machine definitions with state/evolution and refuses a breaking target with ErrIncompatibleMigration — so an instance never resumes against a definition that would misread its persisted configuration.

ErrIncompatibleMigration is returned by Restore when the target node’s machine definition differs from the source’s in a backward-incompatible (breaking) way, so resuming the instance there would misinterpret its persisted configuration.

var ErrIncompatibleMigration = errors.New("cluster: target machine is a breaking change from the source")

ErrNoTransport is returned when a delivery targets a ref owned by another node but the System has no Transport configured to reach it. A System without a Transport still serves its local actors; it simply cannot reach remote ones.

var ErrNoTransport = errors.New("cluster: ref names a remote node but no transport is configured")

ErrNodeUnreachable is returned when a delivery targets a node the transport cannot reach: for the in-memory transport, a node that was never registered.

var ErrNodeUnreachable = errors.New("cluster: target node is unreachable")

func Restore[S comparable, E comparable, C any](ctx context.Context, cp Checkpoint, machine *state.Machine[S, E, C], opts ...RestoreOption) (*state.Instance[S, E, C], *state.ActorSystem[S, E, C], error)

Restore rebuilds a captured instance on machine and reconstructs its actor tree, gating the move on schema compatibility: if machine is a breaking change from the source definition the Checkpoint carries, it refuses with ErrIncompatibleMigration rather than resume an instance against a definition that would misread its state. An additive (or identical) target is allowed. It returns the resumed instance and its actor system; the instance is resumed in place (no entry actions re-run), and a host re-arms timers/services by absorbing the instance’s ResumeEffects through its drivers.

Checkpoint is a migratable capture of a running instance: its kernel snapshot, its actor tree, and the source machine’s IR so the target can gate the move on schema compatibility. Every field is JSON, so a Checkpoint ships over any transport as bytes.

type Checkpoint struct {
	// Snapshot is the marshaled kernel Snapshot of the migrating instance.
	Snapshot json.RawMessage `json:"snapshot"`
	// Actors is the instance's actor tree as SnapshotActors produced it, keyed by
	// actor id; empty when the instance runs no actors.
	Actors map[string]json.RawMessage `json:"actors,omitempty"`
	// MachineIR is the source machine's serialized definition, diffed against the
	// target machine to gate the migration on backward compatibility.
	MachineIR json.RawMessage `json:"machineIR"`
}

func Capture[S comparable, E comparable, C any](inst *state.Instance[S, E, C], sys *state.ActorSystem[S, E, C], machine *state.Machine[S, E, C]) (Checkpoint, error)

Capture snapshots a running instance, its actor tree, and its machine definition into a Checkpoint ready to ship to another node. It is a pure read: it neither fires the instance nor mutates any actor.

The instance snapshot and the actor-tree snapshot are read as two separate operations, not under one combined lock, so Capture does not by itself produce a point-in-time-consistent pair. The caller must keep the instance quiescent for the duration of the call: no Fire or actor delivery may run against it. If Capture runs while the instance is being fired, the snapshot and the actor tree may reflect different instants, and Restore could then rebuild a tree that does not match the kernel configuration. Quiesce the instance (stop driving it) before calling Capture.

Decision is how a Supervisor reacts to a child actor’s escalated failure.

type Decision int

const (
	// Escalate forwards the failure to the supervisor's escalation sink, propagating
	// it up the supervision hierarchy. With no sink configured it leaves the failure
	// as the kernel's recorded default. It is the zero value.
	Escalate Decision = iota
	// Stop contains the failure at this level: the failed actor stays down and the
	// failure is not forwarded.
	Stop
	// Restart re-spawns the failed actor through the configured Respawner, bounded
	// by the per-src restart budget set with WithRestart. When the budget is spent
	// (or no Respawner is wired) the failure escalates instead. Configure it with
	// WithRestart, not WithDecision.
	Restart
	// Backoff defers the restart: it schedules the re-spawn after an exponentially
	// growing delay and the host applies due restarts via Tick, so a failing actor
	// is not hammered with immediate restarts. Bounded by the per-src budget set
	// with WithBackoff; on exhaustion the failure escalates. Configure it with
	// WithBackoff.
	Backoff
)

func (d Decision) String() string

String renders a Decision for diagnostics.
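The sketch below reproduces the documented enum to show why an unconfigured decision escalates: `Escalate` is the `iota` zero value. The `String` body is an assumption; the package does not document the exact text it renders.

```go
package main

import "fmt"

type Decision int

const (
	Escalate Decision = iota // zero value: an unset Decision escalates
	Stop
	Restart
	Backoff
)

// String is a sketch; the rendered names are assumed, not documented.
func (d Decision) String() string {
	switch d {
	case Escalate:
		return "Escalate"
	case Stop:
		return "Stop"
	case Restart:
		return "Restart"
	case Backoff:
		return "Backoff"
	default:
		return fmt.Sprintf("Decision(%d)", int(d))
	}
}

func main() {
	var d Decision // never assigned
	fmt.Println(d, Backoff, Decision(9))
}
```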

Endpoint is a node’s local endpoint as a transport sees it: it delivers events to, and spawns actors in, that node’s local ActorSystem. A *System satisfies it, so registering a node with an InMemoryTransport is just handing it that node’s System.

type Endpoint interface {
	Deliver(ctx context.Context, ref state.ActorRef, event any) (bool, error)
	SpawnLocal(ctx context.Context, src, id string, input map[string]any) (state.ActorRef, error)
}

HandledEscalation records one failure a Supervisor processed, for observability.

type HandledEscalation struct {
	// ActorID is the registry id of the actor that failed.
	ActorID string
	// Src is the actor ref name the failed actor was spawned from.
	Src string
	// Decision is the strategy the supervisor applied for that src.
	Decision Decision
	// Err is the underlying failure that escalated.
	Err error
}

InMemoryTransport routes deliveries between node-scoped Systems living in the same process, with no network involved. It is the reference Transport: it makes the multi-node actor model fully exercisable in tests and single-process development, and a real network transport implements the same Transport interface out of tree. It is safe for concurrent use.

type InMemoryTransport struct {
	// contains filtered or unexported fields
}

func NewInMemoryTransport() *InMemoryTransport

NewInMemoryTransport returns an empty transport. Register each node’s System before routing to it.

func (t *InMemoryTransport) Deliver(ctx context.Context, ref state.ActorRef, event any) (bool, error)

Deliver routes event to the node that owns ref and delegates to that node’s local delivery. A ref naming an unregistered node returns ErrNodeUnreachable; a registered node that has no such actor returns (false, nil) — reached, but nothing to deliver to.

func (t *InMemoryTransport) Register(node string, e Endpoint)

Register wires node’s local endpoint into the transport, so an operation targeting node routes to e. Registering a node again replaces its endpoint.

func (t *InMemoryTransport) Spawn(ctx context.Context, node, src, id string, input map[string]any) (state.ActorRef, error)

Spawn routes a spawn request to node’s local endpoint, returning a ref to the new actor. A request for an unregistered node returns ErrNodeUnreachable.

Option configures a System. New capabilities arrive as additional options, so the constructor signature never breaks.

type Option func(*config)

func WithTransport(t Transport) Option

WithTransport supplies the Transport a System uses to reach actors on other nodes. Without it, a System serves only local actors and reports ErrNoTransport for a remote ref.

Respawner re-creates a failed actor in its local system, replacing the dead instance registered under id. *System satisfies it (via Respawn), so wiring restart is just handing the supervisor the System.

type Respawner interface {
	Respawn(ctx context.Context, src, id string, input map[string]any) (state.ActorRef, error)
}

RestoreOption configures a Restore. New capabilities arrive as additional options, so the signature never breaks.

type RestoreOption func(*restoreConfig)

func WithActorBehaviors(behaviors map[string]state.ActorBehavior) RestoreOption

WithActorBehaviors registers the child-machine behaviors the target node binds before its actor tree is rebuilt, keyed by the src ref name — exactly the palette the source registered. An actor whose src is absent here is skipped.

Supervisor turns the kernel’s raw escalation seam into a per-src supervision policy: each child actor failure is routed to a Decision by the src it was spawned from, with a default for any src not listed. It is the host-side supervision-strategy layer over state.ActorEscalation / EscalationHandler — restart and backoff build on the same routing. A Supervisor is safe for concurrent use.

type Supervisor struct {
	// contains filtered or unexported fields
}

func NewSupervisor(opts ...SupervisorOption) *Supervisor

NewSupervisor builds a Supervisor. Wire it into a system with ActorSystem.WithEscalationHandler(sup.Handle).

func (s *Supervisor) DecisionFor(src string) Decision

DecisionFor returns the decision configured for src, or the default.

func (s *Supervisor) Forget(actorID string)

Forget discards the supervisor’s per-actor restart bookkeeping for actorID: its spent-restart counter and any not-yet-applied backoff restart scheduled for it. A host calls it when an actor is permanently stopped (not restarted), so the supervisor’s restart map does not accumulate one entry per distinct actor id for the process lifetime under churn. Forgetting an unknown id is a no-op. After Forget a re-spawn of the same id starts a fresh restart budget, so call it only for a genuine teardown, never between a failure and its restart.

func (s *Supervisor) Handle(ctx context.Context, esc *state.ActorEscalation)

Handle is the state.EscalationHandler the Supervisor exposes. It applies the src’s decision (re-spawning for Restart within budget, scheduling a deferred re-spawn for Backoff, forwarding to the sink for Escalate, or containing the failure for Stop) and records the decision it actually applied. A Restart whose budget is spent or whose Respawner is missing escalates instead, as does a Backoff whose budget is spent, and that fallthrough is what gets recorded. Wire it with ActorSystem.WithEscalationHandler(sup.Handle).
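The Restart path of Handle, together with Forget, can be sketched as per-src routing plus a per-actor-id counter. The `supervisor` type here is a hypothetical reduction (no context, no backoff, no sink), and treating a failed respawn as an escalation is this sketch’s own assumption.

```go
package main

import "fmt"

type Decision int

const (
	Escalate Decision = iota
	Stop
	Restart
)

// supervisor is a hypothetical reduction of the documented Supervisor.
type supervisor struct {
	decisions   map[string]Decision        // per src
	def         Decision                   // default for unlisted srcs
	maxRestarts map[string]int             // restart budget per src
	restarts    map[string]int             // restarts spent per actor id
	respawn     func(src, id string) error // nil when no Respawner is wired
	handled     []Decision                 // decisions actually applied
}

func (s *supervisor) decisionFor(src string) Decision {
	if d, ok := s.decisions[src]; ok {
		return d
	}
	return s.def
}

// handle applies the src's decision; a Restart with no respawner or no
// budget left falls through to Escalate, and that is what is recorded.
func (s *supervisor) handle(src, actorID string) Decision {
	d := s.decisionFor(src)
	if d == Restart {
		switch {
		case s.respawn == nil || s.restarts[actorID] >= s.maxRestarts[src]:
			d = Escalate
		case s.respawn(src, actorID) != nil:
			d = Escalate // assumption: a failed respawn escalates
		default:
			s.restarts[actorID]++
		}
	}
	s.handled = append(s.handled, d)
	return d
}

// forget drops per-actor bookkeeping so the id starts a fresh budget.
func (s *supervisor) forget(actorID string) { delete(s.restarts, actorID) }

func main() {
	sup := &supervisor{
		decisions:   map[string]Decision{"worker": Restart},
		maxRestarts: map[string]int{"worker": 2},
		restarts:    map[string]int{},
		respawn:     func(src, id string) error { return nil },
	}
	for i := 0; i < 3; i++ {
		sup.handle("worker", "w1")
	}
	fmt.Println(sup.handled) // two restarts, then an escalation
}
```

Counting per actor id rather than per src is what makes Forget necessary: without it the `restarts` map keeps one entry for every id that ever failed.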

func (s *Supervisor) Handled() []HandledEscalation

Handled returns a snapshot of the failures this supervisor has processed, in order. The returned slice is a copy and safe to retain.

func (s *Supervisor) SetRespawner(r Respawner)

SetRespawner binds, after construction, the Respawner that Restart and Backoff decisions re-spawn through. It is the ergonomic path when the System (the respawner) is built after the Supervisor, which is the usual order: the System’s local ActorSystem is wired with sup.Handle, so the Supervisor has to exist before the System that wraps it.
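The construction-order cycle that SetRespawner breaks can be shown with hypothetical stand-ins: `actorSystem` takes its escalation handler up front, `nodeSystem` (the respawner) wraps an `actorSystem`, and `supervisor` needs the respawner. None of these are the package’s real types.

```go
package main

import "fmt"

// actorSystem takes its escalation handler when it is built.
type actorSystem struct{ onEscalation func(id string) }

type respawner interface{ Respawn(id string) }

// nodeSystem wraps a local actorSystem and can re-spawn actors.
type nodeSystem struct {
	local     *actorSystem
	respawned []string
}

func (n *nodeSystem) Respawn(id string) { n.respawned = append(n.respawned, id) }

type supervisor struct{ r respawner }

func (s *supervisor) SetRespawner(r respawner) { s.r = r }

// Handle re-spawns through whatever respawner is bound at call time.
func (s *supervisor) Handle(id string) {
	if s.r != nil {
		s.r.Respawn(id)
	}
}

func main() {
	sup := &supervisor{}                            // 1. supervisor first, no respawner yet
	local := &actorSystem{onEscalation: sup.Handle} // 2. handler bound at construction
	sys := &nodeSystem{local: local}                // 3. the system wraps the actor system
	sup.SetRespawner(sys)                           // 4. close the loop afterwards
	local.onEscalation("worker-1")
	fmt.Println(sys.respawned)
}
```

The method value `sup.Handle` captures the pointer, so the respawner bound in step 4 is visible to a handler that was handed out in step 2.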

func (s *Supervisor) Tick(ctx context.Context) int

Tick performs every scheduled backoff restart that is now due, re-spawning each through the Respawner, and returns how many it restarted. A host calls it from its own timer loop (or a test after advancing a fake clock); it is a no-op when nothing is due.

SupervisorOption configures a Supervisor. New strategies arrive as additional options, so the constructor signature never breaks.

type SupervisorOption func(*Supervisor)

func WithBackoff(src string, maxRestarts int, initial, maxDelay time.Duration, factor float64) SupervisorOption

WithBackoff sets the Backoff decision for failures of actors spawned from src: the actor is re-spawned up to maxRestarts times, with the nth restart deferred by initial*factor^n (n counted from zero) and capped at maxDelay. When the budget is spent the failure escalates. Backoff needs a Respawner (WithRespawner or SetRespawner) and reads time through the supervisor’s clock (WithClock, defaulting to the system clock); the host applies due restarts with Tick.

func WithClock(c state.Clock) SupervisorOption

WithClock sets the time source the Backoff decision schedules against. Without it a Supervisor uses the system clock; a test wires a fake clock to drive backoff deterministically.

func WithDecision(src string, d Decision) SupervisorOption

WithDecision sets the decision applied to failures of actors spawned from src.

func WithDefaultDecision(d Decision) SupervisorOption

WithDefaultDecision sets the decision applied to a failure whose src has no explicit decision. Without it the default is Escalate.

func WithEscalationSink(h state.EscalationHandler) SupervisorOption

WithEscalationSink sets where an Escalate decision forwards the failure — the next handler up the supervision hierarchy. Without it, an Escalate decision leaves the failure as the kernel’s recorded default.

func WithRespawner(r Respawner) SupervisorOption

WithRespawner wires the Respawner a Restart decision re-spawns through. The node’s *System is the usual respawner. SetRespawner does the same after construction, for the common case where the System is built after the Supervisor.

func WithRestart(src string, maxRestarts int) SupervisorOption

WithRestart sets the Restart decision for failures of actors spawned from src, re-spawning the actor up to maxRestarts times (counted per actor id). When the budget is spent the failure escalates instead, so a crash-looping actor cannot restart-storm. Restart needs a Respawner wired with WithRespawner or SetRespawner.

System is the distributed actor system for one node: a local state.ActorSystem wrapped with a node identity and an optional Transport. A holder delivers to an ActorRef without caring where the actor runs — the System delegates to the local ActorSystem when it owns the actor and routes over the Transport when another node does.

type System[S comparable, E comparable, C any] struct {
	// contains filtered or unexported fields
}

func NewSystem[S comparable, E comparable, C any](node string, local *state.ActorSystem[S, E, C], opts ...Option) *System[S, E, C]

NewSystem wraps local into the distributed system for node. The node identifier is the value a remote ref carries in ActorRef.Node to address actors this System owns; it must be unique across the cluster. The local ActorSystem keeps driving this node’s actors exactly as before.

func (s *System[S, E, C]) Deliver(ctx context.Context, ref state.ActorRef, event any) (bool, error)

Deliver routes event to the actor named by ref. When this node owns the actor the event is delivered straight through the local ActorSystem; otherwise it is routed over the Transport to the owning node. It reports whether the actor accepted the event, and an error only for a transport-level failure reaching a remote node (ErrNoTransport when no transport is configured).

func (s *System[S, E, C]) DeliverByID(ctx context.Context, id string, event any) bool

DeliverByID delivers to a local actor by its registry id. It addresses only this node’s actors; use Deliver with a remote ref to reach another node.

func (s *System[S, E, C]) DeliverWire(ctx context.Context, ref state.ActorRef, eventJSON []byte) (bool, error)

DeliverWire decodes a JSON-encoded event into this system’s event type E and delivers it to the local actor named by ref. It is the receive half a network transport calls on the actor’s owning node; the sending node produced eventJSON with json.Marshal of the original event. An empty or null payload decodes to E’s zero value.
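The decode half of DeliverWire can be sketched as a generic helper over the event type E. `decodeEvent` and `OrderEvent` are hypothetical. The empty-payload case is special-cased because `json.Unmarshal` rejects empty input, while a literal `null` leaves a non-pointer value untouched, so both yield E’s zero value as documented.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeEvent decodes a wire payload into the receiving node's event type E.
func decodeEvent[E any](payload []byte) (E, error) {
	var ev E
	if len(payload) == 0 {
		return ev, nil // empty payload: zero value, no error
	}
	err := json.Unmarshal(payload, &ev) // "null" leaves ev at its zero value
	return ev, err
}

// OrderEvent is a hypothetical concrete event type.
type OrderEvent struct {
	Kind   string `json:"kind"`
	Amount int    `json:"amount"`
}

func main() {
	// Sending node: json.Marshal of the original event.
	wire, _ := json.Marshal(OrderEvent{Kind: "pay", Amount: 42})
	// Owning node: decode back into its own concrete type.
	ev, err := decodeEvent[OrderEvent](wire)
	fmt.Println(ev, err)
	zero, _ := decodeEvent[OrderEvent]([]byte("null"))
	fmt.Println(zero == (OrderEvent{}))
}
```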

func (s *System[S, E, C]) Local() *state.ActorSystem[S, E, C]

Local returns the wrapped in-process ActorSystem, the escape hatch for the kernel-level driver operations (Absorb, Step, Register, snapshots) the System does not itself surface.

func (s *System[S, E, C]) Node() string

Node returns this system’s node identifier.

func (s *System[S, E, C]) Ref(id string) (state.ActorRef, bool)

Ref resolves a local actor id to its ref, reporting whether this node runs it.

func (s *System[S, E, C]) Respawn(ctx context.Context, src, id string, input map[string]any) (state.ActorRef, error)

Respawn replaces the actor registered under id with a fresh instance from src: it first tears down any existing actor with that id (a failed actor stays registered as done until removed), then spawns anew. It is the primitive a supervisor’s Restart decision drives, satisfying Respawner. Stopping a missing id is a no-op, so Respawn also works as a plain spawn.
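The stop-then-spawn shape of Respawn can be sketched over a hypothetical local registry. The sketch assumes a spawn refuses an id that is still registered (for example, a failed actor left registered as done), which is why the stop must come first; that refusal rule is an assumption, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

type actor struct {
	src  string
	done bool
}

// registry is a hypothetical local actor table keyed by id.
type registry struct {
	actors    map[string]*actor
	behaviors map[string]bool // srcs with a registered behavior
}

// stop tears down id; stopping a missing id is a no-op.
func (r *registry) stop(id string) { delete(r.actors, id) }

// spawn starts a fresh actor, refusing an unknown src or a taken id (assumed).
func (r *registry) spawn(src, id string) error {
	if !r.behaviors[src] {
		return fmt.Errorf("no behavior registered under %q", src)
	}
	if _, exists := r.actors[id]; exists {
		return errors.New("id still registered: " + id)
	}
	r.actors[id] = &actor{src: src}
	return nil
}

// respawn replaces whatever holds id with a fresh instance from src.
func (r *registry) respawn(src, id string) error {
	r.stop(id)
	return r.spawn(src, id)
}

func main() {
	reg := &registry{actors: map[string]*actor{}, behaviors: map[string]bool{"worker": true}}
	_ = reg.spawn("worker", "w1")
	reg.actors["w1"].done = true // the actor failed but stays registered
	fmt.Println(reg.spawn("worker", "w1"))
	fmt.Println(reg.respawn("worker", "w1"), reg.actors["w1"].done)
}
```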

func (s *System[S, E, C]) Running() int

Running reports how many actors this node currently runs.

func (s *System[S, E, C]) Spawn(ctx context.Context, node, src, id string, input map[string]any) (state.ActorRef, error)

Spawn starts an actor with the given id from the behavior registered under src, on node. When node is this node (or empty) the actor is spawned in the local ActorSystem; otherwise the spawn is routed over the Transport to node. It returns a ref to the new actor with its Node set so the caller can address it wherever it runs. A remote spawn with no Transport configured returns ErrNoTransport.

func (s *System[S, E, C]) SpawnLocal(ctx context.Context, src, id string, input map[string]any) (state.ActorRef, error)

SpawnLocal starts an actor with the given id from the behavior the local ActorSystem registered under src, passing input, and returns a ref to it stamped with this node so it is addressable from other nodes. It is the local half of Spawn and the operation a Transport invokes on the owning node. An error reports that the spawn did not start (for example, no behavior is registered under src).

func (s *System[S, E, C]) SpawnWire(ctx context.Context, src, id string, inputJSON []byte) (state.ActorRef, error)

SpawnWire decodes a JSON-encoded input map and spawns an actor with the given id from src in this node’s local system, returning a ref stamped with this node. It is the receive half of a network transport’s Spawn. An empty payload spawns with a nil input.

func (s *System[S, E, C]) Stop(ref state.ActorRef)

Stop tears down a local actor (and its children); stopping a ref this node does not own is a no-op, since teardown of a remote actor is the owning node’s job.

Transport carries actor operations to the node that owns the target actor. It is the host-supplied seam that keeps the kernel and this package’s core free of any network dependency: an in-memory transport drives tests, and a real network transport implements the same interface.

type Transport interface {
	// Deliver routes event to the actor named by ref on ref's owning node and
	// reports whether the actor accepted it. The node identifier to dial is
	// ref.Node, which the implementation resolves to a concrete address. An error
	// reports a transport-level failure (the node was unreachable, the wire call
	// failed); a nil error with delivered=false means the node was reached but had
	// no such running actor.
	Deliver(ctx context.Context, ref state.ActorRef, event any) (delivered bool, err error)
	// Spawn asks node to start an actor with the given id from the behavior its
	// local system registered under src, passing input, and returns a ref to it
	// (its Node set to node). An error reports a transport-level failure reaching
	// node or a spawn that did not start on the far side.
	Spawn(ctx context.Context, node, src, id string, input map[string]any) (state.ActorRef, error)
}

WireEndpoint is a node’s receive side as a network transport sees it: it accepts an operation whose payload arrived as JSON over the wire and applies it to the node’s local actor system, decoding the payload into the node’s own concrete types. A *System satisfies it, so a network transport serving a node just holds that node’s System behind this type-erased interface.

It is the network counterpart to the in-process Endpoint: where InMemoryTransport passes Go values between same-process systems unchanged, a network transport serializes the event/input on the sending node and hands the bytes to the owning node’s WireEndpoint, which decodes them into its event type E (or input map) and delivers locally. Decoding on the owning node is what lets the transport stay type-erased while the kernel keeps its concrete, typed events.

type WireEndpoint interface {
	// DeliverWire decodes eventJSON into the node's event type and delivers it to
	// the local actor named by ref, reporting whether the actor accepted it.
	DeliverWire(ctx context.Context, ref state.ActorRef, eventJSON []byte) (bool, error)
	// SpawnWire decodes inputJSON into the spawn input map and spawns an actor with
	// the given id from src in the local system, returning a ref to it.
	SpawnWire(ctx context.Context, src, id string, inputJSON []byte) (state.ActorRef, error)
}
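The split between a type-erased sender and a typed receiver can be sketched with a loopback in place of the network. All names are hypothetical: `loopbackTransport` marshals on the sending side and hands bytes to a reduced `wireEndpoint`, whose implementation decodes into its own concrete `Ping` type. A real transport would send the bytes over gRPC or similar instead of calling the endpoint directly.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ref is a hypothetical stand-in for state.ActorRef.
type ref struct{ Node, ID string }

// wireEndpoint is a reduced form of the documented WireEndpoint.
type wireEndpoint interface {
	DeliverWire(r ref, eventJSON []byte) (bool, error)
}

// loopbackTransport plays the network: marshal on send, bytes to the owner.
type loopbackTransport struct{ nodes map[string]wireEndpoint }

func (t loopbackTransport) Deliver(r ref, event any) (bool, error) {
	e, ok := t.nodes[r.Node]
	if !ok {
		return false, errors.New("node unreachable")
	}
	b, err := json.Marshal(event) // the sender needs no knowledge of the owner's type
	if err != nil {
		return false, err
	}
	return e.DeliverWire(r, b)
}

// Ping is the owning node's hypothetical concrete event type.
type Ping struct {
	Seq int `json:"seq"`
}

// typedNode decodes into Ping before delivering locally.
type typedNode struct{ received []Ping }

func (n *typedNode) DeliverWire(r ref, eventJSON []byte) (bool, error) {
	var p Ping
	if err := json.Unmarshal(eventJSON, &p); err != nil {
		return false, err
	}
	n.received = append(n.received, p)
	return true, nil
}

func main() {
	node := &typedNode{}
	t := loopbackTransport{nodes: map[string]wireEndpoint{"b": node}}
	t.Deliver(ref{Node: "b", ID: "x"}, Ping{Seq: 1})
	t.Deliver(ref{Node: "b", ID: "x"}, map[string]any{"seq": 2}) // untyped sender
	fmt.Println(node.received)
}
```

The second call shows why decoding belongs on the owning node: the sender can hold any JSON-compatible value, and only the receiver commits to the concrete event type.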

Generated by gomarkdoc