Skip to content

sink

import "github.com/stablekernel/crucible/sink"

Package sink is a fire-and-forget fan-out emitter: a service calls one Manifold.Sink and the payload fans out to every attached Outlet — SQL, DynamoDB, StatsD, a webhook, a log — without the call site knowing which destinations are wired.

A Manifold holds a set of [Outlet]s and fans each payload out to all of them. Manifold.Sink is the only emit path and returns nothing: outlet failures are routed to the configured logger and the sink.failed metric, not back to the caller. A caller that needs confirmation for one critical destination holds that Outlet directly and calls its Sink, which returns an honest per-destination error. A Manifold is therefore not itself an Outlet; nest one in another with OutletFunc.

A destination is usually an Emitter, which binds a typed client C to a Registry of Op values keyed by payload type. Register maps a concrete payload type to a transformer that builds the Op persisting it; an unregistered payload yields ErrUnregistered, which the Manifold treats as a silent skip. There is no package-global registry — every Registry is constructed and injected.

Reservoir wraps an Outlet to buffer payloads and release them in batches, by size or on an interval, using an injected clock. Poller periodically samples state through a CollectFunc and sinks the results. Both take their clock as an option so tests are deterministic and sleep-free.

The Manifold’s seams are first-class and have no-op defaults: a discarding log/slog.Logger, the no-op tracer and meter from github.com/stablekernel/crucible/telemetry. With real seams wired, each Sink starts a “sink.Sink” span (whose context is propagated to every Outlet, so a downstream span nests beneath it) and records the sink.sunk, sink.failed, sink.skipped, and sink.dropped counters plus the sink.batch_size and sink.flush_latency_ms histograms.

Experimental (pre-v1). The API is feature-complete and intended to become v1.0.0 after cross-module review; until then it may change.

ErrUnregistered reports that an outlet has no transformer registered for a payload’s concrete type. The Manifold treats it as a silent skip — not a failure — so attaching an outlet that only handles some payload types is normal and uncounted. Match it with errors.Is.

var ErrUnregistered = errors.New("sink: no transformer registered for payload type")

func RecordsOf[T any](b *Bucket) []T

RecordsOf returns the recorded payloads whose concrete type is T, in arrival order. It is a free function because Go does not permit type parameters on methods.

func Register[P any, D any](r *Registry[D], fn func(ctx context.Context, p P) D)

Register binds payload type P to fn in r. It is a free function rather than a method because Go does not permit type parameters on methods; P is inferred from fn. Registering the same P again overwrites the prior transformer.

BatchOutlet is an Outlet that can accept many payloads in one call. The Reservoir uses it on flush when the wrapped outlet supports it, falling back to a Sink loop otherwise.

type BatchOutlet interface {
SinkBatch(ctx context.Context, payloads []any) error
}

Bucket is an in-memory Outlet for tests: it records every payload it receives and never errors. Pair it with All or RecordsOf to assert on what a Manifold fanned out. It is safe for concurrent use.

type Bucket struct {
// contains filtered or unexported fields
}

func NewBucket() *Bucket

NewBucket returns an empty Bucket.

func (b *Bucket) All() []any

All returns a copy of the recorded payloads in arrival order.

func (b *Bucket) Reset()

Reset clears the recorded payloads.

func (b *Bucket) Sink(_ context.Context, payload any) error

Sink records payload and returns nil.

CollectFunc samples some state and yields zero or more payloads by calling sink for each. It runs on the Poller’s interval; it must not block indefinitely.

type CollectFunc func(ctx context.Context, sink func(payload any))

Emitter binds a typed destination client C and a registry of Op[C] into an Outlet. Named destination packages (sql, dynamo, …) wrap it with a narrow client interface and a default registry; any consumer can also use it directly with their own client type, no new package required.

On Sink, the Emitter looks up the payload’s transformer, builds the Op, and applies it to the client. A lookup miss returns ErrUnregistered (which the Manifold treats as a silent skip); an Apply failure is wrapped as *Error with PhaseApply.

