Codecs and headers
A message arrives as bytes. A codec turns those bytes into the typed event a
handler wants, and resolves the generic T for a TypedHandler[T].
Instance-scoped, never global
Section titled “Instance-scoped, never global”The registry is constructed and injected, never a package-level singleton. This deliberately avoids the global-registry anti-pattern: two consumers in the same process never share decode state, and a codec is selected per message by its content-type or header.
reg := source.NewCodecRegistry()source.RegisterCodec[OrderPlaced](reg, jsonCodec) // resolves T for the typed handlerJSON, proto, and Avro are the built-in shapes; CloudEvents lives in its own
module. Headers are typed accessors over extension attributes, not a
map[string]string of magic strings, so a missing or mistyped attribute is a
typed lookup, not a silent empty string.
cloudevents
Section titled “cloudevents”crucible/source/cloudevents is the CloudEvents codec. It supports both content
modes from the spec:
- structured mode, where the whole event (envelope plus data) is one encoded body, and
- binary mode, where the event attributes ride in headers and the data is the raw body.
The two content modes are the codec seam itself, so the same handler works whichever mode a producer chose. Because the registry is instance-scoped, the CloudEvents codec never registers itself globally, and a service can run a CloudEvents consumer and a plain-JSON consumer side by side without interference.
The decoded type then flows into your handler, and for the
state-machine binding it becomes the event the
router fires. A decode failure is a typed *DecodeError the engine classifies
as poison and routes to the DLQ.
crucible/source/cdc is the change-data-capture codec. It decodes the standard
Debezium JSON change-event envelope (also the de-facto OpenCDC normalized record
shape) into a typed ChangeEvent:
- an
Operation(create, snapshot read, update, delete, or tombstone), - the
beforeandafterrow images, kept as deferred JSON so one codec serves every table on a topic without binding to a row type at decode time, - a decoded
Sourcemetadata block (connector, database, schema, table, snapshot marker, log position, transaction id), and - the commit
Timestampthe connector reported.
Recover the value with DecodeEvent, project a row image into a concrete type
with BeforeAs[T] / AfterAs[T], and read the source metadata as typed
source.Headers through SourceHeaders. A log-compaction tombstone (an empty
payload) decodes to an OpTombstone event rather than a decode failure, so a
handler routes it (a delete-and-forget for the key) or skips it. A malformed
envelope is a typed *DecodeError the engine classifies as poison.
Scope: envelope plus topic pattern, not a native connector
Section titled “Scope: envelope plus topic pattern, not a native connector”This codec covers the change-event envelope and the pattern for driving a
statechart from a change-event topic. The intended shape is to let an
existing connector (Debezium, or any producer emitting the same envelope) write
row changes to a topic, consume that topic through a backend inlet such as
source/kafka, decode each message with this codec,
and drive a statechart per primary key through the
state-machine binding. Because the codec is
instance-scoped, a service can run a CDC consumer alongside a plain-JSON or
CloudEvents consumer with no shared decode state.
A native database write-ahead-log connector (reading a Postgres logical replication slot or a MySQL binlog directly, without a broker in between) is deliberately out of scope and tracked as future work. The codec gives you the typed change event; the connector that produces those events stays a separate, operational concern.
// Decode a Debezium change event, then route its after-image into a transition.registry := source.NewRegistry().SetDefault(cdc.New())
router := func(m source.Message) (Key, Event, error) { change, err := cdc.DecodeEvent(registry, m) if err != nil { return zeroKey, zeroEvent, err } row, err := cdc.AfterAs[Row](change) if err != nil { return zeroKey, zeroEvent, err } return keyOf(row), eventOf(change.Operation, row), nil}