source/kafka
import "github.com/stablekernel/crucible/source/kafka"Package kafka is a source ingress adapter that consumes records from Apache Kafka (and API-compatible brokers such as RedPanda) through the pure-Go franz-go client. It depends only on the standard library, crucible/source, and franz-go. Construct an Inlet with New, hand it to a source.Hopper, and the engine drives the consume loop, decoding, ordering, and settlement.
Ack model
Section titled “Ack model”Delivery is at-least-once: the adapter never commits an offset before its handler reports success. The franz-go client is configured with AutoCommitMarks, so only records the engine settles successfully are marked, and the marked offsets are committed on graceful drain and on rebalance. Each handler [source.Result] maps onto Kafka as follows:
- Ack marks the record for commit (commit-after-process).
- Nak does NOT mark the record, so it is re-read on restart or rebalance. A requeue delay is best-effort, applied by pausing and re-seeking the record’s partition; this is a documented divergence from JetStream’s native delayed nak (Kafka has no per-message redelivery delay).
- Term produces the record to the configured dead-letter topic, then marks it for commit so it is not re-read.
- InProgress is a no-op: Kafka has no per-message ack deadline to extend.
- Manual is a no-op: the handler settled the record itself through [source.Message.As] and the underlying *kgo.Client.
Capabilities
Section titled “Capabilities”The [Subscription] this adapter opens satisfies several optional capability interfaces the engine discovers by type assertion, without leaking vendor types: [source.Seekable] (live offset reposition via SetOffsets), [source.ConsumerGroups] (rebalance assign/revoke/lost hooks with drain-and-commit on revoke), [source.PartitionOrdered] (per-partition order), [source.LagReporter] (end-offset lag), and [source.Transactional] (Kafka exactly-once, via a group transact session) when constructed with WithTransactional.
Vendor escape hatch
Section titled “Vendor escape hatch”No franz-go type appears in an exported signature. A power user who must drop to the driver reaches the underlying *kgo.Client through the inlet’s As method (Inlet.As) and a delivered record through [source.Message.As] with a **kgo.Record target.
Stability
Section titled “Stability”Experimental (pre-v1); the API may change until the suite locks v1.0.0.
- Variables
- type Inlet
- type Option
- func WithBalancer(balancers …kgo.GroupBalancer) Option
- func WithClient(client *kgo.Client) Option
- func WithClientID(id string) Option
- func WithClientOptions(opts …kgo.Opt) Option
- func WithDLQTopic(topic string) Option
- func WithMaxPollRecords(n int) Option
- func WithSASL(mechanisms …sasl.Mechanism) Option
- func WithSeedBrokers(brokers …string) Option
- func WithTLS(cfg *tls.Config) Option
- func WithTransactional() Option
Variables
Section titled “Variables”ErrNoDLQTopic reports that a record was termed (dead-lettered) but the inlet was constructed without a dead-letter topic. Configure one with WithDLQTopic. Match it with errors.Is.
var ErrNoDLQTopic = errors.New("source/kafka: term requested but no dead-letter topic configured")ErrNoSeedBrokers reports that New was called without any seed brokers and without an injected client. Match it with errors.Is.
var ErrNoSeedBrokers = errors.New("source/kafka: no seed brokers configured")type Inlet
Section titled “type Inlet”Inlet is a franz-go-backed [source.Inlet]. It builds (or wraps) a *kgo.Client and opens a [source.Subscription] that the engine drives. One Inlet maps to one client; open a single Subscription per Inlet (the Hopper drives one).
Inlet is safe for concurrent use across its own methods, though a single Subscribe is the expected pattern.
type Inlet struct { // contains filtered or unexported fields}func New
Section titled “func New”func New(opts ...Option) (*Inlet, error)New constructs an Inlet from opts. It does not dial: the franz-go client is built lazily on the first Subscribe (so the consumer group and topics are known), unless a client was injected with WithClient. New fails only if no seed brokers and no client were provided.
Example
ExampleNew builds an Inlet over a Kafka cluster and reports its name. In a real program the Inlet is handed to a source.Hopper, which drives the consume loop, decoding, ordering, and settlement; here we only construct it to keep the example broker-free.
package main
import ( "context" "fmt"
kafkasource "github.com/stablekernel/crucible/source/kafka")
func main() { inlet, err := kafkasource.New( kafkasource.WithSeedBrokers("localhost:9092"), kafkasource.WithClientID("orders-consumer"), kafkasource.WithDLQTopic("orders.DLQ"), ) if err != nil { fmt.Println("new:", err) return } defer func() { _ = inlet.Close() }()
// The inlet is a source.Inlet: hand it to a Hopper to consume. // sub, _ := inlet.Subscribe(ctx, source.SubscribeConfig{Topics: []string{"orders"}, Group: "orders"}) // hopper.Run(ctx, sub, handler) _ = context.Background()
fmt.Println("inlet ready")}Output
Section titled “Output”inlet readyfunc (*Inlet) As
Section titled “func (*Inlet) As”func (in *Inlet) As(target any) boolAs assigns the underlying *kgo.Client to target if target is a **kgo.Client, returning whether it did. It is the documented escape hatch for power users who must reach the franz-go client (custom admin calls, manual commits); prefer the neutral [source.Inlet] surface. It reports false before the first Subscribe builds the client.
func (*Inlet) Close
Section titled “func (*Inlet) Close”func (in *Inlet) Close() errorClose releases the inlet’s client when the inlet built it. When a client was injected with WithClient the caller owns its lifecycle and Close is a no-op for it. Close live subscriptions first; closing the inlet does not settle in-flight records.
func (*Inlet) Subscribe
Section titled “func (*Inlet) Subscribe”func (in *Inlet) Subscribe(_ context.Context, cfg source.SubscribeConfig) (source.Subscription, error)Subscribe builds (or reuses) the franz-go client for cfg and returns a live [source.Subscription]. At least one topic is required. The returned subscription owns the rebalance hooks, so it must be installed before the client begins polling; this method wires them when it builds the client.
type Option
Section titled “type Option”Option configures an Inlet. Options are additive with zero-value defaults; a nil or empty value passed to a With* option leaves the default in place.
type Option func(*config)func WithBalancer
Section titled “func WithBalancer”func WithBalancer(balancers ...kgo.GroupBalancer) OptionWithBalancer sets the consumer-group partition-assignment balancer(s), in preference order, used when a subscription joins a group. The default is franz-go’s cooperative-sticky balancer. Construct balancers with kgo.CooperativeStickyBalancer, kgo.RangeBalancer, and friends. Nil entries are ignored.
func WithClient
Section titled “func WithClient”func WithClient(client *kgo.Client) OptionWithClient injects a pre-built *kgo.Client, bypassing this package’s client construction entirely. The inlet then neither dials nor closes the client: its lifecycle is the caller’s. Use it to share a client or apply configuration this package does not expose. A nil client is ignored.
A client injected this way must already be configured to consume as a group member with AutoCommitMarks (and BlockRebalanceOnPoll) for the ack model to hold; the typed options are the supported path.
func WithClientID
Section titled “func WithClientID”func WithClientID(id string) OptionWithClientID sets the Kafka client ID the broker logs and quotas requests under. The default is franz-go’s. An empty ID is ignored.
func WithClientOptions
Section titled “func WithClientOptions”func WithClientOptions(opts ...kgo.Opt) OptionWithClientOptions appends raw franz-go options for tuning this package does not surface (fetch sizes, timeouts, instrumentation). They are applied after the options this package derives, so they win on conflict. It is the power-user seam; prefer the typed With* options above.
func WithDLQTopic
Section titled “func WithDLQTopic”func WithDLQTopic(topic string) OptionWithDLQTopic sets the topic an ActionTerm result produces a rejected record to before committing it. Without a dead-letter topic a Term fails with ErrNoDLQTopic rather than silently dropping the record. An empty topic is ignored.
func WithMaxPollRecords
Section titled “func WithMaxPollRecords”func WithMaxPollRecords(n int) OptionWithMaxPollRecords bounds how many records a single fetch yields, the lever the engine’s bounded in-flight window rides on. The default (0) lets franz-go decide. A value < 1 is ignored.
func WithSASL
Section titled “func WithSASL”func WithSASL(mechanisms ...sasl.Mechanism) OptionWithSASL configures one or more SASL authentication mechanisms (PLAIN, SCRAM, OAUTHBEARER, AWS_MSK_IAM) built from franz-go’s pkg/sasl helpers. The franz-go sasl.Mechanism is an interface, not a struct, so this keeps no franz-go data type in the exported surface beyond the auth abstraction itself. Nil entries are ignored.
func WithSeedBrokers
Section titled “func WithSeedBrokers”func WithSeedBrokers(brokers ...string) OptionWithSeedBrokers sets the broker addresses (host:port) the inlet dials to discover the cluster. At least one is required unless a client is injected with WithClient. Empty input is ignored.
func WithTLS
Section titled “func WithTLS”func WithTLS(cfg *tls.Config) OptionWithTLS enables TLS on broker connections using cfg. A nil cfg is ignored (connections stay plaintext); pass a non-nil &tls.Config{} for system-default TLS.
func WithTransactional
Section titled “func WithTransactional”func WithTransactional() OptionWithTransactional enables Kafka exactly-once semantics: the subscription satisfies [source.Transactional], fencing settlement of consumed records inside a producer transaction so consume-process-produce is atomic. It requires a transactional ID, which franz-go derives; the inlet builds a kgo.GroupTransactSession instead of a plain client.
Generated by gomarkdoc