source/cloudevents
import "github.com/stablekernel/crucible/source/cloudevents"Package cloudevents is a source codec that decodes inbound messages into CloudEvents. It plugs into a source.Registry as an instance-scoped [source.Codec]: construct one with New and register it under the content types you accept; there is no global format registration (the CloudEvents SDK’s package-level format registry is deliberately not used, so two codecs in one process never share mutable state).
Content modes
Section titled “Content modes”The CloudEvents spec defines two ways an event rides a transport, and this codec accepts both, selecting between them by the message’s content type:
- Structured mode: the entire event — attributes and data — is one JSON document in the body, carried under “application/cloudevents+json”.
- Binary mode: the event’s attributes ride as “ce-”-prefixed headers and the data is the raw body, with the body’s own media type in the “datacontenttype” header (or the message’s content-type).
A content type whose media type begins with “application/cloudevents” (the structured prefix) decodes as structured; anything else decodes as binary. See Detect.
Decoded value
Section titled “Decoded value”Decode yields a cloudevents.Event (the SDK’s canonical event). Recover it from a handler with EventOf, and decode its data payload into a concrete type with DataAs or the generic DecodeData helper. Extension attributes are surfaced through the core [source.Headers] (see Extensions) rather than a magic-string map, so a handler reads them the same way it reads any other inbound metadata.
Stability
Section titled “Stability”Experimental (pre-v1); the API may change until the suite locks v1.0.0.
Example
Example shows wiring the CloudEvents codec into a source.Registry and decoding a binary-mode message into a typed payload. The codec is instance-scoped: it is registered on a registry the caller owns, never on a process-global format table.
package main
import ( "encoding/json" "fmt"
"github.com/stablekernel/crucible/source" "github.com/stablekernel/crucible/source/cloudevents")
// Example shows wiring the CloudEvents codec into a source.Registry and// decoding a binary-mode message into a typed payload. The codec is// instance-scoped: it is registered on a registry the caller owns, never on a// process-global format table.func main() { type orderCreated struct { ID string `json:"id"` Qty int `json:"qty"` }
// Register one codec for both content modes: structured under its media // type, and as the default so binary-mode messages (which carry a data // content type such as application/json) also route to it. codec := cloudevents.New() registry := source.NewRegistry(). Register(cloudevents.StructuredContentType, codec). SetDefault(codec)
// A binary-mode message: attributes ride as ce- headers, data is the body. body, _ := json.Marshal(orderCreated{ID: "o-42", Qty: 3}) msg := exampleMessage{ value: body, headers: source.Headers{ {Key: source.ContentTypeHeader, Value: "application/json"}, {Key: "ce-specversion", Value: "1.0"}, {Key: "ce-id", Value: "evt-1"}, {Key: "ce-source", Value: "/shop/checkout"}, {Key: "ce-type", Value: "com.example.order.created"}, {Key: "ce-region", Value: "us-east"}, // extension attribute }, }
event, data, err := cloudevents.DecodeData[orderCreated](registry, msg) if err != nil { fmt.Println("decode failed:", err) return }
region, _ := cloudevents.Extensions(event).Get(cloudevents.ExtensionHeaderPrefix + "region") fmt.Printf("type=%s id=%s order=%s qty=%d region=%s\n", event.Type(), event.ID(), data.ID, data.Qty, region)
}
// exampleMessage is a minimal source.Message for the example.type exampleMessage struct { value []byte headers source.Headers}
func (m exampleMessage) Key() []byte { return nil }func (m exampleMessage) Value() []byte { return m.value }func (m exampleMessage) Headers() source.Headers { return m.headers }func (m exampleMessage) Subject() string { return "orders" }func (m exampleMessage) PartitionKey() string { return "" }func (m exampleMessage) Cursor() source.Cursor { return nil }func (m exampleMessage) As(any) bool { return false }Output
Section titled “Output”type=com.example.order.created id=evt-1 order=o-42 qty=3 region=us-east- Constants
- func DataAs(e cloudevents.Event, out any) error
- func DecodeData[T any](r *source.Registry, m source.Message) (cloudevents.Event, T, error)
- func DecodeEvent(r *source.Registry, m source.Message) (cloudevents.Event, error)
- func EventOf(v any) (cloudevents.Event, bool)
- func Extensions(e cloudevents.Event) source.Headers
- type Codec
- type Mode
Constants
Section titled “Constants”ExtensionHeaderPrefix is the key prefix under which Extensions surfaces a decoded event’s extension attributes through the core [source.Headers]. An extension named “traceparent” is exposed as the header “ce-ext-traceparent”, keeping extensions legible as typed headers instead of a separate magic-string map.
const ExtensionHeaderPrefix = "ce-ext-"StructuredContentType is the media type that carries a CloudEvent in structured mode: attributes and data together as one JSON document. Register the Codec under this type to decode structured events.
const StructuredContentType = "application/cloudevents+json"func DataAs
Section titled “func DataAs”func DataAs(e cloudevents.Event, out any) errorDataAs decodes the event’s data payload into out, honoring the event’s data content type (JSON, for the common case). It is a thin pass-through to the SDK’s typed-data decode, named for symmetry with the rest of the codec surface. A decode failure is returned plainly; a caller routing through a Codec will already have a valid event, so a failure here is a data-shape mismatch the caller decides how to classify.
func DecodeData
Section titled “func DecodeData”func DecodeData[T any](r *source.Registry, m source.Message) (cloudevents.Event, T, error)DecodeData decodes m through r into a cloudevents.Event and then decodes that event’s data payload into a fresh value of type T: the one-call typed path a handler uses to go from raw message to typed CloudEvents data. A decode failure (of the event or its data) returns an error; the event-level failure is the registry’s [*source.DecodeError], and a data-shape failure is wrapped with context.
func DecodeEvent
Section titled “func DecodeEvent”func DecodeEvent(r *source.Registry, m source.Message) (cloudevents.Event, error)DecodeEvent decodes m through r and recovers the cloudevents.Event, the convenience path a CloudEvents handler uses instead of calling [source.Registry.Decode] and EventOf in sequence. A decode failure returns the [*source.DecodeError] from the registry; a value that is not a CloudEvent (some other codec matched) returns a *source.DecodeError wrapping [source.ErrPoison], since a payload that decoded to the wrong shape cannot be retried into the right one.
func EventOf
Section titled “func EventOf”func EventOf(v any) (cloudevents.Event, bool)EventOf recovers the cloudevents.Event a Codec decoded from a registry result. It is the typed bridge between [source.Registry.Decode] (which returns any) and a handler that works with CloudEvents: pass the decoded value, get back the event and whether the value was in fact a CloudEvent.
A false return means the registry routed the message to a different codec (the content type matched something other than this codec); it is not a decode failure.
func Extensions
Section titled “func Extensions”func Extensions(e cloudevents.Event) source.HeadersExtensions surfaces an event’s extension attributes as core [source.Headers], each key prefixed with ExtensionHeaderPrefix, so a handler reads CloudEvents extensions through the same typed-header surface as any other inbound metadata instead of a separate magic-string map. Values are rendered to their string form; non-string extensions use fmt’s default rendering. The result is a fresh slice in stable (sorted-by-name) order the caller may retain.
type Codec
Section titled “type Codec”Codec decodes a [source.Message] into a cloudevents.Event, handling both CloudEvents content modes. It holds no mutable state and is safe for concurrent use; the [source.Hopper] decodes from per-lane worker goroutines.
Construct it with New and register it on a [source.Registry] (or set it as the default). It is the instance seam — there is no package-global registration.
type Codec struct{}func New
Section titled “func New”func New() CodecNew returns a Codec. It is a constructor for symmetry with the rest of the suite and to leave room for future options; the zero value is equally valid since the codec carries no state.
func (Codec) Decode
Section titled “func (Codec) Decode”func (Codec) Decode(data []byte, h source.Headers) (any, error)Decode turns a message’s bytes and headers into a cloudevents.Event, selecting the content mode from the content-type header via Detect. The returned value is a cloudevents.Event (by value); recover it with EventOf and read its data with DataAs or DecodeData.
A malformed structured payload, a binary event missing a required attribute, or an event that fails CloudEvents validation returns an error the [source.Registry] wraps in a [*source.DecodeError] (which reports [source.ErrPoison]): a structurally invalid event cannot be retried into validity.
type Mode
Section titled “type Mode”Mode is the CloudEvents content mode a message arrived in.
type Mode intconst ( // Binary is the content mode in which context attributes ride as headers // and the data is the raw body. Binary Mode = iota // Structured is the content mode in which the whole event is one JSON // document in the body. Structured)func Detect
Section titled “func Detect”func Detect(contentType string) ModeDetect reports the CloudEvents content mode for a content type. A media type beginning with the structured prefix (“application/cloudevents”) is Structured; everything else, including the empty string, is Binary. Any parameters after a ”;” (charset, for one) are ignored.
func (Mode) String
Section titled “func (Mode) String”func (m Mode) String() stringString renders the mode for logs and diagnostics.
Generated by gomarkdoc