source/cdc

import "github.com/stablekernel/crucible/source/cdc"

Package cdc is a source codec that decodes change-data-capture (CDC) events into a typed ChangeEvent. It plugs into a [source.Registry] as an instance-scoped [source.Codec]: construct one with New and register it under the content types your CDC topic carries, or set it as the registry default. There is no package-global registration; every codec is constructed and injected, never shared by import.

This codec handles the change-event envelope only: the standard Debezium JSON shape (an “op” of c/r/u/d, “before”/“after” row images, a “source” metadata block, and a “ts_ms” commit timestamp), which is also the de-facto OpenCDC normalized record shape. It decodes one row-change message into a ChangeEvent whose row images stay as deferred JSON (RawJSON) so a handler recovers a concrete row type only when it needs one, via BeforeAs / AfterAs.
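The envelope shape and the deferred row images can be illustrated with the standard library alone. The following is a minimal sketch, not the package's implementation: the `envelope` type, `parseEnvelope`, and `absent` are hypothetical names introduced here, and `json.RawMessage` stands in for the package's RawJSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical mirror of the Debezium JSON change-event shape.
// The row images stay as json.RawMessage, so decoding them is deferred.
type envelope struct {
	Op     string          `json:"op"`
	Before json.RawMessage `json:"before"`
	After  json.RawMessage `json:"after"`
	Source json.RawMessage `json:"source"`
	TsMs   int64           `json:"ts_ms"`
}

// parseEnvelope decodes only the envelope; the row images remain raw bytes.
func parseEnvelope(data []byte) (envelope, error) {
	var env envelope
	if err := json.Unmarshal(data, &env); err != nil {
		return envelope{}, fmt.Errorf("parse envelope: %w", err)
	}
	return env, nil
}

// absent reports whether a raw image is missing. A JSON null may arrive as the
// literal bytes "null" rather than nil, so both cases are treated as absent.
func absent(raw json.RawMessage) bool {
	return len(raw) == 0 || string(raw) == "null"
}

func main() {
	env, err := parseEnvelope([]byte(
		`{"op":"c","before":null,"after":{"id":1,"name":"ada"},"ts_ms":1700000000000}`))
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Printf("op=%s before-absent=%t after=%s\n", env.Op, absent(env.Before), env.After)
}
```

Deferring the images this way is what lets a single decoder serve a topic that carries rows from several tables: nothing about the row type is needed until a handler asks for it.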

A native database write-ahead-log connector (reading a Postgres logical replication slot, a MySQL binlog, and so on) is deliberately out of scope and tracked as future work. The intended pattern is to let an existing CDC connector (Debezium, or any producer emitting the same envelope) write change events to a topic, consume that topic through a backend inlet such as source/kafka, decode each message with this codec, and drive a statechart per primary key through source/statemachine.

Decode yields a ChangeEvent (by value). Recover it from a registry result with EventOf or the one-call DecodeEvent; project its row images into a concrete type with BeforeAs / AfterAs. Useful fields from the source metadata block are surfaced as core [source.Headers] (see SourceHeaders) so a handler reads them through the same typed-header surface as any other inbound metadata instead of a magic-string map.

A Kafka log-compaction tombstone (an empty payload) decodes to a ChangeEvent whose Operation is OpTombstone and whose row images are both absent. It is a valid outcome that needs no retry, not a decode failure: a handler either routes it (a delete-and-forget for the key) or skips it.

Experimental (pre-v1); the API may change until the suite locks v1.0.0.

Example

Example shows wiring the CDC codec into a source.Registry and decoding a Debezium JSON change event into a typed row image. The codec is instance-scoped: it is registered on a registry the caller owns, never on a process-global table.

Apache-2.0
package main

import (
	"fmt"

	"github.com/stablekernel/crucible/source"
	"github.com/stablekernel/crucible/source/cdc"
)

// Example shows wiring the CDC codec into a source.Registry and decoding a
// Debezium JSON change event into a typed row image. The codec is
// instance-scoped: it is registered on a registry the caller owns, never on a
// process-global table.
func main() {
	type user struct {
		ID   int64  `json:"id"`
		Name string `json:"name"`
	}

	// Register the codec under the Debezium JSON content type, and as the
	// default so a topic that carries no content-type header still routes to it.
	codec := cdc.New()
	registry := source.NewRegistry().
		Register(cdc.DebeziumJSONContentType, codec).
		SetDefault(codec)

	// A Debezium update envelope for the users table: both row images present.
	msg := exampleMessage{
		subject: "shop.public.users",
		value: []byte(`{
			"op":"u",
			"before":{"id":1,"name":"ada"},
			"after":{"id":1,"name":"ada lovelace"},
			"source":{"connector":"postgresql","db":"shop","schema":"public","table":"users"},
			"ts_ms":1700000000000
		}`),
	}

	event, err := cdc.DecodeEvent(registry, msg)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}

	after, err := cdc.AfterAs[user](event)
	if err != nil {
		fmt.Println("after image:", err)
		return
	}

	table, _ := cdc.SourceHeaders(event).Get(cdc.TableHeader)
	fmt.Printf("op=%s table=%s id=%d name=%q\n",
		event.Operation, table, after.ID, after.Name)
}

// exampleMessage is a minimal source.Message for the example.
type exampleMessage struct {
	subject string
	value   []byte
}

func (m exampleMessage) Key() []byte             { return nil }
func (m exampleMessage) Value() []byte           { return m.value }
func (m exampleMessage) Headers() source.Headers { return nil }
func (m exampleMessage) Subject() string         { return m.subject }
func (m exampleMessage) PartitionKey() string    { return "" }
func (m exampleMessage) Cursor() source.Cursor   { return nil }
func (m exampleMessage) As(any) bool             { return false }

Output:

op=u table=users id=1 name="ada lovelace"

Header keys under which SourceHeaders surfaces a change event’s source metadata through the core [source.Headers], keeping CDC metadata legible as typed headers instead of a magic-string map.

const (
	// OperationHeader carries the change operation's code (see [Operation.String]).
	OperationHeader = "cdc-op"
	// ConnectorHeader carries the source connector name.
	ConnectorHeader = "cdc-connector"
	// DatabaseHeader carries the source database.
	DatabaseHeader = "cdc-database"
	// SchemaHeader carries the source schema.
	SchemaHeader = "cdc-schema"
	// TableHeader carries the source table.
	TableHeader = "cdc-table"
	// SnapshotHeader carries "true" when the record came from the initial
	// snapshot, and is omitted otherwise.
	SnapshotHeader = "cdc-snapshot"
	// LSNHeader carries the source log sequence number / position, when present.
	LSNHeader = "cdc-lsn"
	// TxIDHeader carries the source transaction id, when present.
	TxIDHeader = "cdc-txid"
)

DebeziumJSONContentType is the media type a Debezium JSON converter stamps on change-event messages. Register the Codec under this type to decode a Debezium topic, or set the codec as the registry default when the topic carries no content type.

const DebeziumJSONContentType = "application/vnd.debezium.cdc+json"

Sentinel errors a malformed envelope reports. Each is wrapped by the codec with context and, through the [source.Registry], in a [*source.DecodeError] that reports [source.ErrPoison]: a structurally invalid change event cannot be retried into validity. Match them with errors.Is.

var (
	// ErrMalformedEnvelope reports a payload that is not a JSON object or whose
	// top-level shape is not a change-event envelope.
	ErrMalformedEnvelope = errors.New("cdc: malformed change-event envelope")
	// ErrUnknownOperation reports an "op" value the codec does not recognize.
	ErrUnknownOperation = errors.New("cdc: unknown operation")
	// ErrMissingImage reports a typed-image projection ([BeforeAs] / [AfterAs])
	// for a row image the operation does not carry (a "before" image on a create,
	// say).
	ErrMissingImage = errors.New("cdc: row image absent")
)

func AfterAs[T any](e ChangeEvent) (T, error)

AfterAs decodes a change event’s after-image into a fresh value of type T: the typed view of the row after the change. It returns ErrMissingImage when the operation carries no after-image (a delete, a tombstone).

func BeforeAs[T any](e ChangeEvent) (T, error)

BeforeAs decodes a change event’s before-image into a fresh value of type T: the typed view of the row prior to the change. It returns ErrMissingImage when the operation carries no before-image (a create, a snapshot read, a tombstone).

func SourceHeaders(e ChangeEvent) source.Headers

SourceHeaders surfaces a change event’s operation and source metadata as core [source.Headers], so a handler reads CDC metadata through the same typed-header surface as any other inbound metadata. Only non-empty fields are emitted; the snapshot header appears only when the record came from a snapshot. The result is a fresh slice, in a stable order, that the caller may retain.
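The "only non-empty fields, in a stable order" behavior can be sketched in isolation. Everything below is an assumption made for illustration: the `header` and `meta` types and `headersFor` are hypothetical, the real [source.Headers] type is not shown in this document, and the emission order is a guess. Only the header key strings are taken from the constants above.

```go
package main

import "fmt"

// header is a hypothetical key/value pair; the real source.Headers type may differ.
type header struct{ Key, Value string }

// meta is a hypothetical subset of the source metadata block.
type meta struct {
	Connector, Database, Schema, Table string
	Snapshot                           bool
}

// headersFor builds a fresh slice on every call, emitting a header only when
// its value is non-empty. The order is fixed by the call sequence below.
func headersFor(op string, m meta) []header {
	var out []header
	add := func(key, value string) {
		if value != "" {
			out = append(out, header{Key: key, Value: value})
		}
	}
	add("cdc-op", op)
	add("cdc-connector", m.Connector)
	add("cdc-database", m.Database)
	add("cdc-schema", m.Schema)
	add("cdc-table", m.Table)
	if m.Snapshot {
		// The snapshot header is present only for snapshot records.
		add("cdc-snapshot", "true")
	}
	return out
}

func main() {
	for _, h := range headersFor("u", meta{Connector: "postgresql", Table: "users"}) {
		fmt.Printf("%s=%s\n", h.Key, h.Value)
	}
}
```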

ChangeEvent is a decoded CDC envelope: one row change with its before/after images, source metadata, and commit timestamp. The row images are deferred (RawJSON); project them with BeforeAs / AfterAs (or ChangeEvent.Before / ChangeEvent.After directly).

type ChangeEvent struct {
	// Operation is the kind of change (create, read/snapshot, update, delete, or
	// tombstone).
	Operation Operation
	// Before is the row image prior to the change ([RawJSON]); absent (nil) on a
	// create, a snapshot read, and a tombstone.
	Before RawJSON
	// After is the row image after the change ([RawJSON]); absent (nil) on a
	// delete and a tombstone.
	After RawJSON
	// Source is the decoded connector metadata block.
	Source SourceMetadata
	// Timestamp is the commit time the connector reported ("ts_ms"), in UTC; the
	// zero time when the envelope carried none.
	Timestamp time.Time
}

func DecodeEvent(r *source.Registry, m source.Message) (ChangeEvent, error)

DecodeEvent decodes m through r and recovers the ChangeEvent. It is the convenience path a CDC handler uses instead of calling [source.Registry.Decode] and EventOf in sequence. A decode failure returns the [*source.DecodeError] from the registry; a value that is not a ChangeEvent (some other codec matched) returns a [*source.DecodeError] wrapping [source.ErrPoison], since a payload that decoded to the wrong shape cannot be retried into the right one.

func EventOf(v any) (ChangeEvent, bool)

EventOf recovers the ChangeEvent a Codec decoded from a registry result. It is the typed bridge between [source.Registry.Decode] (which returns any) and a handler that works with change events: pass the decoded value, get back the event and whether the value was in fact a ChangeEvent.

A false return means the registry routed the message to a different codec (the content type matched something other than this codec); it is not a decode failure.
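The bridge from an untyped decode result back to a typed event amounts to a checked type assertion. The sketch below uses local stand-ins (`changeEvent`, `eventOf`) rather than the package's own types, and assumes the decoded value is held by value, as the package overview states.

```go
package main

import "fmt"

// changeEvent is a local stand-in for the package's ChangeEvent.
type changeEvent struct{ Op string }

// eventOf (hypothetical) recovers a changeEvent from an untyped result.
// The second return value is false when some other type was decoded.
func eventOf(v any) (changeEvent, bool) {
	e, ok := v.(changeEvent)
	return e, ok
}

func main() {
	results := []any{changeEvent{Op: "u"}, "decoded by another codec"}
	for _, r := range results {
		if e, ok := eventOf(r); ok {
			fmt.Println("change event, op:", e.Op)
		} else {
			fmt.Println("not a change event; skipping")
		}
	}
}
```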

Codec decodes a [source.Message] carrying a Debezium/OpenCDC JSON change-event envelope into a ChangeEvent. It holds no mutable state and is safe for concurrent use; the [source.Hopper] decodes from per-lane worker goroutines.

Construct it with New and register it on a [source.Registry] (or set it as the default). It is the instance seam: there is no package-global registration.

type Codec struct{}

func New() Codec

New returns a Codec. It is a constructor for symmetry with the rest of the suite and to leave room for future options; the zero value is equally valid since the codec carries no state.

func (Codec) Decode(data []byte, _ source.Headers) (any, error)

Decode turns a message’s bytes into a ChangeEvent. An empty payload is a log-compaction tombstone and decodes to a ChangeEvent with OpTombstone. A non-empty payload must be a Debezium JSON envelope; a payload that is not a JSON object, or whose “op” is unrecognized, returns an error the [source.Registry] wraps in a [*source.DecodeError] (which reports [source.ErrPoison]).

Headers are not consulted: the change-event shape is self-describing in the body. Recover the value with EventOf / DecodeEvent and project its row images with BeforeAs / AfterAs.

Operation is the kind of row change a ChangeEvent carries, mirroring the Debezium “op” field.

type Operation uint8

const (
	// OpUnknown is the zero value: an envelope whose operation has not been set.
	OpUnknown Operation = iota
	// OpCreate is an insert ("c"): only an after-image is present.
	OpCreate
	// OpRead is a snapshot read ("r"): a row captured during the connector's
	// initial snapshot, carrying an after-image and no before-image.
	OpRead
	// OpUpdate is an update ("u"): both before- and after-images are present
	// when the connector is configured to capture them.
	OpUpdate
	// OpDelete is a delete ("d"): only a before-image is present; the after-image
	// is null.
	OpDelete
	// OpTombstone is a log-compaction tombstone: an empty message that follows a
	// delete on a compacted topic. It carries no images.
	OpTombstone
)

func (o Operation) String() string

String renders the operation as its Debezium op code (“c”, “r”, “u”, “d”), “tombstone”, or “unknown” for diagnostics and headers.
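The enum-plus-Stringer pattern described here can be reproduced in a few lines. The sketch uses lowercase local names (`operation`, `parseOp`) to make clear it is a stand-in; `parseOp` in particular is a hypothetical inverse that the package documentation does not list.

```go
package main

import "fmt"

// operation is a local stand-in for the package's Operation type.
type operation uint8

const (
	opUnknown operation = iota // zero value: operation not set
	opCreate
	opRead
	opUpdate
	opDelete
	opTombstone
)

// String renders the Debezium op code, "tombstone", or "unknown".
func (o operation) String() string {
	switch o {
	case opCreate:
		return "c"
	case opRead:
		return "r"
	case opUpdate:
		return "u"
	case opDelete:
		return "d"
	case opTombstone:
		return "tombstone"
	default:
		return "unknown"
	}
}

// parseOp (hypothetical) maps a Debezium op code back to an operation.
func parseOp(code string) (operation, bool) {
	switch code {
	case "c":
		return opCreate, true
	case "r":
		return opRead, true
	case "u":
		return opUpdate, true
	case "d":
		return opDelete, true
	}
	return opUnknown, false
}

func main() {
	for _, code := range []string{"c", "u", "x"} {
		op, ok := parseOp(code)
		fmt.Printf("%q -> %v (recognized=%t)\n", code, op, ok)
	}
	fmt.Println(opTombstone)
}
```

Because the type implements `fmt.Stringer`, formatting verbs such as `%s` and `%v` print the op code directly, which is what the package example relies on when it prints `op=u`.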

RawJSON is a deferred row image: the raw JSON bytes of a “before” or “after” row, decoded into a concrete type on demand via BeforeAs / AfterAs (or directly with RawJSON.As). It is nil when the image is absent. Keeping the image deferred lets one codec serve every table on a topic without binding to a row type at decode time.

type RawJSON []byte

func (r RawJSON) As(out any) error

As decodes the deferred row image into out. It returns ErrMissingImage when the image is absent (nil), and the json.Unmarshal error when the bytes do not fit out’s shape.

func (r RawJSON) Present() bool

Present reports whether the row image is set (non-nil). A create has no before-image and a delete has no after-image, so a handler checks Present before projecting.

SourceMetadata is the decoded “source” block of a change event: the connector metadata Debezium attaches to every record. Fields absent from a given connector’s payload stay at their zero value. The full block is retained as SourceMetadata.Raw for fields not surfaced here.

type SourceMetadata struct {
	// Connector is the Debezium connector name ("postgresql", "mysql", ...).
	Connector string
	// Name is the logical server / database-server name configured on the
	// connector.
	Name string
	// Database is the source database the change came from.
	Database string
	// Schema is the source schema (Postgres) the table lives in.
	Schema string
	// Table is the source table the row belongs to.
	Table string
	// Snapshot reports whether the record was captured during the connector's
	// initial snapshot (the Debezium "snapshot" marker is truthy).
	Snapshot bool
	// LSN is the source log sequence number / position, as a string to span the
	// per-connector representations (a Postgres LSN, a MySQL binlog coordinate).
	LSN string
	// TxID is the source transaction identifier, when the connector reports one.
	TxID string
	// Raw is the undecoded source block, for fields not surfaced above.
	Raw RawJSON
}
