Skip to content

source/idempotency

import "github.com/stablekernel/crucible/source/idempotency"

Package idempotency provides a deduplicating [source.Middleware] for the crucible source seam: it suppresses re-processing of a message whose idempotency key has already been handled, acking the duplicate ([source.Skip], classified [source.Drop]) without re-running the handler. It is backed by the core [source.Deduper] capability, or any caller-supplied Store; the no-op default deduplicates nothing, so adding the middleware without a store is safe and inert.

The dedup key is derived by a configurable KeyFunc read from the [source.Message] (default: the message key, falling back to a header), never a magic-string lookup baked into the engine. For the state-machine binding the key is the machine’s state version, making redelivery provably idempotent with no external store; this package supplies the generic, store-backed form.

DefaultKeyHeader is the typed header key the default KeyFunc reads an idempotency key from when the message carries no [source.Message.Key]. It is a named constant so producers and consumers agree on one key without an inline string.

const DefaultKeyHeader = "crucible-idempotency-key"

func DefaultKeyFunc(m source.Message) (string, bool)

DefaultKeyFunc derives the key from the message key bytes, falling back to the DefaultKeyHeader header, and reports ok=false when neither is present.

func Middleware(opts ...Option) source.Middleware

Middleware returns a [source.Middleware] that deduplicates by idempotency key. Before invoking the wrapped handler it derives the key (KeyFunc) and asks the Store whether it has been seen:

  • already seen: the handler is skipped and the message is acked-and-dropped ([source.Skip]); the duplicate is settled without side effects.
  • not seen (now recorded): the handler runs normally and its [source.Result] is returned unchanged.
  • no derivable key, or a store error: the middleware fails open and runs the handler, never silently dropping a message it cannot confidently call a duplicate.

With no store configured the middleware is a pass-through, so it is safe to add unconditionally and supply a store later.

Example

Apache-2.0
package main
import (
"context"
"fmt"
"sync"
"github.com/stablekernel/crucible/source"
"github.com/stablekernel/crucible/source/idempotency"
)
// setStore is a tiny in-memory idempotency.Store backed by a set.
type setStore struct {
mu sync.Mutex
seen map[string]bool
}
func (s *setStore) Seen(_ context.Context, key string) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.seen[key] {
return true, nil
}
s.seen[key] = true
return false, nil
}
func main() {
store := &setStore{seen: map[string]bool{}}
processed := 0
base := func(_ context.Context, m source.Message) source.Result {
processed++
fmt.Printf("processed %s\n", m.Key())
return source.Ack()
}
h := idempotency.Middleware(idempotency.WithStore(store))(base)
// The same message delivered twice runs the handler only once; the duplicate
// is acked and dropped.
msg := stubMsg{key: []byte("order-1")}
r1 := h(context.Background(), msg)
r2 := h(context.Background(), msg)
fmt.Printf("first: %s/%s\n", r1.Action, r1.Class)
fmt.Printf("second: %s/%s\n", r2.Action, r2.Class)
fmt.Printf("handler runs: %d\n", processed)
}
processed order-1
first: ack/unclassified
second: ack/drop
handler runs: 1

KeyFunc derives the idempotency key for a message and reports whether a key could be derived. A false ok means “no key” — the middleware then lets the message through undeduplicated rather than collapsing all keyless messages onto one empty key.

type KeyFunc func(m source.Message) (key string, ok bool)

Option configures the idempotency Middleware. Options are additive with no-op defaults.

type Option func(*config)

func WithDeduper(d source.Deduper) Option

WithDeduper sets the dedup backend from a core [source.Deduper], adapting it to a Store. The default is none. A nil deduper is ignored.

func WithKeyFunc(f KeyFunc) Option

WithKeyFunc sets the KeyFunc that extracts the idempotency key from a message. The default is DefaultKeyFunc. A nil function is ignored.

func WithStore(s Store) Option

WithStore sets the Store the middleware deduplicates against. The default is none (a pass-through). A nil store is ignored.

Store records and reports which idempotency keys have been processed. It is the caller-provided seam the middleware deduplicates against — an in-memory set, a Redis SETNX, a database unique index. It mirrors [source.Deduper] so any existing Deduper is also a Store via FromDeduper. Implementations must be safe for concurrent use.

type Store interface {
// Seen reports whether key has already been processed and, if not, records it
// — atomically, so two concurrent deliveries of the same key cannot both
// observe false. A non-nil error is treated as "unknown": the middleware fails
// open and lets the handler run rather than dropping a possibly-new message.
Seen(ctx context.Context, key string) (seen bool, err error)
}

func FromDeduper(d source.Deduper) Store

FromDeduper adapts a [source.Deduper] to a Store, so the engine’s dedup capability and this middleware share one backend.

Generated by gomarkdoc