Skip to content

sink/sql

import "github.com/stablekernel/crucible/sink/sql"

Package sql is a sink destination that persists payloads through the standard library’s database/sql. It depends only on the standard library and crucible/sink — there is no driver or ORM dependency. Register a transformer that turns each payload type into an Exec operation, then attach the result of New to a sink.Manifold.

Experimental (pre-v1); the API may change until the suite locks v1.0.0.

func Exec(query string, args ...any) csink.Op[Tx]

Exec returns an Op that runs a single statement with positional arguments. It is the workhorse constructor: a registry maps each payload type to the Exec (or OpFunc) that persists it.

func New(tx Tx, reg *csink.Registry[csink.Op[Tx]], opts ...csink.EmitterOption) csink.Outlet

New builds an Outlet that applies each payload’s registered Op[Tx] to tx. The outlet is named “sql” unless overridden with sink.WithName.

Example

Apache-2.0
package main
import (
"context"
"database/sql"
"fmt"
csink "github.com/stablekernel/crucible/sink"
sqlsink "github.com/stablekernel/crucible/sink/sql"
)
// recordingTx is a stand-in Tx that records the statements it runs.
type recordingTx struct{ stmts []string }
func (r *recordingTx) ExecContext(_ context.Context, query string, _ ...any) (sql.Result, error) {
r.stmts = append(r.stmts, query)
return nil, nil
}
type userRegistered struct{ Email string }
func main() {
tx := &recordingTx{}
reg := sqlsink.NewRegistry()
csink.Register(reg, func(_ context.Context, u userRegistered) csink.Op[sqlsink.Tx] {
return sqlsink.Exec("INSERT INTO users(email) VALUES (?)", u.Email)
})
outlet := sqlsink.New(tx, reg)
_ = outlet.Sink(context.Background(), userRegistered{Email: "a@example.com"})
fmt.Println(tx.stmts[0])
}
INSERT INTO users(email) VALUES (?)

func NewRegistry() *csink.Registry[csink.Op[Tx]]

NewRegistry returns an empty registry of Op[Tx] for callers to populate with sink.Register.

Tx is the narrow database/sql surface this destination needs. It is satisfied by *sql.DB, *sql.Tx, and *sql.Conn, so a consumer wires whichever scope fits without this package importing a driver.

type Tx interface {
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

Generated by gomarkdoc