source/idempotency
import "github.com/stablekernel/crucible/source/idempotency"Package idempotency provides a deduplicating [source.Middleware] for the crucible source seam: it suppresses re-processing of a message whose idempotency key has already been handled, acking the duplicate ([source.Skip], classified [source.Drop]) without re-running the handler. It is backed by the core [source.Deduper] capability, or any caller-supplied Store; the no-op default deduplicates nothing, so adding the middleware without a store is safe and inert.
The dedup key is derived by a configurable KeyFunc read from the [source.Message] (default: the message key, falling back to a header), never a magic-string lookup baked into the engine. For the state-machine binding the key is the machine’s state version, making redelivery provably idempotent with no external store; this package supplies the generic, store-backed form.
- Constants
- func DefaultKeyFunc(m source.Message) (string, bool)
- func Middleware(opts …Option) source.Middleware
- type KeyFunc
- type Option
- type Store
Constants
Section titled “Constants”DefaultKeyHeader is the typed header key the default KeyFunc reads an idempotency key from when the message carries no [source.Message.Key]. It is a named constant so producers and consumers agree on one key without an inline string.
const DefaultKeyHeader = "crucible-idempotency-key"func DefaultKeyFunc
Section titled “func DefaultKeyFunc”func DefaultKeyFunc(m source.Message) (string, bool)DefaultKeyFunc derives the key from the message key bytes, falling back to the DefaultKeyHeader header, and reports ok=false when neither is present.
func Middleware
Section titled “func Middleware”func Middleware(opts ...Option) source.MiddlewareMiddleware returns a [source.Middleware] that deduplicates by idempotency key. Before invoking the wrapped handler it derives the key (KeyFunc) and asks the Store whether it has been seen:
- already seen: the handler is skipped and the message is acked-and-dropped ([source.Skip]); the duplicate is settled without side effects.
- not seen (now recorded): the handler runs normally and its [source.Result] is returned unchanged.
- no derivable key, or a store error: the middleware fails open and runs the handler, never silently dropping a message it cannot confidently call a duplicate.
With no store configured the middleware is a pass-through, so it is safe to add unconditionally and supply a store later.
Example
package main
import ( "context" "fmt" "sync"
"github.com/stablekernel/crucible/source" "github.com/stablekernel/crucible/source/idempotency")
// setStore is a tiny in-memory idempotency.Store backed by a set.type setStore struct { mu sync.Mutex seen map[string]bool}
func (s *setStore) Seen(_ context.Context, key string) (bool, error) { s.mu.Lock() defer s.mu.Unlock() if s.seen[key] { return true, nil } s.seen[key] = true return false, nil}
func main() { store := &setStore{seen: map[string]bool{}}
processed := 0 base := func(_ context.Context, m source.Message) source.Result { processed++ fmt.Printf("processed %s\n", m.Key()) return source.Ack() } h := idempotency.Middleware(idempotency.WithStore(store))(base)
// The same message delivered twice runs the handler only once; the duplicate // is acked and dropped. msg := stubMsg{key: []byte("order-1")} r1 := h(context.Background(), msg) r2 := h(context.Background(), msg)
fmt.Printf("first: %s/%s\n", r1.Action, r1.Class) fmt.Printf("second: %s/%s\n", r2.Action, r2.Class) fmt.Printf("handler runs: %d\n", processed)}Output
Section titled “Output”processed order-1first: ack/unclassifiedsecond: ack/drophandler runs: 1type KeyFunc
Section titled “type KeyFunc”KeyFunc derives the idempotency key for a message and reports whether a key could be derived. A false ok means “no key” — the middleware then lets the message through undeduplicated rather than collapsing all keyless messages onto one empty key.
type KeyFunc func(m source.Message) (key string, ok bool)type Option
Section titled “type Option”Option configures the idempotency Middleware. Options are additive with no-op defaults.
type Option func(*config)func WithDeduper
Section titled “func WithDeduper”func WithDeduper(d source.Deduper) OptionWithDeduper sets the dedup backend from a core [source.Deduper], adapting it to a Store. The default is none. A nil deduper is ignored.
func WithKeyFunc
Section titled “func WithKeyFunc”func WithKeyFunc(f KeyFunc) OptionWithKeyFunc sets the KeyFunc that extracts the idempotency key from a message. The default is DefaultKeyFunc. A nil function is ignored.
func WithStore
Section titled “func WithStore”func WithStore(s Store) OptionWithStore sets the Store the middleware deduplicates against. The default is none (a pass-through). A nil store is ignored.
type Store
Section titled “type Store”Store records and reports which idempotency keys have been processed. It is the caller-provided seam the middleware deduplicates against — an in-memory set, a Redis SETNX, a database unique index. It mirrors [source.Deduper] so any existing Deduper is also a Store via FromDeduper. Implementations must be safe for concurrent use.
type Store interface { // Seen reports whether key has already been processed and, if not, records it // — atomically, so two concurrent deliveries of the same key cannot both // observe false. A non-nil error is treated as "unknown": the middleware fails // open and lets the handler run rather than dropping a possibly-new message. Seen(ctx context.Context, key string) (seen bool, err error)}func FromDeduper
Section titled “func FromDeduper”func FromDeduper(d source.Deduper) StoreFromDeduper adapts a [source.Deduper] to a Store, so the engine’s dedup capability and this middleware share one backend.
Generated by gomarkdoc