lovoo/goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.

Top Related Projects

NATS Streaming System Server

High-Performance server for NATS.io, the cloud and edge native messaging system.

Kafka library in Go

Confluent's Apache Kafka Golang client

Quick Overview

Goka is a framework for building distributed applications using the Kafka streaming platform. It provides a simple and efficient way to build event-driven, scalable, and fault-tolerant applications using the power of Kafka.

Pros

  • Simplicity: Goka abstracts away the complexity of Kafka, allowing developers to focus on building their application logic.
  • Scalability: Goka's design enables easy scaling of applications by leveraging Kafka's distributed nature.
  • Fault Tolerance: Goka's built-in support for fault tolerance and recovery helps ensure the reliability of distributed applications.
  • Flexibility: Goka can be used to build a wide range of distributed applications, from real-time data processing to event-driven microservices.

Cons

  • Learning Curve: Developers new to Kafka may need to invest time in understanding the underlying concepts and how Goka integrates with them.
  • Dependency on Kafka: Goka is tightly coupled with Kafka, so applications built with Goka are dependent on the availability and performance of the Kafka cluster.
  • Limited Documentation: The project's documentation, while generally good, could be more comprehensive and provide more examples and use cases.
  • Lack of Community: Compared to some other Kafka-related projects, Goka has a relatively small community, which may limit the availability of support and third-party resources.

Code Examples

Here are a few examples of how to use Goka:

  1. Defining a Goka Group:
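// Codecs come from github.com/lovoo/goka/codec; exampleHandler is a
// callback of type func(ctx goka.Context, msg interface{}).
group := goka.DefineGroup("example-group",
    goka.Input("example-topic", new(codec.String), exampleHandler),
    goka.Persist(new(codec.Int64)),
)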

This code defines a Goka group with an input topic, a persistent state, and a handler function.

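  2. Emitting an Event:
// NewEmitter takes the broker list, the topic, and a codec for the values.
emitter, err := goka.NewEmitter([]string{"localhost:9092"}, "example-topic", new(codec.String))
if err != nil {
    // handle error
}
defer emitter.Finish()

// EmitSync blocks until the message has been delivered to Kafka.
if err := emitter.EmitSync("key", "value"); err != nil {
    // handle error
}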

This code creates a Goka emitter and uses it to emit an event to the "example-topic" topic.

  3. Running a Processor for a Goka Group:
processor, err := goka.NewProcessor([]string{"kafka-broker-1", "kafka-broker-2"}, group)
if err != nil {
    // handle error
}

// Run blocks until the context is cancelled or an error occurs.
if err := processor.Run(context.Background()); err != nil {
    // handle error
}

This code creates a Goka processor for the "example-group" group and runs it, so that it consumes events from the group's input topic.

Getting Started

To get started with Goka, follow these steps:

  1. Install the Goka library:
go get github.com/lovoo/goka
  2. Define your Goka group:
group := goka.DefineGroup("example-group",
    goka.Input("example-topic", new(codec.String), exampleHandler),
    goka.Persist(new(codec.Int64)),
)
  3. Create a Goka emitter to send events:
emitter, err := goka.NewEmitter([]string{"localhost:9092"}, "example-topic", new(codec.String))
if err != nil {
    // handle error
}
defer emitter.Finish()

if err := emitter.EmitSync("key", "value"); err != nil {
    // handle error
}
  4. Create a Goka processor to consume events:
processor, err := goka.NewProcessor([]string{"kafka-broker-1", "kafka-broker-2"}, group)
if err != nil {
    // handle error
}

// Run blocks until the context is cancelled or an error occurs.
if err := processor.Run(context.Background()); err != nil {
    // handle error
}

That's the basic setup to get started with Goka. You can further customize your application by defining more input and output topics, handling different message types, and implementing your own processing logic.
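Handling a different message type mostly comes down to supplying a codec for it. The following is a minimal, standard-library-only sketch. The local Codec interface is an assumption that mirrors the Encode/Decode shape of goka.Codec, and the user type and userCodec are hypothetical names introduced only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec is a local stand-in (assumption) with the same Encode/Decode shape
// that Goka expects from codecs; it is declared here so the sketch runs
// without any third-party dependency.
type Codec interface {
	Encode(value interface{}) ([]byte, error)
	Decode(data []byte) (interface{}, error)
}

// user is a hypothetical message type.
type user struct {
	Name   string `json:"name"`
	Clicks int    `json:"clicks"`
}

// userCodec serializes *user values as JSON.
type userCodec struct{}

// Encode rejects anything that is not a *user.
func (userCodec) Encode(value interface{}) ([]byte, error) {
	u, ok := value.(*user)
	if !ok {
		return nil, fmt.Errorf("userCodec: expected *user, got %T", value)
	}
	return json.Marshal(u)
}

// Decode returns a *user, so callbacks can type-assert msg.(*user).
func (userCodec) Decode(data []byte) (interface{}, error) {
	var u user
	if err := json.Unmarshal(data, &u); err != nil {
		return nil, fmt.Errorf("userCodec: %w", err)
	}
	return &u, nil
}

func main() {
	var c Codec = userCodec{}
	data, err := c.Encode(&user{Name: "alice", Clicks: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // {"name":"alice","clicks":3}

	v, err := c.Decode(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(v.(*user).Clicks) // 3
}
```

A codec like this would presumably be passed wherever the examples above pass new(codec.String), for instance in goka.Input or goka.Persist.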

Competitor Comparisons

NATS Streaming System Server

Pros of NATS Streaming Server

  • Highly scalable and performant, capable of handling millions of messages per second
  • Supports at-least-once delivery semantics and message persistence
  • Provides built-in fault tolerance and high availability features

Cons of NATS Streaming Server

  • More complex setup and configuration compared to Goka
  • Requires separate client libraries for different programming languages
  • May have higher resource usage for smaller-scale applications

Code Comparison

NATS Streaming Server (Go client):

sc, _ := stan.Connect(clusterID, clientID)
sub, _ := sc.Subscribe("foo", func(m *stan.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})
sc.Publish("foo", []byte("Hello World"))

Goka:

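g := goka.DefineGroup(group,
    goka.Input(topic, new(codec.String), func(ctx goka.Context, msg interface{}) {
        fmt.Printf("Received: %v\n", msg)
    }),
)
// The group graph only describes the topology; a processor executes it.
p, err := goka.NewProcessor(brokers, g)
if err != nil {
    log.Fatal(err)
}
go p.Run(context.Background())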

Summary

NATS Streaming Server is a more robust and feature-rich solution for high-throughput messaging, offering better scalability and persistence. However, it comes with increased complexity and resource requirements. Goka, on the other hand, provides a simpler, Go-specific approach to stream processing, which may be more suitable for smaller-scale applications or those primarily using Go.

High-Performance server for NATS.io, the cloud and edge native messaging system.

Pros of NATS Server

  • NATS Server is a high-performance, open-source messaging system that provides a simple and efficient way to handle communication between distributed components.
  • It offers a wide range of features, including support for pub/sub, request/reply, and queue groups, making it a versatile choice for various use cases.
  • NATS Server is highly scalable and can handle a large number of concurrent connections, making it suitable for large-scale distributed systems.

Cons of NATS Server

  • NATS Server is a standalone server-based solution, which may add complexity to the deployment and management of the system compared to a library-based approach like Goka.
  • The learning curve for NATS Server may be steeper than that of Goka, as it requires understanding the server configuration and management.
  • NATS Server may have a higher resource footprint compared to a library-based solution, as it requires running a separate server process.

Code Comparison

Here's a brief code comparison between NATS Server and Goka:

NATS Server (Go):

// Create a new NATS connection
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
    log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()

// Publish a message
err = nc.Publish("my_topic", []byte("Hello, NATS!"))
if err != nil {
    log.Fatalf("Failed to publish message: %v", err)
}

Goka (Go):

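// Create a new Goka processor. The first argument is the list of Kafka
// brokers, not the topic. Topics emitted to must be declared as outputs.
p, err := goka.NewProcessor([]string{"localhost:9092"}, goka.DefineGroup(
    "my_group",
    goka.Input("my_topic", new(codec.String), myHandler),
    goka.Output("my_output_topic", new(codec.String)),
    goka.Persist(new(codec.Int64)),
))
if err != nil {
    log.Fatalf("Failed to create processor: %v", err)
}

// Handle a message. codec.String decodes values to string, not *string.
func myHandler(ctx goka.Context, msg interface{}) {
    value := msg.(string)
    ctx.Emit("my_output_topic", ctx.Key(), value)
}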

Kafka library in Go

Pros of Kafka-Go

  • Kafka-Go is a lightweight and efficient Kafka client library, making it a good choice for applications with low resource requirements.
  • The library provides a simple and intuitive API, making it easy to integrate with existing Go projects.
  • Kafka-Go supports advanced Kafka features, such as consumer groups and partitions, allowing for more sophisticated Kafka usage.

Cons of Kafka-Go

  • Kafka-Go may lack some of the advanced features and functionality provided by Goka, such as built-in support for event sourcing and stream processing.
  • The library may have a smaller community and ecosystem compared to Goka, which could make it more challenging to find resources and support.

Code Comparison

Kafka-Go:

topic := "my-topic"
partition := 0

// DialLeader connects to the leader of the given topic partition.
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
    // handle error
}
defer conn.Close()

// Fetch a batch of at least 10 KB and at most 1 MB.
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
batch := conn.ReadBatch(10e3, 1e6)
defer batch.Close()

for {
    msg, err := batch.ReadMessage()
    if err != nil {
        break // batch exhausted or deadline reached
    }
    fmt.Printf("message at offset %d: %s = %s\n", msg.Offset, string(msg.Key), string(msg.Value))
}

Goka:

p, err := goka.NewProcessor([]string{"localhost:9092"}, goka.DefineGroup("my-group",
    // The callback belongs to the input edge it handles.
    goka.Input("my-topic", new(codec.String), func(ctx goka.Context, msg interface{}) {
        value := msg.(string)
        ctx.Emit("my-output-topic", ctx.Key(), value)
    }),
    goka.Output("my-output-topic", new(codec.String)),
))
if err != nil {
    // handle error
}

if err := p.Run(context.Background()); err != nil {
    // handle error
}

Confluent's Apache Kafka Golang client

Pros of confluentinc/confluent-kafka-go

  • Extensive documentation and community support
  • Supports a wide range of Kafka features, including consumer groups, offsets, and transactions
  • Provides a high-level API that abstracts away the complexity of the Kafka protocol

Cons of confluentinc/confluent-kafka-go

  • Larger codebase and dependency footprint compared to Goka
  • May have a steeper learning curve for developers unfamiliar with Kafka
  • Potentially less flexible than a more lightweight library like Goka

Code Comparison

Goka (lovoo/goka):

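// Goka consumes, decodes, and commits on your behalf; application code
// only supplies a callback that receives the decoded message.
cb := func(ctx goka.Context, msg interface{}) {
    log.Printf("key = %s, msg = %v", ctx.Key(), msg)
}

g := goka.DefineGroup(group, goka.Input(topic, new(codec.String), cb))
p, err := goka.NewProcessor(brokers, g)
// ...
p.Run(context.Background())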

Confluent Kafka Go (confluentinc/confluent-kafka-go):

func Consume(brokers []string, topic string, group string) {
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": strings.Join(brokers, ","),
        "group.id":          group,
        "auto.offset.reset": "earliest",
    })
    // ...
    c.SubscribeTopics([]string{topic}, nil)
    for {
        // A negative timeout blocks until a message or an error arrives.
        msg, err := c.ReadMessage(-1)
        if err != nil {
            continue // handle error
        }
        // Process message
        c.CommitMessage(msg)
    }
}

README

Goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go. Goka aims to reduce the complexity of building highly scalable and highly available microservices.

Goka extends the concept of Kafka consumer groups by binding a state table to them and persisting them in Kafka. Goka provides sane defaults and a pluggable architecture.
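The idea of a state table that is persisted in Kafka can be pictured as a local map whose every write is also appended to a changelog. The sketch below uses only the standard library and a slice in place of a Kafka topic. The table and update types are hypothetical and do not reflect Goka's internal implementation.

```go
package main

import "fmt"

// update is one changelog record: the value written for a key.
type update struct {
	key   string
	value int64
}

// table is a hypothetical state table. Reads are served from the local
// map; every write is also appended to the shared changelog.
type table struct {
	state     map[string]int64
	changelog *[]update
}

// newTable rebuilds the local state by replaying the changelog from the
// beginning, which is how a fresh instance would recover.
func newTable(changelog *[]update) *table {
	t := &table{state: map[string]int64{}, changelog: changelog}
	for _, u := range *changelog {
		t.state[u.key] = u.value
	}
	return t
}

// set updates the local state and records the change.
func (t *table) set(key string, v int64) {
	t.state[key] = v
	*t.changelog = append(*t.changelog, update{key, v})
}

func main() {
	var changelog []update // stands in for the group-table topic

	t1 := newTable(&changelog)
	t1.set("some-key", 1)
	t1.set("some-key", 2)
	t1.set("other-key", 1)

	// Pretend t1's instance is lost: a new instance recovers the same
	// state purely from the changelog.
	t2 := newTable(&changelog)
	fmt.Println(t2.state["some-key"], t2.state["other-key"]) // 2 1
}
```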

Features

  • Message Input and Output

    Goka handles all the message input and output for you. You only have to provide one or more callback functions that handle messages from any of the Kafka topics you are interested in. You only ever have to deal with deserialized messages.

  • Scaling

    Goka automatically distributes the processing and state across multiple instances of a service. This enables effortless scaling when the load increases.

  • Fault Tolerance

    In case of a failure, Goka will redistribute the failed instance's workload and state across the remaining healthy instances. All state is safely stored in Kafka and messages delivered with at-least-once semantics.

  • Built-in Monitoring and Introspection

    Goka provides a web interface for monitoring performance and querying values in the state.

  • Modularity

    Goka fosters a pluggable architecture which enables you to replace for example the storage layer or the Kafka communication layer.
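At-least-once delivery means a message can be seen again after a failure. The following standard-library sketch simulates a consumer that crashes after processing a record but before committing its offset. The message type and the consume function are hypothetical and only illustrate the general mechanism, not Goka's internals.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a Kafka record.
type message struct {
	offset int
	key    string
}

// consume processes records starting at the committed offset and returns
// the new committed offset. If crashAfter >= 0, the consumer "crashes"
// right after processing that offset, before committing it.
func consume(records []message, committed int, counts map[string]int, crashAfter int) int {
	for _, m := range records[committed:] {
		counts[m.key]++ // the side effect happens first
		if m.offset == crashAfter {
			return committed // crash: m was processed but never committed
		}
		committed = m.offset + 1 // commit only after processing succeeded
	}
	return committed
}

func main() {
	records := []message{{0, "a"}, {1, "b"}, {2, "a"}}
	counts := map[string]int{}

	committed := consume(records, 0, counts, 1)         // crashes at offset 1
	committed = consume(records, committed, counts, -1) // restart from last commit

	// "b" was sent once but counted twice, because offset 1 was redelivered.
	fmt.Println(committed, counts["a"], counts["b"]) // 3 2 2
}
```

Under these semantics, callbacks are easiest to reason about when processing the same message twice is harmless.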

Documentation

This README provides a brief, high level overview of the ideas behind Goka.

Package API documentation is available at GoDoc and the Wiki provides several tips for configuring, extending, and deploying Goka applications.

Installation

You can install Goka by running the following command:

$ go get -u github.com/lovoo/goka

Configuration

Goka relies on Sarama to perform the actual communication with Kafka, which offers many configuration settings. The config is documented here.

In most cases, you need to modify the config, e.g. to set the Kafka Version.

cfg := goka.DefaultConfig()
cfg.Version = sarama.V2_4_0_0
goka.ReplaceGlobalConfig(cfg)

This makes all goka components use the updated config.

If you do need specific configuration for different components, you need to pass customized builders to the component's constructor, e.g.

cfg := goka.DefaultConfig()
// modify the config with component-specific settings


// use the config by creating a builder that overrides the global config
goka.NewProcessor(// ...,
	goka.WithConsumerGroupBuilder(
		goka.ConsumerGroupBuilderWithConfig(cfg),
	),
	// ...
)

Concepts

Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning.

  • Emitters deliver key-value messages into Kafka. As an example, an emitter could be a database handler emitting the state changes into Kafka for other interested applications to consume.

  • Processor is a set of callback functions that consume and perform state transformations upon delivery of these emitted messages. Processor groups are formed of one or more instances of a processor. Goka distributes the partitions of the input topics across all processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions and state are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.

  • Group table is the state of a processor group. It is a partitioned key-value table stored in Kafka that belongs to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka.

  • Views are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface.

  • Local storage keeps a local copy of the group table partitions to speed up recovery and reduce memory utilization. By default, the local storage uses LevelDB, but in-memory map and Redis-based storage are also available.
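The way partitions are spread across a processor group, and handed over when an instance fails, can be sketched with the standard library alone. This is a simplification under stated assumptions: keys are hashed with FNV-1a modulo the partition count, and partitions are assigned round-robin. The actual partitioner and rebalancing strategy depend on the Kafka client configuration, and the names partitionFor and assign are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition (assumed: FNV-1a modulo the
// partition count). The same key always lands in the same partition.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(numPartitions))
}

// assign distributes partitions round-robin over the live instances.
func assign(numPartitions int, instances []string) map[string][]int {
	out := make(map[string][]int)
	for p := 0; p < numPartitions; p++ {
		inst := instances[p%len(instances)]
		out[inst] = append(out[inst], p)
	}
	return out
}

func main() {
	const numPartitions = 6

	// Three healthy instances share the partitions.
	fmt.Println(assign(numPartitions, []string{"proc-1", "proc-2", "proc-3"}))
	// map[proc-1:[0 3] proc-2:[1 4] proc-3:[2 5]]

	// proc-2 fails: its partitions move to the remaining instances.
	fmt.Println(assign(numPartitions, []string{"proc-1", "proc-3"}))
	// map[proc-1:[0 2 4] proc-3:[1 3 5]]

	// The key's partition is independent of which instances are alive.
	fmt.Println(partitionFor("some-key", numPartitions) == partitionFor("some-key", numPartitions)) // true
}
```

Because a key's partition does not change when instances come and go, whichever instance takes over a partition also takes over the group table entries for that partition's keys.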

Get Started

An example Goka application could look like the following. An emitter emits a message with key "some-key" and value "some-value" into the "example-stream" topic once per second. A processor processes the "example-stream" topic, counting the number of messages delivered for "some-key". The counter is persisted in the "example-group-table" topic. To start dockerized Zookeeper and Kafka instances locally, execute make start with the Makefile in the examples folder.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

var (
	brokers             = []string{"localhost:9092"}
	topic   goka.Stream = "example-stream"
	group   goka.Group  = "example-group"
)

// Emit messages forever every second
func runEmitter() {
	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}
	defer emitter.Finish()
	for {
		time.Sleep(1 * time.Second)
		err = emitter.EmitSync("some-key", "some-value")
		if err != nil {
			log.Fatalf("error emitting message: %v", err)
		}
	}
}

// process messages until ctrl-c is pressed
func runProcessor() {
	// process callback is invoked for each message delivered from
	// "example-stream" topic.
	cb := func(ctx goka.Context, msg interface{}) {
		var counter int64
		// ctx.Value() gets from the group table the value that is stored for
		// the message's key.
		if val := ctx.Value(); val != nil {
			counter = val.(int64)
		}
		counter++
		// SetValue stores the incremented counter in the group table for
		// the message's key.
		ctx.SetValue(counter)
		log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
	}

	// Define a new processor group. The group defines all inputs, outputs, and
	// serialization formats. The group-table topic is "example-group-table".
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), cb),
		goka.Persist(new(codec.Int64)),
	)

	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan bool)
	go func() {
		defer close(done)
		if err = p.Run(ctx); err != nil {
			log.Fatalf("error running processor: %v", err)
		} else {
			log.Printf("Processor shutdown cleanly")
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	cancel() // gracefully stop processor
	<-done
}

func main() {
	go runEmitter() // emits one message every second forever
	runProcessor()  // press ctrl-c to stop
}

A very similar example is also in 1-simplest. Just run go run examples/1-simplest/main.go.

Note that tables have to be configured in Kafka with log compaction. For details check the Wiki.
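Log compaction matters for table topics because only the latest value per key is needed to rebuild the table. The standard-library sketch below illustrates that property on an in-memory slice. The record type and the compact and replay functions are hypothetical, and real Kafka compaction runs in the background on the broker rather than in one pass.

```go
package main

import "fmt"

// record is a hypothetical changelog entry of a table topic.
type record struct {
	key   string
	value int64
}

// compact keeps only the last record of each key and preserves the
// relative order of the surviving records.
func compact(records []record) []record {
	last := make(map[string]int, len(records))
	for i, r := range records {
		last[r.key] = i
	}
	out := make([]record, 0, len(last))
	for i, r := range records {
		if last[r.key] == i {
			out = append(out, r)
		}
	}
	return out
}

// replay rebuilds a table by applying the records in order.
func replay(records []record) map[string]int64 {
	t := make(map[string]int64)
	for _, r := range records {
		t[r.key] = r.value
	}
	return t
}

func main() {
	records := []record{{"a", 1}, {"b", 1}, {"a", 2}, {"a", 3}}
	compacted := compact(records)

	fmt.Println(len(records), len(compacted)) // 4 2

	// Replaying either log yields the same table.
	full, small := replay(records), replay(compacted)
	fmt.Println(full["a"] == small["a"], full["b"] == small["b"]) // true true
}
```

The compacted log is shorter, so recovery replays fewer records while arriving at the same state.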

How to contribute

Contributions are always welcome. Please fork the repo, create a pull request against master, and be sure tests pass. See the GitHub Flow for details.