transport
import "github.com/stablekernel/crucible/transport"
Package transport is a gRPC network transport for the Crucible cluster runtime. It carries actor deliver and spawn operations between nodes over real gRPC (HTTP/2), implementing cluster.Transport on the client side and serving a node’s cluster.WireEndpoint on the server side.
It lives in its own module so the gRPC dependency stays out of the cluster core, which remains stdlib-only: a deployment that uses only the in-memory transport never compiles gRPC in. Payloads (events and spawn inputs) cross the wire as the JSON the WireEndpoint seam already produces and consumes, via a JSON gRPC codec — no protobuf schema or codegen is involved.
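As a rough illustration of what a JSON gRPC codec involves, the sketch below defines a type with the three methods (Name, Marshal, Unmarshal) that grpc-go's encoding.Codec interface asks for, using only the standard library. The names jsonCodec and deliverResponse are hypothetical stand-ins, not this package's unexported codec; in a real program such a codec would typically be handed to grpc-go, for example through encoding.RegisterCodec.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonCodec is a hypothetical stand-in for a JSON gRPC codec. Its method set
// matches the shape of grpc-go's encoding.Codec interface.
type jsonCodec struct{}

// Name is the content-subtype the codec is selected by.
func (jsonCodec) Name() string { return "json" }

// Marshal encodes a message as JSON rather than protobuf.
func (jsonCodec) Marshal(v any) ([]byte, error) { return json.Marshal(v) }

// Unmarshal decodes JSON bytes into the message v points to.
func (jsonCodec) Unmarshal(data []byte, v any) error { return json.Unmarshal(data, v) }

// deliverResponse mirrors the documented DeliverResponse shape.
type deliverResponse struct {
	Delivered bool `json:"delivered"`
}

func main() {
	c := jsonCodec{}

	// Encode a response the way it would cross the wire.
	wire, err := c.Marshal(deliverResponse{Delivered: true})
	if err != nil {
		panic(err)
	}
	fmt.Printf("codec %q produced %s\n", c.Name(), wire)

	// Decode it again on the receiving side.
	var back deliverResponse
	if err := c.Unmarshal(wire, &back); err != nil {
		panic(err)
	}
	fmt.Println("delivered:", back.Delivered)
}
```

Because the messages are plain Go structs with json tags, no generated protobuf types are needed for a codec of this kind.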
- func NewServer(ep cluster.WireEndpoint, opts ...grpc.ServerOption) *grpc.Server
- func NewTimeTravelServer(ep TimeTravelEndpoint, opts ...grpc.ServerOption) *grpc.Server
- func RegisterServer(gs grpc.ServiceRegistrar, ep cluster.WireEndpoint)
- func RegisterTimeTravel(gs grpc.ServiceRegistrar, ep TimeTravelEndpoint)
- type DeliverRequest
- type DeliverResponse
- type DurableTimeTravel
- type SpawnRequest
- type SpawnResponse
- type StateAtRequest
- type StateAtResponse
- type TimeTravelEndpoint
- type Transport
- func New() *Transport
- func (t *Transport) AddNode(node string, conn grpc.ClientConnInterface)
- func (t *Transport) Deliver(ctx context.Context, ref state.ActorRef, event any) (bool, error)
- func (t *Transport) Spawn(ctx context.Context, node, src, id string, input map[string]any) (state.ActorRef, error)
- func (t *Transport) StateAt(ctx context.Context, node, id string, step int) ([]byte, error)
func NewServer
func NewServer(ep cluster.WireEndpoint, opts ...grpc.ServerOption) *grpc.Server
NewServer builds a gRPC server preconfigured with the JSON codec and the node’s transport service registered. The caller serves it on a listener and owns its lifecycle. Pass extra ServerOptions (interceptors, credentials, keepalives) as needed.
func NewTimeTravelServer
func NewTimeTravelServer(ep TimeTravelEndpoint, opts ...grpc.ServerOption) *grpc.Server
NewTimeTravelServer builds a gRPC server preconfigured with the JSON codec and the time-travel service registered. The caller serves it and owns its lifecycle.
func RegisterServer
func RegisterServer(gs grpc.ServiceRegistrar, ep cluster.WireEndpoint)
RegisterServer registers a node’s cluster.WireEndpoint as the transport service on an existing gRPC server, so deliveries and spawns arriving over the wire are decoded into the node’s concrete types and applied to its local actor system.
func RegisterTimeTravel
func RegisterTimeTravel(gs grpc.ServiceRegistrar, ep TimeTravelEndpoint)
RegisterTimeTravel registers a node’s TimeTravelEndpoint as the time-travel service on an existing gRPC server, so remote nodes can reconstruct the past state of instances this node hosts.
type DeliverRequest
DeliverRequest carries a delivery to the actor named by Ref. Event is the JSON-encoded event the owning node decodes into its event type.
type DeliverRequest struct {
	Ref   state.ActorRef `json:"ref"`
	Event []byte         `json:"event,omitempty"`
}
type DeliverResponse
DeliverResponse reports whether the addressed actor accepted the event.
type DeliverResponse struct {
	Delivered bool `json:"delivered"`
}
type DurableTimeTravel
DurableTimeTravel adapts a durable Store and machine into a TimeTravelEndpoint by running durable.StateAt and marshaling the reconstructed snapshot. It is read-only: it runs no service, re-instantiates no actor, and dispatches no effect.
type DurableTimeTravel[S comparable, E comparable, C any] struct {
	// contains filtered or unexported fields
}
func NewDurableTimeTravel
func NewDurableTimeTravel[S comparable, E comparable, C any](m *state.Machine[S, E, C], store durable.Store) *DurableTimeTravel[S, E, C]
NewDurableTimeTravel binds a machine and the durable Store its instances were recorded into. The Store should retain full history (a durable.HistoryStore) to reach arbitrary steps; otherwise reconstruction is limited to the latest checkpoint and tail.
func (*DurableTimeTravel[S, E, C]) StateAt
func (d *DurableTimeTravel[S, E, C]) StateAt(ctx context.Context, id string, step int) ([]byte, error)
StateAt reconstructs the instance’s state as of step and marshals it.
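To make the "latest checkpoint and tail" limitation concrete, here is a toy model of step reconstruction, assuming state is a plain counter and each event is an integer delta. The history type, stateAt function, and errUnreachable error are all hypothetical; this is not how the durable package is implemented, only a sketch of why a store without full history cannot reach steps before its checkpoint.

```go
package main

import (
	"errors"
	"fmt"
)

// history is a hypothetical record of one instance: a checkpoint taken at
// BaseStep plus the events recorded after it. BaseStep == 0 with a zero Base
// models a store that kept the full history.
type history struct {
	BaseStep int   // step at which Base was captured
	Base     int   // state at BaseStep (a toy counter)
	Tail     []int // Tail[i] moves the instance from step BaseStep+i to BaseStep+i+1
}

var errUnreachable = errors.New("step is outside the retained history")

// stateAt replays the tail on top of the checkpoint up to the requested step.
func stateAt(h history, step int) (int, error) {
	if step < h.BaseStep || step > h.BaseStep+len(h.Tail) {
		return 0, errUnreachable
	}
	s := h.Base
	for _, delta := range h.Tail[:step-h.BaseStep] {
		s += delta
	}
	return s, nil
}

func main() {
	// The same instance, once with every event kept and once compacted to a
	// checkpoint at step 2.
	full := history{Tail: []int{5, -2, 10, 1}}
	compacted := history{BaseStep: 2, Base: 3, Tail: []int{10, 1}}

	for _, step := range []int{1, 3} {
		s, err := stateAt(full, step)
		fmt.Println("full history, step", step, "->", s, err)
		s, err = stateAt(compacted, step)
		fmt.Println("checkpoint+tail, step", step, "->", s, err)
	}
}
```

Both histories agree on any step at or after the checkpoint; only the full history can answer for earlier steps.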
type SpawnRequest
SpawnRequest asks the owning node to start an actor with ID from Src, with the JSON-encoded Input.
type SpawnRequest struct {
	Src   string `json:"src"`
	ID    string `json:"id"`
	Input []byte `json:"input,omitempty"`
}
type SpawnResponse
SpawnResponse returns a ref to the spawned actor.
type SpawnResponse struct {
	Ref state.ActorRef `json:"ref"`
}
type StateAtRequest
StateAtRequest asks for an instance’s reconstructed state as of a recorded step.
type StateAtRequest struct {
	ID   string `json:"id"`
	Step int    `json:"step"`
}
type StateAtResponse
StateAtResponse carries the reconstructed snapshot as marshaled JSON; the caller decodes it with state.UnmarshalSnapshot for its own (S, E, C).
type StateAtResponse struct {
	Snapshot []byte `json:"snapshot"`
}
type TimeTravelEndpoint
TimeTravelEndpoint serves read-only historical state reconstruction for durable instances a node hosts. DurableTimeTravel adapts a durable Store and machine into one; a node registers it with RegisterTimeTravel so other nodes can query an instance’s past state over the wire.
type TimeTravelEndpoint interface {
	// StateAt reconstructs the instance's state as of step and returns it as a
	// marshaled kernel snapshot.
	StateAt(ctx context.Context, id string, step int) (snapshot []byte, err error)
}
type Transport
Transport is the client side: it routes cluster operations to the gRPC server of the node that owns the target actor. It satisfies cluster.Transport. Register each reachable node’s client connection with AddNode. It is safe for concurrent use.
type Transport struct {
	// contains filtered or unexported fields
}
func New
func New() *Transport
New returns a Transport with no nodes; register reachable nodes with AddNode.
func (*Transport) AddNode
func (t *Transport) AddNode(node string, conn grpc.ClientConnInterface)
AddNode registers the client connection used to reach node. The caller dials the node (grpc.NewClient) and owns the connection’s lifecycle. Registering a node again replaces its connection.
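The registration semantics described for AddNode (concurrent-safe, last registration wins) can be pictured as a mutex-guarded map. The sketch below is a guess at that shape under stated assumptions, not the package's actual internals: registry, conn, addNode, and lookup are hypothetical names, conn stands in for grpc.ClientConnInterface, and returning an error for an unregistered node is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for grpc.ClientConnInterface.
type conn struct{ target string }

// registry maps node names to the connection used to reach them.
type registry struct {
	mu    sync.RWMutex
	nodes map[string]*conn
}

func newRegistry() *registry {
	return &registry{nodes: make(map[string]*conn)}
}

// addNode registers the connection for node, replacing any earlier one.
func (r *registry) addNode(node string, c *conn) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nodes[node] = c
}

// lookup returns the connection for node. Failing on an unknown node is an
// assumption made for this sketch.
func (r *registry) lookup(node string) (*conn, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	c, ok := r.nodes[node]
	if !ok {
		return nil, fmt.Errorf("no connection registered for node %q", node)
	}
	return c, nil
}

func main() {
	r := newRegistry()

	// Concurrent registrations are serialized by the mutex.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.addNode(fmt.Sprintf("node-%d", i), &conn{target: fmt.Sprintf("10.0.0.%d:7000", i)})
		}(i)
	}
	wg.Wait()

	// Registering node-0 again replaces its connection.
	r.addNode("node-0", &conn{target: "10.0.1.0:7000"})
	if c, err := r.lookup("node-0"); err == nil {
		fmt.Println("node-0 ->", c.target)
	}
	if _, err := r.lookup("node-9"); err != nil {
		fmt.Println(err)
	}
}
```

Since the registry only stores what it is given, closing a replaced connection remains the caller's job, which matches the ownership rule stated above.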
func (*Transport) Deliver
func (t *Transport) Deliver(ctx context.Context, ref state.ActorRef, event any) (bool, error)
Deliver routes event to the actor named by ref on its owning node over gRPC. The event is JSON-encoded here and decoded into the owning node’s event type there.
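The encode-here, decode-there round trip can be sketched with the standard library alone. In the example below, deliverRequest mirrors the documented DeliverRequest fields, while actorRef (standing in for state.ActorRef, whose fields are not shown in this reference), orderPlaced, encodeDelivery, and decodeDelivery are hypothetical names introduced for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// actorRef is a hypothetical stand-in for state.ActorRef.
type actorRef struct {
	Node string `json:"node"`
	ID   string `json:"id"`
}

// deliverRequest mirrors the documented DeliverRequest, with actorRef swapped in.
type deliverRequest struct {
	Ref   actorRef `json:"ref"`
	Event []byte   `json:"event,omitempty"`
}

// orderPlaced is a hypothetical event type known to the owning node.
type orderPlaced struct {
	OrderID string `json:"order_id"`
	Qty     int    `json:"qty"`
}

// encodeDelivery JSON-encodes the event on the sending side and wraps it in
// the request envelope.
func encodeDelivery(ref actorRef, event any) ([]byte, error) {
	raw, err := json.Marshal(event)
	if err != nil {
		return nil, err
	}
	return json.Marshal(deliverRequest{Ref: ref, Event: raw})
}

// decodeDelivery unwraps the envelope on the owning node and decodes the
// event bytes into that node's concrete event type.
func decodeDelivery(wire []byte) (actorRef, orderPlaced, error) {
	var req deliverRequest
	if err := json.Unmarshal(wire, &req); err != nil {
		return actorRef{}, orderPlaced{}, err
	}
	var ev orderPlaced
	err := json.Unmarshal(req.Event, &ev)
	return req.Ref, ev, err
}

func main() {
	wire, err := encodeDelivery(actorRef{Node: "node-a", ID: "order-7"}, orderPlaced{OrderID: "o-7", Qty: 2})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(wire))

	ref, ev, err := decodeDelivery(wire)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v %+v\n", ref, ev)
}
```

One detail worth knowing when inspecting traffic: encoding/json writes a []byte field as a base64 string, so the event appears base64-encoded inside the envelope rather than as nested JSON.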
func (*Transport) Spawn
func (t *Transport) Spawn(ctx context.Context, node, src, id string, input map[string]any) (state.ActorRef, error)
Spawn asks node to start an actor with id from src, passing input, over gRPC, and returns a ref to it.
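For the spawn path, the interesting part is how a map[string]any input becomes the Input bytes of a SpawnRequest. The sketch below mirrors the documented SpawnRequest fields in a local spawnRequest type; encodeSpawn and decodeInput are hypothetical helpers, and skipping the Input field entirely for an empty map is an assumption rather than confirmed behavior of the transport.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// spawnRequest mirrors the documented SpawnRequest fields.
type spawnRequest struct {
	Src   string `json:"src"`
	ID    string `json:"id"`
	Input []byte `json:"input,omitempty"`
}

// encodeSpawn builds the wire form of a spawn. An empty input leaves Input
// unset so that omitempty drops the field (an assumption for this sketch).
func encodeSpawn(src, id string, input map[string]any) ([]byte, error) {
	req := spawnRequest{Src: src, ID: id}
	if len(input) > 0 {
		raw, err := json.Marshal(input)
		if err != nil {
			return nil, err
		}
		req.Input = raw
	}
	return json.Marshal(req)
}

// decodeInput recovers the input map on the owning node; a request without
// input yields a nil map.
func decodeInput(wire []byte) (map[string]any, error) {
	var req spawnRequest
	if err := json.Unmarshal(wire, &req); err != nil {
		return nil, err
	}
	if len(req.Input) == 0 {
		return nil, nil
	}
	var input map[string]any
	err := json.Unmarshal(req.Input, &input)
	return input, err
}

func main() {
	bare, err := encodeSpawn("worker", "a1", nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(bare))

	wire, err := encodeSpawn("worker", "a2", map[string]any{"retries": 3})
	if err != nil {
		panic(err)
	}
	input, err := decodeInput(wire)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T %v\n", input["retries"], input["retries"])
}
```

Note that numbers decoded into a map[string]any arrive as float64, so an integer passed as input on one node should be converted explicitly on the other.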
func (*Transport) StateAt
func (t *Transport) StateAt(ctx context.Context, node, id string, step int) ([]byte, error)
StateAt asks node to reconstruct an instance’s state as of step and returns the marshaled snapshot, which the caller decodes with state.UnmarshalSnapshot for its own (S, E, C). It reuses the transport’s registered connections.