type Emitter[C any] struct {
// contains filtered or unexported fields
}

func NewEmitter[C any](client C, registry *Registry[Op[C]], opts ...EmitterOption) *Emitter[C]

NewEmitter binds client and registry into an Emitter. The registry maps each payload type to the Op that persists it against C.

Example

package main
import (
"context"
"fmt"
"github.com/stablekernel/crucible/sink"
)
// orderPlaced is a sample domain event a service might emit.
type orderPlaced struct {
ID string
}
// inMemoryStore is a stand-in destination client.
type inMemoryStore struct {
saved []string
}
func (s *inMemoryStore) save(id string) { s.saved = append(s.saved, id) }
func main() {
store := &inMemoryStore{}
reg := sink.NewRegistry[sink.Op[*inMemoryStore]]()
sink.Register(reg, func(_ context.Context, o orderPlaced) sink.Op[*inMemoryStore] {
return sink.OpFunc[*inMemoryStore](func(_ context.Context, s *inMemoryStore) error {
s.save(o.ID)
return nil
})
})
emitter := sink.NewEmitter[*inMemoryStore](store, reg, sink.WithName("store"))
_ = emitter.Sink(context.Background(), orderPlaced{ID: "B-1"})
fmt.Println(store.saved)
}
[B-1]

func (e *Emitter[C]) Sink(ctx context.Context, payload any) error

Sink looks up payload’s Op and applies it to the bound client.

EmitterOption configures an Emitter.

type EmitterOption func(*emitterConfig)

func WithName(name string) EmitterOption

WithName sets the outlet name used in errors and logs. The default is the type name of the client C.

Error wraps a destination failure with structured context: which outlet failed, during which Phase, for which payload type, and the underlying error. It is errors.Is / errors.As friendly via Unwrap; never match on its Error string.

type Error struct {
// Outlet is the name of the outlet that failed.
Outlet string
// Phase is the stage of work that failed.
Phase Phase
// PayloadType is the concrete type name of the payload being sunk.
PayloadType string
// Err is the wrapped underlying error.
Err error
}

func (e *Error) Error() string

Error implements error.

func (e *Error) Unwrap() error

Unwrap returns the wrapped error so errors.Is / errors.As see through it.

Flusher is an Outlet that buffers and can be forced to emit. Manifold.Flush calls Flush on every attached Flusher.

type Flusher interface {
Flush(ctx context.Context) error
}

Manifold fans a payload out to every attached Outlet, fire-and-forget. Sink is the only emit path and returns nothing: outlet failures are routed to the logger and the sink.failed counter, not back to the caller. A caller that needs per-destination confirmation holds that outlet directly and calls its Sink (see OutletFunc). The zero value is unusable; construct with NewManifold.

A Manifold is safe for concurrent Sink, Attach, Flush, and Shutdown.

type Manifold struct {
// contains filtered or unexported fields
}

func NewManifold(opts ...Option) *Manifold

NewManifold constructs a Manifold with the given options. With no options it is silent and untraced: a discarding logger, the no-op tracer, and the no-op meter.

func (m *Manifold) Attach(outlets ...Outlet) *Manifold

Attach adds outlets to the Manifold and returns it for chaining. It is safe for concurrent use.

func (m *Manifold) Close() error

Close implements io.Closer by calling Shutdown with a background context.

func (m *Manifold) Flush(ctx context.Context) error

Flush forces every attached Flusher to emit its buffered payloads. Errors from all flushers are joined and returned.

func (m *Manifold) Shutdown(ctx context.Context) error

Shutdown flushes, then shuts down every attached Shutdowner, draining in-flight work within ctx’s deadline. Errors from the flush and from each shutdowner are joined and returned.

func (m *Manifold) Sink(ctx context.Context, payload any)

