sink
    import "github.com/stablekernel/crucible/sink"

Package sink is a fire-and-forget fan-out emitter: a service calls one Manifold.Sink and the payload fans out to every attached Outlet (SQL, DynamoDB, StatsD, a webhook, a log) without the call site knowing which destinations are wired.
Manifold and Outlets
A Manifold holds a set of Outlets and fans each payload out to all of them. Manifold.Sink is the only emit path and returns nothing: outlet failures are routed to the configured logger and the sink.failed metric, not back to the caller. A caller that needs confirmation for one critical destination holds that Outlet directly and calls its Sink, which returns that destination’s own error. Because its Sink returns nothing, a Manifold is not itself an Outlet; to nest one inside another, wrap it in an OutletFunc.
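The fire-and-forget contract described above can be sketched with local stand-in types, so the snippet runs without the package. This is an illustration under assumptions, not the package's implementation: `outlet`, `outletFunc`, `manifold`, `fanOut`, and `demo` are hypothetical names, the sketch is not safe for concurrent use, and failures are collected in a slice where the real Manifold would route them to its logger and the sink.failed counter.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// outlet is a hypothetical stand-in for the documented Outlet interface.
type outlet interface {
	Sink(ctx context.Context, payload any) error
}

// outletFunc adapts a plain function to outlet.
type outletFunc func(ctx context.Context, payload any) error

func (f outletFunc) Sink(ctx context.Context, payload any) error { return f(ctx, payload) }

// manifold is a hypothetical fan-out emitter. It records failures internally
// instead of returning them, mirroring the fire-and-forget contract.
type manifold struct {
	outlets []outlet
	failed  []error
}

// fanOut delivers payload to every outlet and returns nothing to the caller.
func (m *manifold) fanOut(ctx context.Context, payload any) {
	for _, o := range m.outlets {
		if err := o.Sink(ctx, payload); err != nil {
			m.failed = append(m.failed, err)
		}
	}
}

// demo wires one healthy and one failing outlet and reports what happened.
func demo() (delivered []any, failures int) {
	ok := outletFunc(func(_ context.Context, p any) error {
		delivered = append(delivered, p)
		return nil
	})
	bad := outletFunc(func(context.Context, any) error {
		return errors.New("webhook unreachable")
	})
	m := &manifold{outlets: []outlet{ok, bad}}
	m.fanOut(context.Background(), "order-1")
	return delivered, len(m.failed)
}

func main() {
	delivered, failures := demo()
	fmt.Println(delivered, failures) // [order-1] 1
}
```

The failing outlet does not stop delivery to the healthy one, and the call site never sees the error. A caller that needs the error would call the outlet's own Sink directly.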
Op and Registry
A destination is usually an Emitter, which binds a typed client C to a Registry of Op values keyed by payload type. Register maps a concrete payload type to a transformer that builds the Op persisting it; an unregistered payload yields ErrUnregistered, which the Manifold treats as a silent skip. There is no package-global registry: every Registry is constructed and injected.
Reservoir and Poller
Reservoir wraps an Outlet to buffer payloads and release them in batches, by size or on an interval, using an injected clock. Poller periodically samples state through a CollectFunc and sinks the results. Both take their clock as an option so tests are deterministic and sleep-free.
Observability
The Manifold’s seams are first-class and have no-op defaults: a discarding log/slog.Logger and the no-op tracer and meter from github.com/stablekernel/crucible/telemetry. With real seams wired, each Sink starts a “sink.Sink” span (whose context is propagated to every Outlet, so a downstream span nests beneath it) and records the sink.sunk, sink.failed, sink.skipped, and sink.dropped counters plus the sink.batch_size and sink.flush_latency_ms histograms.
Stability
Experimental (pre-v1). The API is feature-complete and intended to become v1.0.0 after cross-module review; until then it may change.
- Variables
- func RecordsOf[T any](b *Bucket) []T
- func Register[P any, D any](r *Registry[D], fn func(ctx context.Context, p P) D)
- type BatchOutlet
- type Bucket
- type CollectFunc
- type Emitter
- type EmitterOption
- type Error
- type Flusher
- type Manifold
- type Op
- type OpFunc
- type Option
- type Outlet
- type OutletFunc
- type Phase
- type Poller
- type PollerOption
- type Registry
- type ReservoirOption
- type Shutdowner
Variables
ErrUnregistered reports that an outlet has no transformer registered for a payload’s concrete type. The Manifold treats it as a silent skip, not a failure, so attaching an outlet that handles only some payload types is normal and is never counted against sink.failed. Match it with errors.Is.

    var ErrUnregistered = errors.New("sink: no transformer registered for payload type")

func RecordsOf

    func RecordsOf[T any](b *Bucket) []T

RecordsOf returns the recorded payloads whose concrete type is T, in arrival order. It is a free function because Go does not permit type parameters on methods.
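The type-filtering behaviour can be sketched over a plain slice. `recordsOf` below is a hypothetical stand-in that takes `[]any` rather than a *Bucket, so the snippet runs without the package; how the real RecordsOf matches types internally is not stated in this reference.

```go
package main

import "fmt"

// recordsOf is a hypothetical stand-in for RecordsOf working on a plain
// slice: for a non-interface T it keeps the payloads whose concrete type
// is T, preserving order.
func recordsOf[T any](payloads []any) []T {
	var out []T
	for _, p := range payloads {
		if v, ok := p.(T); ok {
			out = append(out, v)
		}
	}
	return out
}

type orderPlaced struct{ ID string }

func main() {
	mixed := []any{orderPlaced{ID: "A-1"}, 42, orderPlaced{ID: "A-2"}, "note"}
	fmt.Println(recordsOf[orderPlaced](mixed)) // [{A-1} {A-2}]
	fmt.Println(recordsOf[int](mixed))         // [42]
}
```

The explicit type argument at the call site is what a method could not provide, which is why the helper is a free function.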
func Register
    func Register[P any, D any](r *Registry[D], fn func(ctx context.Context, p P) D)

Register binds payload type P to fn in r. It is a free function rather than a method because Go does not permit type parameters on methods; P is inferred from fn. Registering the same P again overwrites the prior transformer.
type BatchOutlet
BatchOutlet is an Outlet that can accept many payloads in one call. The Reservoir uses it on flush when the wrapped outlet supports it, falling back to a Sink loop otherwise.

    type BatchOutlet interface {
        SinkBatch(ctx context.Context, payloads []any) error
    }

type Bucket

Bucket is an in-memory Outlet for tests: it records every payload it receives and never errors. Pair it with All or RecordsOf to assert on what a Manifold fanned out. It is safe for concurrent use.

    type Bucket struct {
        // contains filtered or unexported fields
    }

func NewBucket

    func NewBucket() *Bucket

NewBucket returns an empty Bucket.
func (*Bucket) All
    func (b *Bucket) All() []any

All returns a copy of the recorded payloads in arrival order.

func (*Bucket) Reset

    func (b *Bucket) Reset()

Reset clears the recorded payloads.

func (*Bucket) Sink

    func (b *Bucket) Sink(_ context.Context, payload any) error

Sink records payload and returns nil.
type CollectFunc
CollectFunc samples some state and yields zero or more payloads by calling sink for each. It runs on the Poller’s interval; it must not block indefinitely.

    type CollectFunc func(ctx context.Context, sink func(payload any))

type Emitter

Emitter binds a typed destination client C and a registry of Op[C] into an Outlet. Named destination packages (sql, dynamo, …) wrap it with a narrow client interface and a default registry; any consumer can also use it directly with their own client type, no new package required.

On Sink, the Emitter looks up the payload’s transformer, builds the Op, and applies it to the client. A lookup miss returns ErrUnregistered (which the Manifold treats as a silent skip); an Apply failure is wrapped as *Error with PhaseApply.

    type Emitter[C any] struct {
        // contains filtered or unexported fields
    }

func NewEmitter

    func NewEmitter[C any](client C, registry *Registry[Op[C]], opts ...EmitterOption) *Emitter[C]

NewEmitter binds client and registry into an Emitter. The registry maps each payload type to the Op that persists it against C.
Example
    package main

    import (
        "context"
        "fmt"

        "github.com/stablekernel/crucible/sink"
    )

    // orderPlaced is a sample domain event a service might emit.
    type orderPlaced struct {
        ID string
    }

    // inMemoryStore is a stand-in destination client.
    type inMemoryStore struct {
        saved []string
    }

    func (s *inMemoryStore) save(id string) { s.saved = append(s.saved, id) }

    func main() {
        store := &inMemoryStore{}
        reg := sink.NewRegistry[sink.Op[*inMemoryStore]]()
        sink.Register(reg, func(_ context.Context, o orderPlaced) sink.Op[*inMemoryStore] {
            return sink.OpFunc[*inMemoryStore](func(_ context.Context, s *inMemoryStore) error {
                s.save(o.ID)
                return nil
            })
        })

        emitter := sink.NewEmitter[*inMemoryStore](store, reg, sink.WithName("store"))
        _ = emitter.Sink(context.Background(), orderPlaced{ID: "B-1"})

        fmt.Println(store.saved)
    }

Output

    [B-1]

func (*Emitter[C]) Sink

    func (e *Emitter[C]) Sink(ctx context.Context, payload any) error

Sink looks up payload’s Op and applies it to the bound client.
type EmitterOption
EmitterOption configures an Emitter.

    type EmitterOption func(*emitterConfig)

func WithName

    func WithName(name string) EmitterOption

WithName sets the outlet name used in errors and logs. The default is the type name of the client C.
type Error
Error wraps a destination failure with structured context: which outlet failed, during which Phase, for which payload type, and the underlying error. It is errors.Is / errors.As friendly via Unwrap; never match on its Error string.

    type Error struct {
        // Outlet is the name of the outlet that failed.
        Outlet string
        // Phase is the stage of work that failed.
        Phase Phase
        // PayloadType is the concrete type name of the payload being sunk.
        PayloadType string
        // Err is the wrapped underlying error.
        Err error
    }

func (*Error) Error

    func (e *Error) Error() string

Error implements error.

func (*Error) Unwrap

    func (e *Error) Unwrap() error

Unwrap returns the wrapped error so errors.Is / errors.As see through it.
type Flusher
Flusher is an Outlet that buffers and can be forced to emit. Manifold.Flush calls Flush on every attached Flusher.

    type Flusher interface {
        Flush(ctx context.Context) error
    }

type Manifold

Manifold fans a payload out to every attached Outlet, fire-and-forget. Sink is the only emit path and returns nothing: outlet failures are routed to the logger and the sink.failed counter, not back to the caller. A caller that needs per-destination confirmation holds that outlet directly and calls its Sink (see OutletFunc). The zero value is unusable; construct with NewManifold.

A Manifold is safe for concurrent Sink, Attach, Flush, and Shutdown.

    type Manifold struct {
        // contains filtered or unexported fields
    }

func NewManifold

    func NewManifold(opts ...Option) *Manifold

NewManifold constructs a Manifold with the given options. With no options it is silent and untraced: a discarding logger, the no-op tracer, and the no-op meter.
func (*Manifold) Attach
    func (m *Manifold) Attach(outlets ...Outlet) *Manifold

Attach adds outlets to the Manifold and returns it for chaining. It is safe for concurrent use.

func (*Manifold) Close

    func (m *Manifold) Close() error

Close implements io.Closer by calling Shutdown with a background context.

func (*Manifold) Flush

    func (m *Manifold) Flush(ctx context.Context) error

Flush forces every attached Flusher to emit its buffered payloads. Errors from all flushers are joined and returned.
func (*Manifold) Shutdown
    func (m *Manifold) Shutdown(ctx context.Context) error

Shutdown flushes, then shuts down every attached Shutdowner, draining in-flight work within ctx’s deadline. Errors from the flush and from each shutdowner are joined and returned.

func (*Manifold) Sink

    func (m *Manifold) Sink(ctx context.Context, payload any)

Sink fans payload out to every attached outlet, fire-and-forget. It starts a “sink.Sink” span and propagates the returned context to each Outlet.Sink, so a downstream span (an outlet’s own, or another module’s) nests under the emit span. A silent skip (ErrUnregistered) is counted as skipped; any other error is logged at ERROR, recorded on the span, and counted as failed. Success is counted as sunk.
Example
    package main

    import (
        "context"
        "fmt"

        "github.com/stablekernel/crucible/sink"
    )

    // orderPlaced is a sample domain event a service might emit.
    type orderPlaced struct {
        ID string
    }

    func main() {
        bucket := sink.NewBucket()
        m := sink.NewManifold(sink.WithOutlets(bucket))

        m.Sink(context.Background(), orderPlaced{ID: "A-1"})
        m.Sink(context.Background(), orderPlaced{ID: "A-2"})

        for _, o := range sink.RecordsOf[orderPlaced](bucket) {
            fmt.Println(o.ID)
        }
    }

Output

    A-1
    A-2

type Op
Op is a unit of work against a typed destination client C. A destination package ships Op constructors covering its API surface (puts, updates, deletes, transactional and batch writes); the registry maps a payload type to the Op that persists it.

    type Op[C any] interface {
        Apply(ctx context.Context, client C) error
    }

type OpFunc

OpFunc adapts a plain function to an Op. It is the bring-your-own-logic escape hatch: any func with the right shape becomes an Op without a named type.

    type OpFunc[C any] func(ctx context.Context, client C) error

func (OpFunc[C]) Apply

    func (f OpFunc[C]) Apply(ctx context.Context, client C) error

Apply calls the underlying function.
type Option
Option configures a Manifold. Options are additive and have no-op defaults; a nil value passed to a With* option is ignored, leaving the default in place.

    type Option func(*config)

func WithLogger

    func WithLogger(l *slog.Logger) Option

WithLogger sets the structured logger the Manifold writes outlet failures to. The default discards all records. A nil logger is ignored.

func WithMeter

    func WithMeter(m telemetry.Meter) Option

WithMeter sets the meter the Manifold records its counters on. The default is telemetry.NopMeter(). A nil meter is ignored.

func WithOutlets

    func WithOutlets(outlets ...Outlet) Option

WithOutlets attaches outlets at construction time, equivalent to a subsequent Attach call. It is additive across repeated uses.

func WithTracer

    func WithTracer(t telemetry.Tracer) Option

WithTracer sets the tracer the Manifold starts emit spans on. The default is telemetry.NopTracer(). A nil tracer is ignored.
type Outlet
Outlet is a single destination a payload can be sunk to. Sink returns ErrUnregistered when the outlet has no transformer for the payload’s concrete type: a normal, silent skip that the Manifold does not count as a failure. Any other error is a real failure the Manifold logs and meters; a caller that holds an outlet directly may also inspect it for per-destination confirmation.

Implementations must be safe for concurrent use: a Manifold fans a single payload out to every attached Outlet from the calling goroutine, and the same outlet may be shared across manifolds.

    type Outlet interface {
        Sink(ctx context.Context, payload any) error
    }

func Reservoir

    func Reservoir(inner Outlet, opts ...ReservoirOption) Outlet

Reservoir wraps inner so payloads are buffered and flushed in batches, either when the batch size is reached or on the interval tick. The returned Outlet is also a Flusher and a Shutdowner; call Shutdown (or attach it to a Manifold) to stop the background loop and drain. Defaults: size 100, interval 5s, now = time.Now, meter = telemetry.NopMeter(), unbounded buffer. When inner is a BatchOutlet, flushes use SinkBatch; otherwise each buffered payload is sunk in turn.
Example
    package main

    import (
        "context"
        "fmt"

        "github.com/stablekernel/crucible/sink"
    )

    func main() {
        bucket := sink.NewBucket()
        // Buffer until three payloads accumulate, then release as a batch.
        batched := sink.Reservoir(bucket, sink.WithBatchSize(3), sink.WithBatchInterval(0))

        for i := 1; i <= 3; i++ {
            _ = batched.Sink(context.Background(), i)
        }

        fmt.Println(len(bucket.All()))
    }

Output

    3

type OutletFunc

OutletFunc adapts a plain function to an Outlet. It is also the escape hatch for nesting one Manifold inside another, since a Manifold is intentionally not itself an Outlet (its Sink is fire-and-forget and returns nothing):

    parent.Attach(sink.OutletFunc(func(ctx context.Context, p any) error {
        child.Sink(ctx, p)
        return nil
    }))

    type OutletFunc func(ctx context.Context, payload any) error

func (OutletFunc) Sink

    func (f OutletFunc) Sink(ctx context.Context, payload any) error

Sink calls the underlying function.
type Phase
Phase names the stage of an outlet’s work that failed, for diagnostics.

    type Phase string

    const (
        // PhaseTransform marks a failure turning a payload into a destination
        // operation (a registry transformer returning an error-bearing op).
        PhaseTransform Phase = "transform"
        // PhaseApply marks a failure applying the operation to the destination
        // client (the write itself).
        PhaseApply Phase = "apply"
        // PhaseFlush marks a failure flushing a buffered outlet.
        PhaseFlush Phase = "flush"
    )

type Poller

Poller periodically runs a CollectFunc and sinks each yielded payload to a target Outlet (via SinkBatch when the target supports it). Construct with NewPoller; drive it with Start and stop it with Stop.

    type Poller struct {
        // contains filtered or unexported fields
    }

func NewPoller

    func NewPoller(target Outlet, collect CollectFunc, opts ...PollerOption) *Poller

NewPoller binds a target and a CollectFunc into a Poller. Defaults: interval 60s, now = time.Now.

func (*Poller) Start

    func (p *Poller) Start(ctx context.Context) *Poller

Start launches the sampling loop and returns the Poller for chaining. It is idempotent: a second call while running is a no-op.

func (*Poller) Stop

    func (p *Poller) Stop()

Stop cancels the sampling loop and waits for it to exit. It is safe to call when not started and is idempotent.
type PollerOption
PollerOption configures a Poller.

    type PollerOption func(*pollerConfig)

func WithPollInterval

    func WithPollInterval(d time.Duration) PollerOption

WithPollInterval sets the sampling period. The default is 60s.

func WithPollerClock

    func WithPollerClock(now func() time.Time) PollerOption

WithPollerClock injects the clock the Poller reads, for deterministic tests. The default is time.Now. A nil clock is ignored.
type Registry
Registry maps a payload’s concrete type to a transformer producing a value of type D (typically Op[C] for some destination client C). There is no package-level registry and no init-time state: every Registry is constructed with NewRegistry and injected, so two registries never share entries. It is safe for concurrent Register and Lookup.

    type Registry[D any] struct {
        // contains filtered or unexported fields
    }

func NewRegistry

    func NewRegistry[D any]() *Registry[D]

NewRegistry returns an empty Registry.

func (*Registry[D]) Lookup

    func (r *Registry[D]) Lookup(payload any) (func(context.Context, any) D, bool)

Lookup returns the transformer registered for payload’s concrete type and whether one was found. The returned func accepts payload as any and asserts it back to the registered concrete type internally.
type ReservoirOption
ReservoirOption configures a Reservoir. Options are additive with no-op defaults.

    type ReservoirOption func(*reservoirConfig)

func WithBatchInterval

    func WithBatchInterval(d time.Duration) ReservoirOption

WithBatchInterval sets the period of the background time-triggered flush. An interval <= 0 disables the background loop (size/manual flush only).

func WithBatchSize

    func WithBatchSize(n int) ReservoirOption

WithBatchSize sets the buffered-payload count that triggers a flush. A size <= 0 disables size-triggered flushing (interval only).

func WithMaxBuffered

    func WithMaxBuffered(n int) ReservoirOption

WithMaxBuffered caps the number of payloads held between flushes. Payloads arriving over the cap are dropped and counted on sink.dropped. The default (0) is unbounded.
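The drop-over-cap behaviour can be sketched with a small buffer type. `boundedBuffer`, `add`, and `drain` are hypothetical names, the sketch is single-goroutine only, and a plain integer stands in for the sink.dropped counter.

```go
package main

import "fmt"

// boundedBuffer is a hypothetical stand-in for a Reservoir's buffer. A max of
// 0 means unbounded, matching the documented default.
type boundedBuffer struct {
	max     int
	items   []any
	dropped int
}

// add buffers payload, or drops and counts it when the cap is reached.
func (b *boundedBuffer) add(payload any) bool {
	if b.max > 0 && len(b.items) >= b.max {
		b.dropped++
		return false
	}
	b.items = append(b.items, payload)
	return true
}

// drain hands back the buffered payloads and empties the buffer.
func (b *boundedBuffer) drain() []any {
	out := b.items
	b.items = nil
	return out
}

func main() {
	buf := &boundedBuffer{max: 2}
	for i := 1; i <= 5; i++ {
		buf.add(i)
	}
	// Two payloads fit under the cap; the other three were dropped.
	fmt.Println(len(buf.drain()), buf.dropped) // 2 3

	// After a flush there is room again, and the drop count is unchanged.
	buf.add(6)
	fmt.Println(len(buf.items), buf.dropped) // 1 3
}
```

Dropping the newest payload keeps memory bounded when the destination is slow, at the cost of losing data, which is why the drop count is worth watching.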
func WithReservoirClock
    func WithReservoirClock(now func() time.Time) ReservoirOption

WithReservoirClock injects the clock the Reservoir reads to time flush latency, for deterministic tests. The default is time.Now. A nil clock is ignored.

func WithReservoirMeter

    func WithReservoirMeter(m telemetry.Meter) ReservoirOption

WithReservoirMeter sets the meter the Reservoir records batch-size, flush latency, and drop counts on. The default is telemetry.NopMeter(). A nil meter is ignored.
type Shutdowner
Shutdowner is an Outlet that holds resources (a background flush loop, a connection) to release. Manifold.Shutdown calls Shutdown on every attached Shutdowner after flushing, draining in-flight work within ctx’s deadline.

    type Shutdowner interface {
        Shutdown(ctx context.Context) error
    }

Generated by gomarkdoc