Skip to content

source/cloudevents

import "github.com/stablekernel/crucible/source/cloudevents"

Package cloudevents is a source codec that decodes inbound messages into CloudEvents. It plugs into a source.Registry as an instance-scoped [source.Codec]: construct one with New and register it under the content types you accept; there is no global format registration (the CloudEvents SDK’s package-level format registry is deliberately not used, so two codecs in one process never share mutable state).

The CloudEvents spec defines two ways an event rides a transport, and this codec accepts both, selecting between them by the message’s content type:

  • Structured mode: the entire event — attributes and data — is one JSON document in the body, carried under “application/cloudevents+json”.
  • Binary mode: the event’s attributes ride as “ce-”-prefixed headers and the data is the raw body, with the body’s own media type in the “datacontenttype” header (or the message’s content-type).

A content type whose media type begins with “application/cloudevents” (the structured prefix) decodes as structured; anything else decodes as binary. See Detect.

Decode yields a cloudevents.Event (the SDK’s canonical event). Recover it from a handler with EventOf, and decode its data payload into a concrete type with DataAs or the generic DecodeData helper. Extension attributes are surfaced through the core [source.Headers] (see Extensions) rather than a magic-string map, so a handler reads them the same way it reads any other inbound metadata.

Experimental (pre-v1); the API may change until the suite locks v1.0.0.

Example

Example shows wiring the CloudEvents codec into a source.Registry and decoding a binary-mode message into a typed payload. The codec is instance-scoped: it is registered on a registry the caller owns, never on a process-global format table.

Apache-2.0
package main
import (
"encoding/json"
"fmt"
"github.com/stablekernel/crucible/source"
"github.com/stablekernel/crucible/source/cloudevents"
)
// Example shows wiring the CloudEvents codec into a source.Registry and
// decoding a binary-mode message into a typed payload. The codec is
// instance-scoped: it is registered on a registry the caller owns, never on a
// process-global format table.
func main() {
type orderCreated struct {
ID string `json:"id"`
Qty int `json:"qty"`
}
// Register one codec for both content modes: structured under its media
// type, and as the default so binary-mode messages (which carry a data
// content type such as application/json) also route to it.
codec := cloudevents.New()
registry := source.NewRegistry().
Register(cloudevents.StructuredContentType, codec).
SetDefault(codec)
// A binary-mode message: attributes ride as ce- headers, data is the body.
body, _ := json.Marshal(orderCreated{ID: "o-42", Qty: 3})
msg := exampleMessage{
value: body,
headers: source.Headers{
{Key: source.ContentTypeHeader, Value: "application/json"},
{Key: "ce-specversion", Value: "1.0"},
{Key: "ce-id", Value: "evt-1"},
{Key: "ce-source", Value: "/shop/checkout"},
{Key: "ce-type", Value: "com.example.order.created"},
{Key: "ce-region", Value: "us-east"}, // extension attribute
},
}
event, data, err := cloudevents.DecodeData[orderCreated](registry, msg)
if err != nil {
fmt.Println("decode failed:", err)
return
}
region, _ := cloudevents.Extensions(event).Get(cloudevents.ExtensionHeaderPrefix + "region")
fmt.Printf("type=%s id=%s order=%s qty=%d region=%s\n",
event.Type(), event.ID(), data.ID, data.Qty, region)
}
// exampleMessage is a minimal source.Message for the example.
type exampleMessage struct {
value []byte
headers source.Headers
}
func (m exampleMessage) Key() []byte { return nil }
func (m exampleMessage) Value() []byte { return m.value }
func (m exampleMessage) Headers() source.Headers { return m.headers }
func (m exampleMessage) Subject() string { return "orders" }
func (m exampleMessage) PartitionKey() string { return "" }
func (m exampleMessage) Cursor() source.Cursor { return nil }
func (m exampleMessage) As(any) bool { return false }
type=com.example.order.created id=evt-1 order=o-42 qty=3 region=us-east

ExtensionHeaderPrefix is the key prefix under which Extensions surfaces a decoded event’s extension attributes through the core [source.Headers]. An extension named “traceparent” is exposed as the header “ce-ext-traceparent”, keeping extensions legible as typed headers instead of a separate magic-string map.