Sink fans payload out to every attached outlet, fire-and-forget. It starts a “sink.Sink” span and propagates the returned context to each Outlet.Sink, so a downstream span (an outlet’s own, or another module’s) nests under the emit span. A nil-skip (ErrUnregistered) is counted as skipped; any other error is logged at ERROR, recorded on the span, and counted as failed. Success is counted as sunk.

Example

package main
import (
"context"
"fmt"
"github.com/stablekernel/crucible/sink"
)
// orderPlaced is a sample domain event a service might emit.
type orderPlaced struct {
ID string
}
func main() {
bucket := sink.NewBucket()
m := sink.NewManifold(sink.WithOutlets(bucket))
m.Sink(context.Background(), orderPlaced{ID: "A-1"})
m.Sink(context.Background(), orderPlaced{ID: "A-2"})
for _, o := range sink.RecordsOf[orderPlaced](bucket) {
fmt.Println(o.ID)
}
}
A-1
A-2

Op is a unit of work against a typed destination client C. A destination package ships Op constructors covering its API surface (puts, updates, deletes, transactional and batch writes); the registry maps a payload type to the Op that persists it.

type Op[C any] interface {
Apply(ctx context.Context, client C) error
}

OpFunc adapts a plain function to an Op. It is the bring-your-own-logic escape hatch: any func with the right shape becomes an Op without a named type.

type OpFunc[C any] func(ctx context.Context, client C) error

func (f OpFunc[C]) Apply(ctx context.Context, client C) error

Apply calls the underlying function.

Option configures a Manifold. Options are additive and have no-op defaults; a nil value passed to a With* option is ignored, leaving the default in place.

type Option func(*config)

func WithLogger(l *slog.Logger) Option

WithLogger sets the structured logger the Manifold writes outlet failures to. The default discards all records. A nil logger is ignored.

func WithMeter(m telemetry.Meter) Option

WithMeter sets the meter the Manifold records its counters on. The default is telemetry.NopMeter(). A nil meter is ignored.

func WithOutlets(outlets ...Outlet) Option

WithOutlets attaches outlets at construction time, equivalent to a subsequent Attach call. It is additive across repeated uses.

func WithTracer(t telemetry.Tracer) Option

WithTracer sets the tracer the Manifold starts emit spans on. The default is telemetry.NopTracer(). A nil tracer is ignored.

Outlet is a single destination a payload can be sunk to. Sink returns ErrUnregistered when the outlet has no transformer for the payload’s concrete type — a normal, silent skip the Manifold does not count as a failure. Any other error is a real failure the Manifold logs and meters; a caller that holds an outlet directly may also inspect it for per-destination confirmation.

Implementations must be safe for concurrent use: a Manifold fans a single payload out to every attached Outlet from the calling goroutine, and the same outlet may be shared across manifolds.

type Outlet interface {
Sink(ctx context.Context, payload any) error
}

func Reservoir(inner Outlet, opts ...ReservoirOption) Outlet

Reservoir wraps inner so payloads are buffered and flushed in batches, by reaching the batch size or on the interval tick. The returned Outlet is also a Flusher and a Shutdowner; call Shutdown (or attach it to a Manifold) to stop the background loop and drain. Defaults: size 100, interval 5s, now = time.Now, meter = telemetry.NopMeter(), unbounded buffer. When inner is a BatchOutlet, flushes use SinkBatch; otherwise each buffered payload is sunk in turn.

Example

package main
import (
"context"
"fmt"
"github.com/stablekernel/crucible/sink"
)
func main() {
bucket := sink.NewBucket()
// Buffer until three payloads accumulate, then release as a batch.
batched := sink.Reservoir(bucket, sink.WithBatchSize(3), sink.WithBatchInterval(0))
for i := 1; i <= 3; i++ {
_ = batched.Sink(context.Background(), i)
}
fmt.Println(len(bucket.All()))
}
3

OutletFunc adapts a plain function to an Outlet. It is also the escape hatch for nesting one Manifold inside another, since a Manifold is intentionally not itself an Outlet (its Sink is fire-and-forget and returns nothing):

