
source/kafka

import "github.com/stablekernel/crucible/source/kafka"

Package kafka is a source ingress adapter that consumes records from Apache Kafka (and API-compatible brokers such as Redpanda) through the pure-Go franz-go client. It depends only on the standard library, crucible/source, and franz-go. Construct an Inlet with New and hand it to a source.Hopper; the engine then drives the consume loop, decoding, ordering, and settlement.

Delivery is at-least-once: the adapter never commits an offset before its handler reports success. The franz-go client is configured with AutoCommitMarks, so only records the engine settles successfully are marked, and the marked offsets are committed on graceful drain and on rebalance. Each handler [source.Result] maps onto Kafka as follows:

  • Ack marks the record for commit (commit-after-process).
  • Nak does NOT mark the record, so it is re-read on restart or rebalance. A requeue delay is best-effort, applied by pausing and re-seeking the record’s partition; this is a documented divergence from JetStream’s native delayed nak (Kafka has no per-message redelivery delay).
  • Term produces the record to the configured dead-letter topic, then marks it for commit so it is not re-read.
  • InProgress is a no-op: Kafka has no per-message ack deadline to extend.
  • Manual is a no-op: the handler settled the record itself through [source.Message.As] and the underlying *kgo.Client.
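The mapping above can be sketched as a small decision table. This is an illustrative model only, not the adapter's code: the Action constants, the settlement struct, and the settle function are hypothetical names invented for the sketch, and the real adapter acts on [source.Result] values and franz-go offsets.

```go
package main

import "fmt"

// Action is a hypothetical stand-in for the handler result kinds listed above.
type Action int

const (
	Ack Action = iota
	Nak
	Term
	InProgress
	Manual
)

// settlement records what the adapter would do with one record (hypothetical).
type settlement struct {
	MarkForCommit bool // the record's offset becomes eligible for commit
	ProduceToDLQ  bool // the record is first produced to the dead-letter topic
}

// settle mirrors the list above: only Ack and Term mark the record.
func settle(a Action) settlement {
	switch a {
	case Ack:
		return settlement{MarkForCommit: true}
	case Term:
		return settlement{MarkForCommit: true, ProduceToDLQ: true}
	default:
		// Nak, InProgress, and Manual: the adapter leaves the record unmarked.
		return settlement{}
	}
}

func main() {
	names := []string{"Ack", "Nak", "Term", "InProgress", "Manual"}
	for i, name := range names {
		fmt.Printf("%-10s %+v\n", name, settle(Action(i)))
	}
}
```

Because an unmarked record is never committed, a Nak surfaces again after a restart or rebalance, which is what makes delivery at-least-once.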

The [Subscription] this adapter opens satisfies several optional capability interfaces the engine discovers by type assertion, without leaking vendor types: [source.Seekable] (live offset reposition via SetOffsets), [source.ConsumerGroups] (rebalance assign/revoke/lost hooks with drain-and-commit on revoke), [source.PartitionOrdered] (per-partition order), [source.LagReporter] (end-offset lag), and [source.Transactional] (Kafka exactly-once, via a group transact session) when constructed with WithTransactional.
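Capability discovery by type assertion might look like the following sketch. The Subscription and LagReporter interfaces here are simplified, hypothetical stand-ins (the real ones live in crucible/source and their method sets are not shown in this document); the point is only the shape of the optional-interface check.

```go
package main

import "fmt"

// Subscription is a hypothetical minimal core interface.
type Subscription interface {
	Close() error
}

// LagReporter is a hypothetical optional capability a subscription may add.
type LagReporter interface {
	Lag() int64
}

// kafkaSub implements both the core interface and the optional capability.
type kafkaSub struct{ lag int64 }

func (kafkaSub) Close() error { return nil }
func (k kafkaSub) Lag() int64 { return k.lag }

// plainSub implements only the core interface.
type plainSub struct{}

func (plainSub) Close() error { return nil }

// lagOf reports the lag when sub supports LagReporter, and false otherwise.
func lagOf(sub Subscription) (int64, bool) {
	lr, ok := sub.(LagReporter)
	if !ok {
		return 0, false
	}
	return lr.Lag(), true
}

func main() {
	for _, sub := range []Subscription{kafkaSub{lag: 42}, plainSub{}} {
		lag, ok := lagOf(sub)
		fmt.Println(lag, ok)
	}
}
```

The caller depends only on the neutral interfaces, so no vendor type has to appear in its code.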

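No franz-go type appears in the signatures of Inlet's methods or of the Subscription it opens; franz-go types surface only in the power-user options (WithBalancer, WithClient, WithClientOptions, WithSASL). A power user who must drop to the driver reaches the underlying *kgo.Client through Inlet.As, and a delivered record through [source.Message.As] with a **kgo.Record target.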

Experimental (pre-v1); the API may change until the suite locks v1.0.0.

ErrNoDLQTopic reports that a record was termed (dead-lettered) but the inlet was constructed without a dead-letter topic. Configure one with WithDLQTopic. Match it with errors.Is.

var ErrNoDLQTopic = errors.New("source/kafka: term requested but no dead-letter topic configured")
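Matching with errors.Is works even when the sentinel has been wrapped with extra context. The sketch below redeclares the sentinel locally so it runs without the package or a broker; settleTerm is a hypothetical helper, and real code would compare against kafka.ErrNoDLQTopic instead.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoDLQTopic is a local copy of the sentinel, for illustration only.
var errNoDLQTopic = errors.New("source/kafka: term requested but no dead-letter topic configured")

// settleTerm is a hypothetical helper that fails when no DLQ topic is set,
// wrapping the sentinel with %w so callers can still match it.
func settleTerm(dlqTopic string) error {
	if dlqTopic == "" {
		return fmt.Errorf("settle record: %w", errNoDLQTopic)
	}
	return nil
}

func main() {
	err := settleTerm("")
	if errors.Is(err, errNoDLQTopic) {
		fmt.Println("no dead-letter topic: configure one with WithDLQTopic")
	}
}
```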

ErrNoSeedBrokers reports that New was called without any seed brokers and without an injected client. Match it with errors.Is.

var ErrNoSeedBrokers = errors.New("source/kafka: no seed brokers configured")

Inlet is a franz-go-backed [source.Inlet]. It builds (or wraps) a *kgo.Client and opens a [source.Subscription] that the engine drives. One Inlet maps to one client; open a single Subscription per Inlet (the Hopper drives one).

Inlet is safe for concurrent use across its own methods, though a single Subscribe is the expected pattern.

type Inlet struct {
// contains filtered or unexported fields
}

func New(opts ...Option) (*Inlet, error)

New constructs an Inlet from opts. It does not dial: the franz-go client is built lazily on the first Subscribe (so the consumer group and topics are known), unless a client was injected with WithClient. New fails only if no seed brokers and no client were provided.

Example

ExampleNew builds an Inlet over a Kafka cluster and reports that it is ready. In a real program the Inlet is handed to a source.Hopper, which drives the consume loop, decoding, ordering, and settlement; here we only construct it to keep the example broker-free.

package main

import (
	"context"
	"fmt"

	kafkasource "github.com/stablekernel/crucible/source/kafka"
)

func main() {
	inlet, err := kafkasource.New(
		kafkasource.WithSeedBrokers("localhost:9092"),
		kafkasource.WithClientID("orders-consumer"),
		kafkasource.WithDLQTopic("orders.DLQ"),
	)
	if err != nil {
		fmt.Println("new:", err)
		return
	}
	defer func() { _ = inlet.Close() }()

	// The inlet is a source.Inlet: hand it to a Hopper to consume.
	// sub, _ := inlet.Subscribe(ctx, source.SubscribeConfig{Topics: []string{"orders"}, Group: "orders"})
	// hopper.Run(ctx, sub, handler)
	_ = context.Background()

	fmt.Println("inlet ready")
}

Output:

inlet ready

func (in *Inlet) As(target any) bool

As assigns the underlying *kgo.Client to target if target is a **kgo.Client, returning whether it did. It is the documented escape hatch for power users who must reach the franz-go client (custom admin calls, manual commits); prefer the neutral [source.Inlet] surface. It reports false before the first Subscribe builds the client.
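The As contract described above follows the familiar pointer-to-pointer target pattern. A minimal sketch, assuming a stand-in client type in place of *kgo.Client and a hypothetical inlet struct (neither is this package's real type):

```go
package main

import "fmt"

// client is a stand-in for the driver client type.
type client struct{ id string }

// inlet is a hypothetical holder whose client is built lazily.
type inlet struct{ c *client }

// As assigns the underlying client to target when target is a **client
// and a client has already been built; it reports whether it did.
func (in *inlet) As(target any) bool {
	p, ok := target.(**client)
	if !ok || p == nil || in.c == nil {
		return false
	}
	*p = in.c
	return true
}

func main() {
	in := &inlet{}
	var c *client

	// Before the client exists, As reports false and leaves c untouched.
	fmt.Println(in.As(&c), c == nil)

	// Once a client has been built, the same call succeeds.
	in.c = &client{id: "orders-consumer"}
	fmt.Println(in.As(&c), c.id)
}
```

Checking the boolean before using the target matters here, because the real method also reports false until the first Subscribe has built the client.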

func (in *Inlet) Close() error

Close releases the inlet’s client when the inlet built it. When a client was injected with WithClient the caller owns its lifecycle and Close is a no-op for it. Close live subscriptions first; closing the inlet does not settle in-flight records.

func (in *Inlet) Subscribe(_ context.Context, cfg source.SubscribeConfig) (source.Subscription, error)

Subscribe builds (or reuses) the franz-go client for cfg and returns a live [source.Subscription]. At least one topic is required. The returned subscription owns the rebalance hooks, which must be installed before the client begins polling; this method wires them when it builds the client.

Option configures an Inlet. Options are additive with zero-value defaults; a nil or empty value passed to a With* option leaves the default in place.

type Option func(*config)
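The option behaviour described above (additive, zero-value defaults, nil or empty input ignored) is the usual functional-options pattern. The sketch below is illustrative: the config fields and the lowercase option names are hypothetical, and whether the real WithSeedBrokers appends or replaces on repeated use is an assumption here.

```go
package main

import "fmt"

// config is a hypothetical, much-reduced version of the unexported config.
type config struct {
	clientID string
	brokers  []string
}

// Option mutates the config, matching the shape of the package's Option type.
type Option func(*config)

// withClientID ignores an empty ID, leaving the default in place.
func withClientID(id string) Option {
	return func(c *config) {
		if id != "" {
			c.clientID = id
		}
	}
}

// withSeedBrokers appends brokers; empty input changes nothing.
// Appending (rather than replacing) is an assumption of this sketch.
func withSeedBrokers(brokers ...string) Option {
	return func(c *config) {
		c.brokers = append(c.brokers, brokers...)
	}
}

// build applies the options in order, skipping nil options.
func build(opts ...Option) config {
	var c config
	for _, o := range opts {
		if o != nil {
			o(&c)
		}
	}
	return c
}

func main() {
	c := build(
		withSeedBrokers("localhost:9092"),
		withClientID("orders-consumer"),
		withClientID(""), // ignored: the earlier ID survives
	)
	fmt.Println(c.clientID, c.brokers)
}
```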

func WithBalancer(balancers ...kgo.GroupBalancer) Option

WithBalancer sets the consumer-group partition-assignment balancer(s), in preference order, used when a subscription joins a group. The default is franz-go’s cooperative-sticky balancer. Construct balancers with kgo.CooperativeStickyBalancer, kgo.RangeBalancer, and friends. Nil entries are ignored.

func WithClient(client *kgo.Client) Option

WithClient injects a pre-built *kgo.Client, bypassing this package’s client construction entirely. The inlet then neither dials nor closes the client: its lifecycle is the caller’s. Use it to share a client or apply configuration this package does not expose. A nil client is ignored.

A client injected this way must already be configured to consume as a group member with AutoCommitMarks (and BlockRebalanceOnPoll) for the ack model to hold; the typed options are the supported path.

func WithClientID(id string) Option

WithClientID sets the Kafka client ID the broker logs and quotas requests under. The default is franz-go’s. An empty ID is ignored.

func WithClientOptions(opts ...kgo.Opt) Option

WithClientOptions appends raw franz-go options for tuning this package does not surface (fetch sizes, timeouts, instrumentation). They are applied after the options this package derives, so they win on conflict. It is the power-user seam; prefer the typed With* options where one exists.

func WithDLQTopic(topic string) Option

WithDLQTopic sets the dead-letter topic: on an ActionTerm result the rejected record is produced to this topic and then marked for commit. Without a dead-letter topic a Term fails with ErrNoDLQTopic rather than silently dropping the record. An empty topic is ignored.

func WithMaxPollRecords(n int) Option

WithMaxPollRecords bounds how many records a single fetch yields; the engine's bounded in-flight window relies on this limit. The default (0) lets franz-go decide. A value < 1 is ignored.

func WithSASL(mechanisms ...sasl.Mechanism) Option

WithSASL configures one or more SASL authentication mechanisms (PLAIN, SCRAM, OAUTHBEARER, AWS_MSK_IAM) built from franz-go’s pkg/sasl helpers. The franz-go sasl.Mechanism is an interface, not a struct, so this keeps no franz-go data type in the exported surface beyond the auth abstraction itself. Nil entries are ignored.

func WithSeedBrokers(brokers ...string) Option

WithSeedBrokers sets the broker addresses (host:port) the inlet dials to discover the cluster. At least one is required unless a client is injected with WithClient. Empty input is ignored.

func WithTLS(cfg *tls.Config) Option

WithTLS enables TLS on broker connections using cfg. A nil cfg is ignored (connections stay plaintext); pass a non-nil &tls.Config{} for system-default TLS.
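A *tls.Config suitable for passing to WithTLS can be built with the standard library alone. The sketch below is one possible shape, not a recommendation from this package: the brokerTLS helper, the TLS 1.2 floor, and the optional private CA bundle are all assumptions made for illustration.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// brokerTLS is a hypothetical helper. With an empty caPEM it returns a
// config that uses the system root CAs; otherwise it trusts only the
// certificates found in caPEM.
func brokerTLS(caPEM []byte) (*tls.Config, error) {
	cfg := &tls.Config{MinVersion: tls.VersionTLS12}
	if len(caPEM) == 0 {
		return cfg, nil
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no certificates found in CA bundle")
	}
	cfg.RootCAs = pool
	return cfg, nil
}

func main() {
	cfg, err := brokerTLS(nil)
	if err != nil {
		fmt.Println("tls:", err)
		return
	}
	// cfg would then be passed as kafkasource.WithTLS(cfg).
	fmt.Println("system roots:", cfg.RootCAs == nil)
}
```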

func WithTransactional() Option

WithTransactional enables Kafka exactly-once semantics: the subscription satisfies [source.Transactional], fencing settlement of consumed records inside a producer transaction so consume-process-produce is atomic. It requires a transactional ID, which franz-go derives; the inlet builds a kgo.GroupTransactSession instead of a plain client.
