Fire-and-forget fan-out

Manifold.Sink is the only emit path, and it returns nothing:
func (m *Manifold) Sink(ctx context.Context, payload any)For each attached outlet, the Manifold calls outlet.Sink(ctx, payload) and
sorts the result:
- success → the
sink.sunkcounter, ErrUnregistered→ thesink.skippedcounter (a normal “this outlet does not handle this type”), nothing logged,- any other error → the
sink.failedcounter, recorded on the emit span, and logged atERRORon the configured*slog.Logger.
One outlet failing never stops the others, and it never propagates to the caller. That is the contract: the call site fires and moves on.
Why no synchronous result
Section titled “Why no synchronous result”There is deliberately no SinkWait. A buffered outlet (see
Reservoir below) can only confirm admission to its
buffer, not the eventual write, so a synchronous “all confirmed” return would
be a dishonest guarantee. When you genuinely need confirmation for one critical
destination, hold that Outlet directly and call it. You get an honest,
per-destination error:
if err := auditOutlet.Sink(ctx, payload); err != nil { return err // 500, retry, compensate: your call}m.Sink(ctx, payload) // everything else fans out fire-and-forgetErrors that do surface (through a held outlet) are typed and wrap cleanly:
var se *sink.Errorif errors.As(err, &se) { log.Error("sink failed", "outlet", se.Outlet, "phase", se.Phase, "type", se.PayloadType)}Reservoir: batching
Section titled “Reservoir: batching”Wrap any outlet in a Reservoir to buffer payloads and release them in batches,
by size or on an interval:
batched := sink.Reservoir(s3Outlet, sink.WithBatchSize(100), sink.WithBatchInterval(5*time.Second),)m.Attach(batched)On flush, if the wrapped outlet implements BatchOutlet the Reservoir calls
SinkBatch once; otherwise it loops Sink. It records sink.batch_size and
sink.flush_latency_ms, drops past an optional WithMaxBuffered cap (counted on
sink.dropped), and reads its clock through WithReservoirClock, so tests
drive flushes deterministically with no sleeps. The returned value is itself
an Outlet (and a Flusher and Shutdowner), so it composes anywhere an outlet
goes.
Poller: periodic sampling
Section titled “Poller: periodic sampling”Where a Reservoir reacts to payloads pushed in, a Poller pulls: on an
interval it runs a CollectFunc that yields payloads, and sinks each to a target
outlet.
p := sink.NewPoller(metricsOutlet, func(ctx context.Context, emit func(any)) { emit(QueueDepth{N: queue.Len()})}, sink.WithPollInterval(time.Second))p.Start(ctx)defer p.Stop()Start is idempotent and fluent; Stop cancels and waits. Like the Reservoir,
the Poller takes its clock as an option for deterministic tests.