parent.Attach(sink.OutletFunc(func(ctx context.Context, p any) error {
child.Sink(ctx, p)
return nil
}))
type OutletFunc func(ctx context.Context, payload any) error

func (f OutletFunc) Sink(ctx context.Context, payload any) error

Sink calls the underlying function.

Phase names the stage of an outlet’s work that failed, for diagnostics.

type Phase string

const (
// PhaseTransform marks a failure turning a payload into a destination
// operation (a registry transformer returning an error-bearing op).
PhaseTransform Phase = "transform"
// PhaseApply marks a failure applying the operation to the destination
// client (the write itself).
PhaseApply Phase = "apply"
// PhaseFlush marks a failure flushing a buffered outlet.
PhaseFlush Phase = "flush"
)

Poller periodically runs a CollectFunc and sinks each yielded payload to a target Outlet (via SinkBatch when the target supports it). Construct with NewPoller; drive it with Start and stop it with Stop.

type Poller struct {
// contains filtered or unexported fields
}

func NewPoller(target Outlet, collect CollectFunc, opts ...PollerOption) *Poller

NewPoller binds a target and a CollectFunc into a Poller. Defaults: interval 60s, now = time.Now.

func (p *Poller) Start(ctx context.Context) *Poller

Start launches the sampling loop and returns the Poller for chaining. It is idempotent: a second call while running is a no-op.

func (p *Poller) Stop()

Stop cancels the sampling loop and waits for it to exit. It is safe to call when not started and is idempotent.

PollerOption configures a Poller.

type PollerOption func(*pollerConfig)

func WithPollInterval(d time.Duration) PollerOption

WithPollInterval sets the sampling period. The default is 60s.

func WithPollerClock(now func() time.Time) PollerOption

WithPollerClock injects the clock the Poller reads, for deterministic tests. The default is time.Now. A nil clock is ignored.

Registry maps a payload’s concrete type to a transformer producing a value of type D (typically Op[C] for some destination client C). There is no package-level registry and no init-time state: every Registry is constructed with NewRegistry and injected, so two registries never share entries. It is safe for concurrent Register and Lookup.

type Registry[D any] struct {
// contains filtered or unexported fields
}

func NewRegistry[D any]() *Registry[D]

NewRegistry returns an empty Registry.

func (r *Registry[D]) Lookup(payload any) (func(context.Context, any) D, bool)

Lookup returns the transformer registered for payload’s concrete type and whether one was found. The returned func accepts payload as any and asserts it back to the registered concrete type internally.

ReservoirOption configures a Reservoir. Options are additive with no-op defaults.

type ReservoirOption func(*reservoirConfig)

func WithBatchInterval(d time.Duration) ReservoirOption

WithBatchInterval sets the period of the background time-triggered flush. An interval <= 0 disables the background loop (size/manual flush only).

func WithBatchSize(n int) ReservoirOption

WithBatchSize sets the buffered-payload count that triggers a flush. A size <= 0 disables size-triggered flushing (interval only).

func WithMaxBuffered(n int) ReservoirOption

WithMaxBuffered caps the number of payloads held between flushes. Payloads arriving over the cap are dropped and counted on sink.dropped. The default (0) is unbounded.

func WithReservoirClock(now func() time.Time) ReservoirOption

WithReservoirClock injects the clock the Reservoir reads to time flush latency, for deterministic tests. The default is time.Now. A nil clock is ignored.

func WithReservoirMeter(m telemetry.Meter) ReservoirOption

WithReservoirMeter sets the meter the Reservoir records batch-size, flush latency, and drop counts on. The default is telemetry.NopMeter(). A nil meter is ignored.

Shutdowner is an Outlet that holds resources (a background flush loop, a connection) to release. Manifold.Shutdown calls Shutdown on every attached Shutdowner after flushing, draining in-flight work within ctx’s deadline.

type Shutdowner interface {
Shutdown(ctx context.Context) error
}

Generated by gomarkdoc