
sink/statsd

import "github.com/stablekernel/crucible/sink/statsd"

Package statsd is a sink destination that aggregates payloads into StatsD metrics and flushes them to a StatsD client on an interval and on demand. It depends on crucible/sink and a single StatsD SDK whose client satisfies the narrow Client interface declared here.

Two surfaces are offered. The primary one is the Aggregator: it folds counters (summed) and gauges (last write wins) by metric identity, buffers histograms, distributions, timings, and sets as raw samples, and emits them to the Client on a flush interval and on Flush or Shutdown. It implements sink.Outlet, sink.Flusher, and sink.Shutdowner so a sink.Manifold drives its lifecycle. The secondary surface is the Emitter path (NewRegistry and New) for callers who want a payload-to-operation mapping with no in-process aggregation.
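The folding rules above can be pictured as three per-identity maps. The sketch below is hypothetical and is not the package's internals: `key`, `window`, and its methods are illustrative names, and treating identity as name plus a canonical tag string is an assumption. Sets and timings would be buffered like the histogram samples, each with its own value type.

```go
package main

import "fmt"

// key is a hypothetical metric identity (name plus a canonical tag string).
// The package's exact identity rules are not documented; this is an assumption.
type key struct {
	name string
	tags string
}

// window is a hypothetical stand-in for one aggregation window.
type window struct {
	counts  map[key]int64     // counters: summed per identity
	gauges  map[key]float64   // gauges: last write wins per identity
	samples map[key][]float64 // histogram-style samples: every value kept verbatim
}

func newWindow() *window {
	return &window{
		counts:  map[key]int64{},
		gauges:  map[key]float64{},
		samples: map[key][]float64{},
	}
}

func (w *window) count(k key, v int64)    { w.counts[k] += v }
func (w *window) gauge(k key, v float64)  { w.gauges[k] = v }
func (w *window) sample(k key, v float64) { w.samples[k] = append(w.samples[k], v) }

func main() {
	w := newWindow()
	placed, depth, latency := key{name: "orders.placed"}, key{name: "queue.depth"}, key{name: "req.latency"}
	w.count(placed, 2)
	w.count(placed, 3)
	w.gauge(depth, 1)
	w.gauge(depth, 9)
	w.sample(latency, 12)
	w.sample(latency, 15)
	fmt.Println(w.counts[placed], w.gauges[depth], len(w.samples[latency])) // 5 9 2
}
```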

Experimental (pre-v1); the API may change until the suite locks v1.0.0.

func Count(name string, value int64, tags []string, rate float64) csink.Op[Client]

Count returns an Op that emits a StatsD count.

func Distribution(name string, value float64, tags []string, rate float64) csink.Op[Client]

Distribution returns an Op that emits a StatsD distribution sample.

func Gauge(name string, value float64, tags []string, rate float64) csink.Op[Client]

Gauge returns an Op that emits a StatsD gauge.

func Histogram(name string, value float64, tags []string, rate float64) csink.Op[Client]

Histogram returns an Op that emits a StatsD histogram sample.
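Each of these constructors returns a value that emits one StatsD call when it is applied to a Client. A minimal sketch of that idea, assuming an Op is a deferred function over the client; the real csink.Op type is not shown in these docs, so `Op`, `counter`, `countOp`, and `recorder` below are all hypothetical.

```go
package main

import "fmt"

// Op is a hypothetical shape for csink.Op: a deferred call against a client.
type Op[C any] func(C) error

// counter is a hypothetical one-method slice of the Client interface.
type counter interface {
	Count(name string, value int64, tags []string, rate float64) error
}

// countOp captures its arguments and calls Count only when the Op is applied.
func countOp(name string, value int64, tags []string, rate float64) Op[counter] {
	return func(c counter) error { return c.Count(name, value, tags, rate) }
}

// recorder is a fake counter that records each call it receives.
type recorder struct{ calls []string }

func (r *recorder) Count(name string, value int64, _ []string, rate float64) error {
	r.calls = append(r.calls, fmt.Sprintf("%s:%d@%g", name, value, rate))
	return nil
}

func main() {
	op := countOp("orders.placed", 2, nil, 1)
	r := &recorder{}
	_ = op(r) // each application is one client call
	_ = op(r)
	fmt.Println(r.calls) // [orders.placed:2@1 orders.placed:2@1]
}
```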

func New(client Client, reg *csink.Registry[csink.Op[Client]], opts ...csink.EmitterOption) csink.Outlet

New builds an Outlet that applies each payload’s registered Op[Client] to the client, emitting one StatsD call per Sink with no aggregation. The outlet is named “statsd” unless overridden with sink.WithName.

func NewAggregator(client Client, opts ...AggregatorOption) csink.Outlet

NewAggregator builds an Aggregator bound to client and returns it as a sink.Outlet. The background flush loop starts on the first Sink, so an idle Aggregator holds no goroutine. Attach the result to a sink.Manifold, whose Flush and Shutdown drive the matching methods.

Example

ExampleNewAggregator folds two counts of the same metric into one summed emission and keeps the last gauge write, emitting on Flush.

```go
// Apache-2.0
package main

import (
	"context"
	"fmt"
	"sort"
	"time"

	csink "github.com/stablekernel/crucible/sink"
	statsdsink "github.com/stablekernel/crucible/sink/statsd"
)

// printingClient records each Count and Gauge call so the example can print
// them; the remaining Client methods are no-ops.
type printingClient struct{ lines []string }

func (p *printingClient) Count(n string, v int64, _ []string, _ float64) error {
	p.lines = append(p.lines, fmt.Sprintf("count %s=%d", n, v))
	return nil
}

func (p *printingClient) Gauge(n string, v float64, _ []string, _ float64) error {
	p.lines = append(p.lines, fmt.Sprintf("gauge %s=%g", n, v))
	return nil
}

func (p *printingClient) Histogram(string, float64, []string, float64) error    { return nil }
func (p *printingClient) Distribution(string, float64, []string, float64) error { return nil }
func (p *printingClient) Timing(string, time.Duration, []string, float64) error { return nil }
func (p *printingClient) Set(string, string, []string, float64) error           { return nil }

// ExampleNewAggregator folds two counts of the same metric into one summed
// emission and keeps the last gauge write, emitting on Flush.
func main() {
	pc := &printingClient{}
	// A zero interval disables the background loop, so only Flush emits.
	agg := statsdsink.NewAggregator(pc, statsdsink.WithInterval(0))
	ctx := context.Background()

	_ = agg.Sink(ctx, statsdsink.Metric{Type: statsdsink.TypeCount, Name: "orders.placed", Int: 2, Rate: 1})
	_ = agg.Sink(ctx, statsdsink.Metric{Type: statsdsink.TypeCount, Name: "orders.placed", Int: 3, Rate: 1})
	_ = agg.Sink(ctx, statsdsink.Metric{Type: statsdsink.TypeGauge, Name: "queue.depth", Value: 1, Rate: 1})
	_ = agg.Sink(ctx, statsdsink.Metric{Type: statsdsink.TypeGauge, Name: "queue.depth", Value: 9, Rate: 1})

	// Flush emits the folded window: one summed count and the last gauge value.
	_ = agg.(csink.Flusher).Flush(ctx)

	// Sort the recorded calls so the output is deterministic.
	sort.Strings(pc.lines)
	for _, l := range pc.lines {
		fmt.Println(l)
	}
}
```

Output

```
count orders.placed=5
gauge queue.depth=9
```

func NewMetricRegistry() *csink.Registry[Metric]

NewMetricRegistry returns an empty registry mapping payload types to Metric values, for use with WithRegistry. Populate it with sink.Register.

func NewRegistry() *csink.Registry[csink.Op[Client]]

NewRegistry returns an empty Op[Client] registry for use with New. Populate it with sink.Register.

func Set(name string, value string, tags []string, rate float64) csink.Op[Client]

Set returns an Op that emits a StatsD set sample.

func Timing(name string, value time.Duration, tags []string, rate float64) csink.Op[Client]

Timing returns an Op that emits a StatsD timing sample.

Aggregator folds StatsD payloads in process and emits them to a Client on an interval and on Flush or Shutdown. It is safe for concurrent use: Sink folds under a mutex, and flush swaps the live window for a fresh one under the same mutex before emitting outside the lock, so a slow Client never blocks producers. It implements sink.Outlet, sink.Flusher, and sink.Shutdowner.

```go
type Aggregator struct {
	// contains filtered or unexported fields
}
```
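The swap-then-emit concurrency scheme described above can be sketched with a single mutex. This is an illustration of the technique, not the Aggregator's code: `agg`, `counts`, and `emit` are hypothetical, and only counters are modeled.

```go
package main

import (
	"fmt"
	"sync"
)

// agg is a hypothetical sketch of swap-then-emit: Sink mutates the live window
// under mu, and flush replaces it with a fresh map under the same lock, then
// emits the captured map after unlocking.
type agg struct {
	mu     sync.Mutex
	counts map[string]int64
	emit   func(name string, v int64) // stands in for a possibly slow Client call
}

func (a *agg) Sink(name string, v int64) {
	a.mu.Lock()
	a.counts[name] += v
	a.mu.Unlock()
}

func (a *agg) flush() {
	a.mu.Lock()
	captured := a.counts
	a.counts = make(map[string]int64)
	a.mu.Unlock()
	// Emitting outside the lock means a slow emit never blocks concurrent Sinks.
	for name, v := range captured {
		a.emit(name, v)
	}
}

func main() {
	var out []string
	a := &agg{counts: map[string]int64{}, emit: func(n string, v int64) {
		out = append(out, fmt.Sprintf("%s=%d", n, v))
	}}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); a.Sink("hits", 1) }()
	}
	wg.Wait()
	a.flush()
	a.flush() // the second flush sees an empty window and emits nothing
	fmt.Println(out) // [hits=100]
}
```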

func (a *Aggregator) Flush(_ context.Context) error

Flush swaps the live window for a fresh one and emits the captured window to the Client. It is safe to call concurrently with Sink and is a no-op on an empty window. The first emit error is returned (wrapped as *sink.Error with PhaseFlush); the remaining metrics in the window are still attempted.

func (a *Aggregator) Shutdown(_ context.Context) error

Shutdown stops the background loop, performs a final flush, and returns any emit error from that flush. It is idempotent: a second call is a no-op returning nil.

func (a *Aggregator) Sink(_ context.Context, payload any) error

Sink folds payload into the current window. A Metric payload is folded directly; any other payload is resolved through the registry installed with WithRegistry, and an unregistered type returns sink.ErrUnregistered. The first Sink starts the background flush loop when an interval is configured.

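AggregatorOption configures an Aggregator. Options are applied in order on top of the defaults. A nil or empty value is ignored and leaves the default in place; WithInterval is the exception, where a non-positive duration disables the background loop rather than restoring the default.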

type AggregatorOption func(*Aggregator)

func WithClock(now func() time.Time) AggregatorOption

WithClock injects the time source the background loop schedules against, enabling deterministic tests that advance time without sleeping. The default is time.Now. A nil clock is ignored.

func WithInterval(d time.Duration) AggregatorOption

WithInterval sets the background flush interval. A non-positive interval disables the background loop, leaving Flush and Shutdown as the only emit triggers. The default is 10 seconds.

func WithName(name string) AggregatorOption

WithName sets the outlet name used when wrapping emit failures. The default is “statsd”. An empty name is ignored.

func WithRegistry(reg *csink.Registry[Metric]) AggregatorOption

WithRegistry installs a transformer registry so the Aggregator can fold arbitrary payload types, not only Metric values. A payload with no registered transformer is not folded, and Sink returns sink.ErrUnregistered. A nil registry is ignored.

Client is the narrow StatsD surface this destination needs. It is satisfied structurally by the StatsD SDK client (and its no-op and mock variants), so tests wire a hand-rolled fake and consumers wire the real client without this package depending on a concrete SDK type in any exported signature.

```go
type Client interface {
	Count(name string, value int64, tags []string, rate float64) error
	Gauge(name string, value float64, tags []string, rate float64) error
	Histogram(name string, value float64, tags []string, rate float64) error
	Distribution(name string, value float64, tags []string, rate float64) error
	Timing(name string, value time.Duration, tags []string, rate float64) error
	Set(name string, value string, tags []string, rate float64) error
}
```
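Because Client is satisfied structurally, a test double needs only the six methods. A minimal hand-rolled fake, with the interface copied locally so the sketch compiles on its own; `fakeClient` and its call counting are hypothetical. The `var _ Client = ...` line is a compile-time check that the fake still matches the interface.

```go
package main

import (
	"fmt"
	"time"
)

// Client copies the interface declared above so this sketch compiles on its own.
type Client interface {
	Count(name string, value int64, tags []string, rate float64) error
	Gauge(name string, value float64, tags []string, rate float64) error
	Histogram(name string, value float64, tags []string, rate float64) error
	Distribution(name string, value float64, tags []string, rate float64) error
	Timing(name string, value time.Duration, tags []string, rate float64) error
	Set(name string, value string, tags []string, rate float64) error
}

// fakeClient is a hypothetical test double that counts calls per method.
type fakeClient struct{ calls map[string]int }

func newFakeClient() *fakeClient { return &fakeClient{calls: map[string]int{}} }

func (f *fakeClient) hit(method string) error { f.calls[method]++; return nil }

func (f *fakeClient) Count(string, int64, []string, float64) error         { return f.hit("count") }
func (f *fakeClient) Gauge(string, float64, []string, float64) error       { return f.hit("gauge") }
func (f *fakeClient) Histogram(string, float64, []string, float64) error   { return f.hit("histogram") }
func (f *fakeClient) Distribution(string, float64, []string, float64) error { return f.hit("distribution") }
func (f *fakeClient) Timing(string, time.Duration, []string, float64) error { return f.hit("timing") }
func (f *fakeClient) Set(string, string, []string, float64) error          { return f.hit("set") }

// Compile-time check that *fakeClient satisfies Client structurally.
var _ Client = (*fakeClient)(nil)

func main() {
	f := newFakeClient()
	var c Client = f
	_ = c.Count("orders.placed", 1, []string{"env:dev"}, 1)
	_ = c.Timing("req.latency", 15*time.Millisecond, nil, 1)
	fmt.Println(f.calls) // map[count:1 timing:1]
}
```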

func Dial(addr string) (Client, error)

Dial opens an SDK client to addr (for example “127.0.0.1:8125”) and returns it as a Client ready to pass to NewAggregator or New. It is a thin convenience over the SDK constructor; callers that need SDK-specific options should construct the SDK client directly, since *dogstatsd.Client satisfies Client structurally.

Metric is the payload an Aggregator folds. Name and Type are required; the value field that matters depends on Type. Count reads Int, Set reads Str, Timing reads Dur, and the float types (Gauge, Histogram, Distribution) read Value. Rate is the StatsD sample rate (1 means every sample); a non-positive Rate is treated as 1.

```go
type Metric struct {
	// Name is the metric name, for example "orders.placed".
	Name string
	// Str is the sample for Set metrics.
	Str string
	// Tags are the StatsD tags applied to the metric.
	Tags []string
	// Value is the sample for Gauge, Histogram, and Distribution metrics.
	Value float64
	// Rate is the StatsD sample rate; non-positive is treated as 1.
	Rate float64
	// Int is the sample for Count metrics.
	Int int64
	// Dur is the sample for Timing metrics.
	Dur time.Duration
	// Type selects which StatsD metric and value field apply.
	Type MetricType
}
```

MetricType identifies which StatsD metric a Metric carries.

type MetricType uint8

```go
const (
	// TypeCount is a monotonic counter; samples with the same identity are summed.
	TypeCount MetricType = iota
	// TypeGauge is a point-in-time value; the last sample for an identity wins.
	TypeGauge
	// TypeHistogram is a statistical sample emitted verbatim, one per sample.
	TypeHistogram
	// TypeDistribution is a global distribution sample emitted verbatim.
	TypeDistribution
	// TypeTiming is a duration sample emitted verbatim.
	TypeTiming
	// TypeSet is a unique-value sample emitted verbatim.
	TypeSet
)
```

Generated by gomarkdoc