Skip to content

Adapters

Every backend is its own optional Go module with its own go.mod, kept out of go.work and built GOWORK=off. The source core imports no vendor SDK; you add crucible/source/kafka only if you consume from Kafka, and its franz-go dependency never touches a service that does not. Each module exposes a narrow surface and ships a testcontainers integration leg against the real broker.

crucible/source/kafka is a group consumer over franz-go:

in, _ := kafka.New(kafka.WithBrokers("localhost:9092"), kafka.WithGroup("orders-svc"))
sub, _ := in.Subscribe(ctx, source.SubscribeConfig{Topic: "orders"})
sub.Receive(ctx, handler)

It does cooperative rebalance with drain-on-revoke, mark-commit-after-process, pause/resume backpressure, and seek/replay, and it supports transactional consume-process-produce (see Exactly-once (Kafka EOS) below). The shard is the partition; the high-water-mark commit subtlety is handled in the engine.

crucible/source/jetstream is a pull consumer over nats.go:

in, _ := jetstream.New(jetstream.WithURL("nats://localhost:4222"), jetstream.WithStream("ORDERS"))
sub, _ := in.Subscribe(ctx, source.SubscribeConfig{Subject: "orders.>"})
sub.Receive(ctx, handler)

It does Ack/Nak/Term/InProgress, MaxAckPending backpressure, the OrderedConsumer for single-threaded ordered delivery, and replay through the deliver-by-start options. A JetStream durable consumer is the grouping analog of a Kafka consumer group, but it has no partitions and no assignment callbacks, so JetStream does not pretend to be a ConsumerGroups backend.

crucible/source/redis is a consumer-group reader over go-redis:

in, _ := redis.New(redis.WithAddr("localhost:6379"), redis.WithGroup("orders-svc"), redis.WithConsumer("worker-1"))
sub, _ := in.Subscribe(ctx, source.SubscribeConfig{Topics: []string{"orders"}})
m, _ := sub.Next(ctx)
_ = sub.Settle(ctx, m, source.Ack())

It reads a Redis Stream with XREADGROUP, acks with XACK, leaves a naked entry in the pending list for a later XPENDING plus XCLAIM redelivery, and routes a terminated entry to a configured dead-letter stream before acking the original. Replay is by entry ID through XRANGE, and lag comes from XINFO GROUPS with an XLEN fallback. A Redis consumer group is the grouping analog of a Kafka consumer group, but it has no partitions and no assignment callbacks, so Redis does not pretend to be a ConsumerGroups backend.

Capabilities are detected by interface assertion, once, inside the engine. The table is honest: an adapter satisfies a capability only when its backend truly supports it, and a compile-time var _ Seekable = ... assertion in each module keeps it accurate.

CapabilitykafkajetstreamredisNotes
Seekableyesyesyeslive SetOffsets on Kafka; consumer-recreate on JetStream; entry-ID XRANGE on Redis
ConsumerGroupsyesnonoKafka rebalance hooks; JetStream and Redis have no partition assignment
SharedDurablenoyesyesthe JetStream durable-consumer and Redis consumer-group grouping analogs
PartitionOrderedyesnonoKafka per-partition order
OrderedDeliverynoyesnoJetStream OrderedConsumer, single-threaded
Batchedyesyesnobatched fetch on Kafka and JetStream
TransactionalyesnonoKafka EOS only; JetStream and Redis do not, and do not fake it
Deduperyesyesnodedup seam (see reliability)
LagReporteryesyesyesconsumer-lag gauge; Redis reports group lag from XINFO GROUPS

The divergences are documented, never papered over. A Nak(delay) is a real delayed redelivery on JetStream but is best-effort on Kafka (pause plus reseek), and that is called out where it matters.

Most ingress libraries stop at offset EOS: the broker commits offsets in a transaction so a record is consumed once. source goes one step further on Kafka and ties the records a transition emits to that same commit, so a consume-process-produce cycle is atomic: the produced records and the consumed offset are committed together, or neither is.

This is the Transactional capability, satisfied by the Kafka adapter only. JetStream and the in-memory adapter do not implement it, so the capability is absent rather than faked; on those backends use the at-least-once path. Build a transactional inlet with a stable, unique transactional id:

in, _ := kafka.New(
kafka.WithSeedBrokers("localhost:9092"),
kafka.WithTransactional("orders-eos-v1"), // the producer fencing token
)
sub, _ := in.Subscribe(ctx, source.SubscribeConfig{Topics: []string{"orders"}, Group: "orders-svc"})
if tx, ok := sub.(source.Transactional); ok {
// Begin fences the produce side and the offset of m in one transaction.
err := tx.Begin(ctx, m, func(ctx context.Context, t source.Tx) error {
return t.Produce(ctx, source.ProducedRecord{Topic: "orders.out", Value: out})
})
// err == nil means the produced records AND m's offset committed atomically.
}

Begin runs your work function inside a Kafka producer transaction. If it returns nil, the records you produced through the Tx are flushed and the consumed offset is committed as one unit. If it returns an error, or a rebalance fences the producer, the transaction aborts: the produced records are discarded, the offset is not committed, and the input is redelivered. Begin is the full settle path for that message, so the engine takes no further settle action.

Under the hood the adapter builds a franz-go GroupTransactSession with read-committed fetch isolation (so it never reads another producer’s uncommitted records) and no auto-commit (the transaction commits offsets). No franz-go type appears in the seam: you produce neutral source.ProducedRecord values, and the adapter maps them to the wire.

  • Kafka only. EOS here is the Kafka transactional protocol. JetStream and other backends fall back to the at-least-once path; they do not pretend to offer it.
  • One transactional id per logical consumer. The id is a producer fencing token. Two live consumers sharing one id fence each other; keep it stable across restarts and unique per consumer.
  • Idempotent downstream still matters across the persist gap. When the bridge persists machine state inside the transaction, a broker abort after a successful save can leave the instance advanced but the offset uncommitted. The redelivery is then deduplicated by the machine’s state version (see driving a statechart with EOS), so it acks as a no-op rather than double-applying.

source/cdc (Debezium/OpenCDC change-data-capture) is on the roadmap. The catalog is a convenience: any type that satisfies the Inlet interface is an inlet, and the memsource in-memory adapter is itself an Inlet, so you can drive the whole engine in a unit test with no broker at all.