confluent-kafka-go

Confluent's Apache Kafka Golang client

Top Related Projects

11,634

Sarama is a Go library for Apache Kafka.

Kafka library in Go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.8+. Producing, consuming, transacting, administrating, etc.

2,368

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.

Quick Overview

confluent-kafka-go is Confluent's Golang client for Apache Kafka, developed and maintained by Confluent. It is a lightweight cgo wrapper around the C library librdkafka, not a pure Go implementation, and provides a high-performance API for producing and consuming messages to and from Kafka topics, including support for transactions and consumer groups.

Pros

  • High performance and low overhead due to its use of librdkafka under the hood
  • Comprehensive support for Kafka features, including transactions, exactly-once semantics, and consumer groups
  • Well-documented and actively maintained by Confluent
  • Supports both low-level and high-level consumer implementations

Cons

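  • Requires cgo (CGO_ENABLED must not be 0), which can complicate cross-compilation and deployment; librdkafka is bundled for common platforms but must be installed separately on other platforms or for GSSAPI/Kerberos support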
  • Learning curve can be steep for developers new to Kafka concepts
  • Some users report occasional stability issues or unexpected behavior in edge cases
  • Limited support for older Kafka versions

Code Examples

  1. Producing a message:
producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
    panic(err)
}
defer producer.Close()

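topic := "my-topic"
err = producer.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte("Hello, Kafka!"),
}, nil)
if err != nil {
    panic(err) // e.g. the local queue is full
}

// Produce is asynchronous: wait up to 15 seconds for outstanding deliveries.
producer.Flush(15 * 1000)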
  2. Consuming messages:
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092",
    "group.id":          "my-group",
    "auto.offset.reset": "earliest",
})
if err != nil {
    panic(err)
}
defer consumer.Close()

consumer.SubscribeTopics([]string{"my-topic"}, nil)

for {
    msg, err := consumer.ReadMessage(-1)
    if err == nil {
        fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    } else {
        fmt.Printf("Consumer error: %v (%v)\n", err, msg)
    }
}
  3. Using transactions:
producer, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers":    "localhost:9092",
    "transactional.id":     "my-transactional-id",
    "enable.idempotence":   true,
})
if err != nil {
    panic(err)
}
defer producer.Close()

err = producer.InitTransactions(nil)
if err != nil {
    panic(err)
}

err = producer.BeginTransaction()
if err != nil {
    panic(err)
}

// Produce messages within the transaction
// ...

err = producer.CommitTransaction(nil)
if err != nil {
    panic(err)
}

Getting Started

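  1. Optional: install librdkafka. Prebuilt librdkafka binaries ship with the Go client for common platforms, so this step is only needed on unsupported platforms or for GSSAPI/Kerberos support (build with -tags dynamic in that case):

    brew install librdkafka  # macOS
    apt-get install librdkafka-dev  # Ubuntu
    
  2. Install the Go library:

    go get github.com/confluentinc/confluent-kafka-go/v2/kafka
    
  3. Import and use in your Go code:

    import "github.com/confluentinc/confluent-kafka-go/v2/kafka"
    
  4. Configure and create a producer or consumer as shown in the code examples above.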

Competitor Comparisons

Sarama is a Go library for Apache Kafka.

Pros of Sarama

  • Pure Go implementation, easier to build and deploy
  • More flexible API, allowing fine-grained control over Kafka operations
  • Supports older Kafka versions (0.8 and above)

Cons of Sarama

  • Requires more manual configuration and management
  • Less integrated with Confluent-specific features
  • May have slightly lower performance due to pure Go implementation

Code Comparison

Sarama:

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
message := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test message")}
partition, offset, err := producer.SendMessage(message)

Confluent-Kafka-Go:

producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
topic := "test"
err = producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte("test message")}, nil)

Both libraries provide similar functionality for producing messages to Kafka, but Sarama offers more granular control over the process. Confluent-Kafka-Go, being a wrapper around the C library librdkafka, provides a simpler API and potentially better performance. The choice between the two depends on specific project requirements, such as performance needs, deployment constraints, and desired level of control over Kafka operations.

Kafka library in Go

Pros of kafka-go

  • Pure Go implementation, making it easier to build and deploy
  • Better support for newer Kafka protocol features
  • More idiomatic Go API design

Cons of kafka-go

  • Less mature and potentially less stable than confluent-kafka-go
  • May have lower performance in some scenarios
  • Smaller community and ecosystem compared to confluent-kafka-go

Code Comparison

kafka-go:

writer := kafka.NewWriter(kafka.WriterConfig{
    Brokers: []string{"localhost:9092"},
    Topic:   "my-topic",
})
err := writer.WriteMessages(context.Background(),
    kafka.Message{Value: []byte("Hello, World!")},
)

confluent-kafka-go:

p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
topic := "my-topic"
p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte("Hello, World!"),
}, nil)

Both libraries offer similar functionality for producing messages to Kafka topics. kafka-go provides a more Go-idiomatic API with its Writer interface, while confluent-kafka-go uses a Producer object that closely mirrors the C library's API. The choice between them often depends on specific project requirements, performance needs, and developer preferences.

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.8+. Producing, consuming, transacting, administrating, etc.

Pros of franz-go

  • Pure Go implementation without CGO dependencies, making it easier to build and deploy
  • More comprehensive API coverage, including admin operations and newer Kafka features
  • Better performance in some scenarios, especially for high-throughput applications

Cons of franz-go

  • Less mature and battle-tested compared to confluent-kafka-go
  • Smaller community and ecosystem support
  • May have fewer enterprise-specific features that Confluent provides

Code Comparison

franz-go:

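client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
if err != nil {
    panic(err)
}
defer client.Close()
// The promise callback may be nil; pass a func(*kgo.Record, error) to observe delivery results.
client.Produce(context.Background(), &kgo.Record{
    Topic: "my-topic",
    Value: []byte("Hello, Kafka!"),
}, nil)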

confluent-kafka-go:

producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
topic := "my-topic"
producer.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte("Hello, Kafka!"),
}, nil)

Both libraries offer similar functionality, but franz-go provides a more Go-idiomatic API with context support and functional options. confluent-kafka-go's API is closer to the C librdkafka library it wraps.

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.

Pros of Goka

  • Provides a higher-level abstraction for building stream processing applications
  • Offers built-in support for local storage and state management
  • Includes a web view for monitoring and debugging

Cons of Goka

  • Less mature and less widely adopted compared to Confluent Kafka Go
  • May have a steeper learning curve due to its more opinionated approach
  • Focused on stream processing, so less suited to general-purpose producing and consuming than Confluent Kafka Go

Code Comparison

Goka example:

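func (g *game) Consume(ctx goka.Context, msg interface{}) {
    // ctx.Value() returns the state stored for the message key, or nil if none exists yet.
    state, ok := ctx.Value().(*GameState)
    if !ok {
        state = &GameState{}
    }
    // Process message and update state
    ctx.SetValue(state)
}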

Confluent Kafka Go example:

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost",
    "group.id":          "myGroup", // required by NewConsumer
})
if err != nil {
    panic(err)
}
err = consumer.SubscribeTopics([]string{"myTopic"}, nil)
for {
    msg, err := consumer.ReadMessage(-1)
    // Process message
}

Goka provides a more streamlined approach for building stateful stream processors, while Confluent Kafka Go offers lower-level control and broader Kafka functionality. Goka's abstraction simplifies development for specific use cases, but Confluent Kafka Go's flexibility makes it suitable for a wider range of Kafka-related tasks.

README

Confluent's Golang Client for Apache Kafka™

confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform.

Features:

  • High performance - confluent-kafka-go is a lightweight wrapper around librdkafka, a finely tuned C client.

  • Reliability - There are a lot of details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage this work across all of our clients (also confluent-kafka-python and confluent-kafka-dotnet).

  • Supported - Commercial support is offered by Confluent.

  • Future proof - Confluent, founded by the original creator/co-creator of Kafka, is building a streaming platform with Apache Kafka at its core. It's high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.

The Golang bindings provide a high-level Producer and Consumer with support for the balanced consumer groups of Apache Kafka 0.9 and above.

See the API documentation for more information.

For a step-by-step guide on using the client see Getting Started with Apache Kafka and Golang.

Examples

High-level balanced consumer

package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})

	if err != nil {
		panic(err)
	}

	err = c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

	if err != nil {
		panic(err)
	}

	// A signal handler or similar could be used to set this to false to break the loop.
	run := true

	for run {
		msg, err := c.ReadMessage(time.Second)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else if !err.(kafka.Error).IsTimeout() {
			// The client will automatically try to recover from all errors.
			// Timeout is not considered an error because it is raised by
			// ReadMessage in absence of messages.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()
}

Producer

package main

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}

	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)
}

More elaborate examples are available in the examples directory, including how to configure the Go client for use with Confluent Cloud.

Getting Started

Supports Go 1.17+ and librdkafka 2.6.1+.

Using Go Modules

You can use Go Modules to install confluent-kafka-go.

Import the kafka package from GitHub in your code:

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

Build your project:

go build ./...

If you are building for Alpine Linux (musl), -tags musl must be specified.

go build -tags musl ./...

A dependency on the latest stable version of confluent-kafka-go should be automatically added to your go.mod file.

Install the client

Manual install:

go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka

Golang import:

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

librdkafka

Prebuilt librdkafka binaries are included with the Go client and librdkafka does not need to be installed separately on the build or target system. The following platforms are supported by the prebuilt librdkafka binaries:

  • Mac OSX x64 and arm64
  • glibc-based Linux x64 and arm64 (e.g., RedHat, Debian, CentOS, Ubuntu, etc) - without GSSAPI/Kerberos support
  • musl-based Linux amd64 and arm64 (Alpine) - without GSSAPI/Kerberos support
  • Windows amd64 - without GSSAPI/Kerberos support

When building your application for Alpine Linux (musl libc) you must pass -tags musl to go get, go build, etc.

CGO_ENABLED must NOT be set to 0 since the Go client is based on the C library librdkafka.

If GSSAPI/Kerberos authentication support is required you will need to install librdkafka separately, see the Installing librdkafka chapter below, and then build your Go application with -tags dynamic.

Installing librdkafka

If the bundled librdkafka build is not supported on your platform, or you need a librdkafka with GSSAPI/Kerberos support, you must install librdkafka manually on the build and target system using one of the following alternatives:

  • For Debian and Ubuntu based distros, install librdkafka-dev from the standard repositories or using Confluent's Deb repository.
  • For Red Hat based distros, install librdkafka-devel using Confluent's YUM repository.
  • For macOS, install librdkafka from Homebrew. You may also need to brew install pkg-config if you don't already have it: brew install librdkafka pkg-config.
  • For Alpine: apk add librdkafka-dev pkgconf
  • For Windows: there are no official/supported packages, but static builds are included for Windows/x64. Installing from source is needed only for GSSAPI/Kerberos support.
  • For source builds, see instructions below.

Build from source:

git clone https://github.com/confluentinc/librdkafka.git
cd librdkafka
./configure
make
sudo make install

After installing librdkafka you will need to build your Go application with -tags dynamic.

Note: If you use the master branch of the Go client, then you need to use the master branch of librdkafka.

confluent-kafka-go requires librdkafka v1.9.0 or later.

Static builds on Linux

Since we are using cgo, Go builds a dynamically linked library even when using the prebuilt, statically-compiled librdkafka as described in the librdkafka chapter.

On glibc-based systems, if the client is compiled on a system that differs from the target system, especially when the target system is older, running the compiled client fails with a glibc version error.

Unfortunately, if we try building a statically linked binary, it doesn't solve the problem, since there is no way to have truly static builds using glibc. This is because there are some functions in glibc, like getaddrinfo which need the shared version of the library even when the code is compiled statically.

One way around this is to either use a container/VM to build the binary, or install an older version of glibc on the system where the client is being compiled.

The other way is using musl to create truly static builds for Linux. To do this, install it for your system.

Static compilation command, meant to be used alongside the prebuilt librdkafka bundle:

CC=/path/to/musl-gcc go build --ldflags '-linkmode external -extldflags "-static"' -tags musl

API Strands

The recommended API strand is the Function-Based one; the Channel-Based one is documented in examples/legacy.

Function-Based Consumer

Messages, errors and events are polled through the consumer.Poll() function.

It has direct mapping to underlying librdkafka functionality.

See examples/consumer_example
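
A poll loop typically dispatches on the dynamic type of whatever the poll call returns. The sketch below illustrates only that dispatch pattern. The event, message, and clientError types are stand-ins defined here so the code runs without a broker; they are not the client's types. It assumes that the real consumer.Poll() returns a kafka.Event which is usually a *kafka.Message or a kafka.Error, or nil when the timeout expires.

```go
package main

import "fmt"

// Stand-in types for illustration only; in real code these roles are
// assumed to be played by kafka.Event, *kafka.Message and kafka.Error.
type event interface{}

type message struct {
	Topic string
	Value []byte
}

type clientError struct{ Reason string }

// describe shows the type-switch dispatch used by a typical poll loop.
func describe(ev event) string {
	switch e := ev.(type) {
	case nil:
		// Nothing arrived before the poll timeout expired.
		return "timeout: nothing to do"
	case *message:
		return fmt.Sprintf("message on %s: %s", e.Topic, e.Value)
	case clientError:
		return "error: " + e.Reason
	default:
		// Other event kinds are ignored in this sketch.
		return fmt.Sprintf("ignored event %T", e)
	}
}

func main() {
	polled := []event{
		&message{Topic: "myTopic", Value: []byte("hello")},
		nil,
		clientError{Reason: "broker transport failure"},
	}
	for _, ev := range polled {
		fmt.Println(describe(ev))
	}
}
```

The examples/consumer_example directory referenced above remains the authoritative version of this loop.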

Function-Based Producer

Application calls producer.Produce() to produce messages. Delivery reports are emitted on the producer.Events() or specified private channel.

Warnings

  • Produce() is a non-blocking call. If the internal librdkafka queue is full, the call fails and can be retried.

See examples/producer_example

License

Apache License v2.0

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-go. confluent-kafka-go has no affiliation with and is not endorsed by The Apache Software Foundation.

Developer Notes

See kafka/README

Contributions to the code, examples, documentation, etc., are very much appreciated.

Make your changes, run gofmt, tests, etc, push your branch, create a PR, and sign the CLA.

Confluent Cloud

For a step-by-step guide on using the Golang client with Confluent Cloud see Getting Started with Apache Kafka and Golang on Confluent Developer.