transport

import "github.com/stablekernel/crucible/transport"

Package transport is a gRPC network transport for the Crucible cluster runtime. It carries actor deliver and spawn operations between nodes over real gRPC (HTTP/2), implementing cluster.Transport on the client side and serving a node’s cluster.WireEndpoint on the server side.

It lives in its own module so the gRPC dependency stays out of the cluster core, which remains stdlib-only: a deployment that uses only the in-memory transport never compiles gRPC in. Payloads (events and spawn inputs) cross the wire as the JSON the WireEndpoint seam already produces and consumes, via a JSON gRPC codec — no protobuf schema or codegen is involved.
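As a rough illustration of the JSON codec idea, the sketch below defines a type whose method set matches the shape of grpc's encoding.Codec interface (Marshal, Unmarshal, Name). The name jsonCodec, the "json" codec name, and the use of encoding/json are assumptions made for this example; the package's actual codec is not shown in this reference. The sketch does not import grpc, so it compiles with the standard library alone.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonCodec is a hypothetical stand-in for the package's codec. Its methods
// mirror the shape of grpc's encoding.Codec, but nothing here imports grpc.
type jsonCodec struct{}

// Marshal encodes a message as JSON rather than protobuf.
func (jsonCodec) Marshal(v any) ([]byte, error) { return json.Marshal(v) }

// Unmarshal decodes JSON bytes into the message v points to.
func (jsonCodec) Unmarshal(data []byte, v any) error { return json.Unmarshal(data, v) }

// Name is the identifier a codec is registered and selected under.
// "json" is an assumed value for this sketch.
func (jsonCodec) Name() string { return "json" }

// deliverResponse mirrors the DeliverResponse message documented below.
type deliverResponse struct {
	Delivered bool `json:"delivered"`
}

func main() {
	c := jsonCodec{}

	// Encode a message the way it would be framed into a gRPC call.
	wire, err := c.Marshal(deliverResponse{Delivered: true})
	if err != nil {
		panic(err)
	}

	// Decode it again on the receiving side.
	var got deliverResponse
	if err := c.Unmarshal(wire, &got); err != nil {
		panic(err)
	}
	fmt.Println(string(wire), got.Delivered)
}
```

Because the messages are plain Go structs with json tags, a codec of this kind needs no generated code; the trade-off is that the wire format is defined by the struct tags rather than by a schema file.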

func NewServer(ep cluster.WireEndpoint, opts ...grpc.ServerOption) *grpc.Server

NewServer builds a gRPC server preconfigured with the JSON codec and the node’s transport service registered. The caller serves it on a listener and owns its lifecycle. Pass extra ServerOptions (interceptors, credentials, keepalives) as needed.

func NewTimeTravelServer(ep TimeTravelEndpoint, opts ...grpc.ServerOption) *grpc.Server

NewTimeTravelServer builds a gRPC server preconfigured with the JSON codec and the time-travel service registered. The caller serves it and owns its lifecycle.

func RegisterServer(gs grpc.ServiceRegistrar, ep cluster.WireEndpoint)

RegisterServer registers a node’s cluster.WireEndpoint as the transport service on an existing gRPC server, so deliveries and spawns arriving over the wire are decoded into the node’s concrete types and applied to its local actor system.

func RegisterTimeTravel(gs grpc.ServiceRegistrar, ep TimeTravelEndpoint)

RegisterTimeTravel registers a node’s TimeTravelEndpoint as the time-travel service on an existing gRPC server, so remote nodes can reconstruct the past state of instances this node hosts.

DeliverRequest carries a delivery to the actor named by Ref. Event is the JSON-encoded event the owning node decodes into its event type.

type DeliverRequest struct {
	Ref   state.ActorRef `json:"ref"`
	Event []byte         `json:"event,omitempty"`
}

DeliverResponse reports whether the addressed actor accepted the event.

type DeliverResponse struct {
	Delivered bool `json:"delivered"`
}

DurableTimeTravel adapts a durable Store and machine into a TimeTravelEndpoint by running durable.StateAt and marshaling the reconstructed snapshot. It is read-only: it runs no service, re-instantiates no actor, and dispatches no effect.

type DurableTimeTravel[S comparable, E comparable, C any] struct {
	// contains filtered or unexported fields
}

func NewDurableTimeTravel[S comparable, E comparable, C any](m *state.Machine[S, E, C], store durable.Store) *DurableTimeTravel[S, E, C]

NewDurableTimeTravel binds a machine and the durable Store its instances were recorded into. The Store should retain full history (a durable.HistoryStore) to reach arbitrary steps; otherwise reconstruction is limited to the latest checkpoint and tail.
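To make the "latest checkpoint and tail" limit concrete, here is a conceptual sketch of checkpoint-plus-replay reconstruction. It is not durable.StateAt: the checkpoint type, the integer state, and the replay function are all hypothetical and exist only to show why steps before the retained checkpoint cannot be reached without full history.

```go
package main

import (
	"errors"
	"fmt"
)

// checkpoint is a hypothetical saved state: the counter value as of Step.
type checkpoint struct {
	Step  int
	Value int
}

// replay rebuilds the value as of step by starting at the checkpoint and
// applying tail entries, where tail[i] is the change made at step cp.Step+i+1.
func replay(cp checkpoint, tail []int, step int) (int, error) {
	if step < cp.Step {
		return 0, errors.New("step precedes the retained checkpoint")
	}
	if step > cp.Step+len(tail) {
		return 0, errors.New("step is beyond the recorded history")
	}
	v := cp.Value
	for _, delta := range tail[:step-cp.Step] {
		v += delta
	}
	return v, nil
}

func main() {
	cp := checkpoint{Step: 10, Value: 100}
	tail := []int{5, -2, 7} // changes recorded at steps 11, 12, and 13

	v, err := replay(cp, tail, 12)
	fmt.Println(v, err)

	// Step 3 is older than the checkpoint, so it is not reachable here.
	_, err = replay(cp, tail, 3)
	fmt.Println(err)
}
```

In this toy model, a store that kept every change since step 0 would let the same loop reach any step, which is the role the text assigns to a history-retaining Store.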

func (d *DurableTimeTravel[S, E, C]) StateAt(ctx context.Context, id string, step int) ([]byte, error)

StateAt reconstructs the instance’s state as of step and marshals it.

SpawnRequest asks the owning node to start an actor with the given ID from Src, passing the JSON-encoded Input.

type SpawnRequest struct {
	Src   string `json:"src"`
	ID    string `json:"id"`
	Input []byte `json:"input,omitempty"`
}
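The following sketch shows what a request with these fields and tags looks like when marshaled, assuming the codec uses encoding/json (an assumption; the codec's internals are not documented here). The local spawnRequest type copies the documented fields, and the values "worker" and "w-1" are made up. One consequence worth knowing: encoding/json writes a []byte field as a base64 string, so the already JSON-encoded Input is nested as base64 text rather than as inline JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// spawnRequest copies the documented SpawnRequest fields and tags.
type spawnRequest struct {
	Src   string `json:"src"`
	ID    string `json:"id"`
	Input []byte `json:"input,omitempty"`
}

// encodeSpawn JSON-encodes the input map first, then the whole request.
func encodeSpawn(src, id string, input map[string]any) ([]byte, error) {
	var raw []byte
	if input != nil {
		b, err := json.Marshal(input)
		if err != nil {
			return nil, err
		}
		raw = b
	}
	return json.Marshal(spawnRequest{Src: src, ID: id, Input: raw})
}

// decodeInput reverses encodeSpawn and returns the original input JSON.
func decodeInput(wire []byte) (string, error) {
	var req spawnRequest
	if err := json.Unmarshal(wire, &req); err != nil {
		return "", err
	}
	return string(req.Input), nil
}

func main() {
	wire, err := encodeSpawn("worker", "w-1", map[string]any{"n": 1})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(wire)) // the input field appears as a base64 string

	in, err := decodeInput(wire)
	if err != nil {
		panic(err)
	}
	fmt.Println(in) // the original input JSON, recovered on decode
}
```

With a nil input, the omitempty tag drops the field entirely, so a spawn without input carries only src and id.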

SpawnResponse returns a ref to the spawned actor.

type SpawnResponse struct {
	Ref state.ActorRef `json:"ref"`
}

StateAtRequest asks for an instance’s reconstructed state as of a recorded step.

type StateAtRequest struct {
	ID   string `json:"id"`
	Step int    `json:"step"`
}

StateAtResponse carries the reconstructed snapshot as marshaled JSON; the caller decodes it with state.UnmarshalSnapshot for its own (S, E, C).

type StateAtResponse struct {
	Snapshot []byte `json:"snapshot"`
}

TimeTravelEndpoint serves read-only historical state reconstruction for durable instances a node hosts. DurableTimeTravel adapts a durable Store and a machine into one; a node registers it with RegisterTimeTravel so other nodes can query an instance’s past state over the wire.

type TimeTravelEndpoint interface {
	// StateAt reconstructs the instance's state as of step and returns it as a
	// marshaled kernel snapshot.
	StateAt(ctx context.Context, id string, step int) (snapshot []byte, err error)
}
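Since the interface has a single method, a hand-written implementation is easy to supply, for example as a test double. The sketch below redeclares the interface locally so it compiles without the transport package. The fixedHistory type, the snapshotAt helper, the instance ID, and the snapshot bytes are all hypothetical placeholders; real snapshots come from the kernel's own marshaling, whose format is not described here.

```go
package main

import (
	"context"
	"fmt"
)

// TimeTravelEndpoint is copied from the interface documented above so the
// sketch compiles without importing the transport package.
type TimeTravelEndpoint interface {
	StateAt(ctx context.Context, id string, step int) (snapshot []byte, err error)
}

// fixedHistory is a hypothetical test double: for each instance ID it holds
// one pre-marshaled snapshot per step, indexed from step 0.
type fixedHistory map[string][][]byte

// StateAt returns the stored snapshot for id at step, or an error if the
// context is done, the instance is unknown, or the step is out of range.
func (h fixedHistory) StateAt(ctx context.Context, id string, step int) ([]byte, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	steps, ok := h[id]
	if !ok {
		return nil, fmt.Errorf("unknown instance %q", id)
	}
	if step < 0 || step >= len(steps) {
		return nil, fmt.Errorf("instance %q has no step %d", id, step)
	}
	return steps[step], nil
}

// Compile-time check that fixedHistory satisfies the interface.
var _ TimeTravelEndpoint = fixedHistory{}

// snapshotAt queries ep with a background context and returns the snapshot
// as a string, which keeps the demo below short.
func snapshotAt(ep TimeTravelEndpoint, id string, step int) (string, error) {
	b, err := ep.StateAt(context.Background(), id, step)
	return string(b), err
}

func main() {
	// Placeholder snapshot bytes; the real snapshot format is not shown here.
	h := fixedHistory{
		"order-1": {[]byte(`{"state":"idle"}`), []byte(`{"state":"paid"}`)},
	}
	s, err := snapshotAt(h, "order-1", 1)
	fmt.Println(s, err)

	_, err = snapshotAt(h, "order-1", 5)
	fmt.Println(err)
}
```

A value like this could stand in wherever a TimeTravelEndpoint is expected, which may be handy for exercising callers without a durable Store.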

Transport is the client side: it routes cluster operations to the gRPC server of the node that owns the target actor. It satisfies cluster.Transport. Register each reachable node’s client connection with AddNode. It is safe for concurrent use.

type Transport struct {
	// contains filtered or unexported fields
}

func New() *Transport

New returns a Transport with no nodes; register reachable nodes with AddNode.

func (t *Transport) AddNode(node string, conn grpc.ClientConnInterface)

AddNode registers the client connection used to reach node. The caller dials the node (grpc.NewClient) and owns the connection’s lifecycle. Registering a node again replaces its connection.
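The two behaviors described here, replacement on re-registration and safety under concurrent use, can be pictured with a small mutex-guarded map. This is one possible design, not the package's actual implementation: registry, conn, add, and lookup are hypothetical names, conn is a string placeholder for grpc.ClientConnInterface, and returning an error for an unregistered node is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a placeholder for grpc.ClientConnInterface; a string keeps the
// sketch free of the gRPC dependency.
type conn string

// registry maps node names to connections and is safe for concurrent use.
type registry struct {
	mu    sync.RWMutex
	nodes map[string]conn
}

func newRegistry() *registry {
	return &registry{nodes: make(map[string]conn)}
}

// add registers c for node; a second call for the same node replaces it.
func (r *registry) add(node string, c conn) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nodes[node] = c
}

// lookup returns the connection for node, or an error if none is registered.
func (r *registry) lookup(node string) (conn, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	c, ok := r.nodes[node]
	if !ok {
		return "", fmt.Errorf("no connection registered for node %q", node)
	}
	return c, nil
}

func main() {
	r := newRegistry()

	// Register several nodes from concurrent goroutines.
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.add(fmt.Sprintf("node-%d", i), conn("initial"))
		}(i)
	}
	wg.Wait()

	// Registering node-0 again replaces its earlier connection.
	r.add("node-0", conn("redialed"))
	c, err := r.lookup("node-0")
	fmt.Println(c, err)

	_, err = r.lookup("node-9")
	fmt.Println(err)
}
```

Note that in this sketch replacing an entry does not close the old connection, which is consistent with the caller owning each connection's lifecycle.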

func (t *Transport) Deliver(ctx context.Context, ref state.ActorRef, event any) (bool, error)

Deliver routes event to the actor named by ref on its owning node over gRPC. The event is JSON-encoded here and decoded into the owning node’s event type there.
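The encode-here, decode-there step can be sketched in isolation. The example assumes encoding/json on both sides; orderPlaced, encodeEvent, and decodeEvent are hypothetical names, and no gRPC call is made. It shows that the sender only needs a value whose JSON shape matches what the owning node's event type expects.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderPlaced is a hypothetical event type owned by the receiving node.
type orderPlaced struct {
	OrderID string `json:"order_id"`
	Total   int    `json:"total"`
}

// encodeEvent is the sender's half: any JSON-encodable value becomes bytes.
func encodeEvent(event any) ([]byte, error) {
	return json.Marshal(event)
}

// decodeEvent is the owner's half: bytes become the owner's concrete type.
func decodeEvent(raw []byte) (orderPlaced, error) {
	var ev orderPlaced
	err := json.Unmarshal(raw, &ev)
	return ev, err
}

func main() {
	// The sender does not need the owner's Go type, only a compatible shape.
	raw, err := encodeEvent(map[string]any{"order_id": "o-42", "total": 1999})
	if err != nil {
		panic(err)
	}

	ev, err := decodeEvent(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", ev)
}
```

Under the same assumption, a value that encoding/json cannot represent, such as a channel, fails at the sender before anything reaches the network.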

func (t *Transport) Spawn(ctx context.Context, node, src, id string, input map[string]any) (state.ActorRef, error)

Spawn asks node, over gRPC, to start an actor with the given id from src, passing input, and returns a ref to the new actor.

func (t *Transport) StateAt(ctx context.Context, node, id string, step int) ([]byte, error)

StateAt asks node to reconstruct an instance’s state as of step and returns the marshaled snapshot, which the caller decodes with state.UnmarshalSnapshot for its own (S, E, C). It reuses the transport’s registered connections.

Generated by gomarkdoc