const ExtensionHeaderPrefix = "ce-ext-"

StructuredContentType is the media type that carries a CloudEvent in structured mode: attributes and data together as one JSON document. Register the Codec under this type to decode structured events.

const StructuredContentType = "application/cloudevents+json"

func DataAs(e cloudevents.Event, out any) error

DataAs decodes the event’s data payload into out, honoring the event’s data content type (JSON, for the common case). It is a thin pass-through to the SDK’s typed-data decode, named for symmetry with the rest of the codec surface. A decode failure is returned plainly; a caller routing through a Codec will already have a valid event, so a failure here is a data-shape mismatch the caller decides how to classify.

func DecodeData[T any](r *source.Registry, m source.Message) (cloudevents.Event, T, error)

DecodeData decodes m through r into a cloudevents.Event and then decodes that event’s data payload into a fresh value of type T: the one-call typed path a handler uses to go from raw message to typed CloudEvents data. A decode failure (of the event or its data) returns an error; the event-level failure is the registry’s [*source.DecodeError], and a data-shape failure is wrapped with context.

func DecodeEvent(r *source.Registry, m source.Message) (cloudevents.Event, error)

DecodeEvent decodes m through r and recovers the cloudevents.Event, the convenience path a CloudEvents handler uses instead of calling [source.Registry.Decode] and EventOf in sequence. A decode failure returns the [*source.DecodeError] from the registry; a value that is not a CloudEvent (some other codec matched) returns a *source.DecodeError wrapping [source.ErrPoison], since a payload that decoded to the wrong shape cannot be retried into the right one.

func EventOf(v any) (cloudevents.Event, bool)

EventOf recovers the cloudevents.Event a Codec decoded from a registry result. It is the typed bridge between [source.Registry.Decode] (which returns any) and a handler that works with CloudEvents: pass the decoded value, get back the event and whether the value was in fact a CloudEvent.

A false return means the registry routed the message to a different codec (the content type matched something other than this codec); it is not a decode failure.

func Extensions(e cloudevents.Event) source.Headers

Extensions surfaces an event’s extension attributes as core [source.Headers], each key prefixed with ExtensionHeaderPrefix, so a handler reads CloudEvents extensions through the same typed-header surface as any other inbound metadata instead of a separate magic-string map. Values are rendered to their string form; non-string extensions use fmt’s default rendering. The result is a fresh slice in stable (sorted-by-name) order the caller may retain.

Codec decodes a [source.Message] into a cloudevents.Event, handling both CloudEvents content modes. It holds no mutable state and is safe for concurrent use; the [source.Hopper] decodes from per-lane worker goroutines.

Construct it with New and register it on a [source.Registry] (or set it as the default). It is the instance seam — there is no package-global registration.

type Codec struct{}

func New() Codec

New returns a Codec. It is a constructor for symmetry with the rest of the suite and to leave room for future options; the zero value is equally valid since the codec carries no state.

func (Codec) Decode(data []byte, h source.Headers) (any, error)

Decode turns a message’s bytes and headers into a cloudevents.Event, selecting the content mode from the content-type header via Detect. The returned value is a cloudevents.Event (by value); recover it with EventOf and read its data with DataAs or DecodeData.

A malformed structured payload, a binary event missing a required attribute, or an event that fails CloudEvents validation returns an error the [source.Registry] wraps in a [*source.DecodeError] (which reports [source.ErrPoison]): a structurally invalid event cannot be retried into validity.

Mode is the CloudEvents content mode a message arrived in.

type Mode int

const (
// Binary is the content mode in which context attributes ride as headers
// and the data is the raw body.
Binary Mode = iota
// Structured is the content mode in which the whole event is one JSON
// document in the body.
Structured
)

func Detect(contentType string) Mode

Detect reports the CloudEvents content mode for a content type. A media type beginning with the structured prefix (“application/cloudevents”) is Structured; everything else, including the empty string, is Binary. Any parameters after a ”;” (charset, for one) are ignored.

func (m Mode) String() string

String renders the mode for logs and diagnostics.

Generated by gomarkdoc