sink/sql
import "github.com/stablekernel/crucible/sink/sql"
Package sql is a sink destination that persists payloads through the standard library’s database/sql. It depends only on the standard library and crucible/sink; there is no driver or ORM dependency. Register a transformer that turns each payload type into an Exec operation, then attach the result of New to a sink.Manifold.
Stability
Experimental (pre-v1); the API may change until the suite locks v1.0.0.
- func Exec(query string, args ...any) csink.Op[Tx]
- func New(tx Tx, reg *csink.Registry[csink.Op[Tx]], opts ...csink.EmitterOption) csink.Outlet
- func NewRegistry() *csink.Registry[csink.Op[Tx]]
- type Tx
func Exec
func Exec(query string, args ...any) csink.Op[Tx]
Exec returns an Op that runs a single statement with positional arguments. It is the workhorse constructor: a registry maps each payload type to the Exec (or OpFunc) that persists it.
func New
func New(tx Tx, reg *csink.Registry[csink.Op[Tx]], opts ...csink.EmitterOption) csink.Outlet
New builds an Outlet that applies each payload’s registered Op[Tx] to tx. The outlet is named "sql" unless overridden with sink.WithName.
Example
package main

import (
	"context"
	"database/sql"
	"fmt"

	csink "github.com/stablekernel/crucible/sink"
	sqlsink "github.com/stablekernel/crucible/sink/sql"
)

// recordingTx is a stand-in Tx that records the statements it runs.
type recordingTx struct{ stmts []string }

func (r *recordingTx) ExecContext(_ context.Context, query string, _ ...any) (sql.Result, error) {
	r.stmts = append(r.stmts, query)
	return nil, nil
}

type userRegistered struct{ Email string }

func main() {
	tx := &recordingTx{}
	reg := sqlsink.NewRegistry()
	csink.Register(reg, func(_ context.Context, u userRegistered) csink.Op[sqlsink.Tx] {
		return sqlsink.Exec("INSERT INTO users(email) VALUES (?)", u.Email)
	})

	outlet := sqlsink.New(tx, reg)
	_ = outlet.Sink(context.Background(), userRegistered{Email: "a@example.com"})

	fmt.Println(tx.stmts[0])
}
Output
INSERT INTO users(email) VALUES (?)
func NewRegistry
func NewRegistry() *csink.Registry[csink.Op[Tx]]
NewRegistry returns an empty registry of Op[Tx] for callers to populate with sink.Register.
type Tx
Tx is the narrow database/sql surface this destination needs. It is satisfied by *sql.DB, *sql.Tx, and *sql.Conn, so a consumer wires whichever scope fits without this package importing a driver.
type Tx interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}