ThreeDotsLabs / watermill

Building event-driven applications the easy way in Go.

Top Related Projects

  • nats.go: Golang client for NATS, the cloud native messaging system.
  • confluent-kafka-go: Confluent's Apache Kafka Golang client
  • sarama: Sarama is a Go library for Apache Kafka.
  • kafka-go: Kafka library in Go
  • amqp: Go client for AMQP 0.9.1
  • amqp091-go: An AMQP 0-9-1 Go client maintained by the RabbitMQ team. Originally by @streadway: `streadway/amqp`

Quick Overview

Watermill is a Go library for building event-driven and message-driven applications. It provides a set of abstractions and tools for working with messages, events, and message brokers, making it easier to build scalable and resilient distributed systems.

Pros

  • Flexibility: Watermill supports multiple message brokers, including RabbitMQ, Kafka, and NATS, allowing developers to choose the best fit for their use case.
  • Scalability: Watermill is designed to handle high-volume message processing, making it suitable for building scalable event-driven applications.
  • Testability: Watermill includes an in-memory GoChannel Pub/Sub, which makes it easier to test message-driven components in isolation, without a running broker.
  • Extensibility: Watermill's modular design allows developers to easily extend its functionality by creating custom message handlers, publishers, and subscribers.

Cons

  • Learning Curve: Watermill's flexibility and feature set can make it challenging for beginners to get started, especially if they are new to event-driven architecture.
  • Documentation: While the project has good documentation, some areas could be more comprehensive, particularly for advanced use cases.
  • Performance: Depending on the message broker and use case, Watermill's performance may not be as high as more specialized message queue solutions.
  • Ecosystem: Although Watermill v1.0.0 has been released and its public API is stable, its community and ecosystem may be smaller than those of long-established broker-specific clients.

Code Examples

Here are a few code examples demonstrating how to use Watermill:

  1. Publishing a Message:
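package main

import (
    "github.com/ThreeDotsLabs/watermill"
    // Kafka support lives in a separate module; the v3 path is an assumption,
    // adjust it to the release you use.
    "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
    "github.com/ThreeDotsLabs/watermill/message"
)

func main() {
    publisher, err := kafka.NewPublisher(
        kafka.PublisherConfig{
            Brokers:   []string{"localhost:9092"},
            Marshaler: kafka.DefaultMarshaler{},
        },
        watermill.NewStdLogger(false, false), // debug and trace logging off
    )
    if err != nil {
        panic(err)
    }
    defer publisher.Close()

    // Every message carries a UUID and a raw byte payload.
    msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, Watermill!"))
    if err := publisher.Publish("topic_name", msg); err != nil {
        panic(err)
    }
}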

This code creates a new Kafka publisher and publishes a message to the "topic_name" topic.

  2. Subscribing to a Topic:
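package main

import (
    "context"

    "github.com/ThreeDotsLabs/watermill"
    // Kafka support lives in a separate module; the v3 path is an assumption,
    // adjust it to the release you use.
    "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
)

func main() {
    subscriber, err := kafka.NewSubscriber(
        kafka.SubscriberConfig{
            Brokers:     []string{"localhost:9092"},
            Unmarshaler: kafka.DefaultMarshaler{},
        },
        watermill.NewStdLogger(false, false), // debug and trace logging off
    )
    if err != nil {
        panic(err)
    }
    defer subscriber.Close()

    messages, err := subscriber.Subscribe(context.Background(), "topic_name")
    if err != nil {
        panic(err)
    }

    for msg := range messages {
        // Process the message, then acknowledge it so the next one is delivered.
        println(string(msg.Payload))
        msg.Ack()
    }
}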

This code creates a new Kafka subscriber and subscribes to the "topic_name" topic, processing each message as it arrives.

  3. Handling Errors:
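package main

import (
    "context"
    "errors"

    "github.com/ThreeDotsLabs/watermill"
    // Kafka support lives in a separate module; the v3 path is an assumption,
    // adjust it to the release you use.
    "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
)

// process is a hypothetical stand-in for real business logic.
func process(payload []byte) error {
    if len(payload) == 0 {
        return errors.New("empty payload")
    }
    println(string(payload))
    return nil
}

func main() {
    subscriber, err := kafka.NewSubscriber(
        kafka.SubscriberConfig{
            Brokers:     []string{"localhost:9092"},
            Unmarshaler: kafka.DefaultMarshaler{},
        },
        watermill.NewStdLogger(false, false),
    )
    if err != nil {
        panic(err)
    }
    defer subscriber.Close()

    messages, err := subscriber.Subscribe(context.Background(), "topic_name")
    if err != nil {
        panic(err)
    }

    for msg := range messages {
        if err := process(msg.Payload); err != nil {
            // Nack reports the failure so the message can be redelivered.
            println("Error processing message:", err.Error())
            msg.Nack()
            continue
        }
        msg.Ack()
    }
}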

Competitor Comparisons

5,619

Golang client for NATS, the cloud native messaging system.

Pros of nats.go

  • Specialized for NATS messaging system, offering deep integration and optimized performance
  • Lightweight and focused library, ideal for projects primarily using NATS
  • Extensive documentation and examples specific to NATS usage

Cons of nats.go

  • Limited to NATS messaging, lacking support for other message brokers or protocols
  • Requires more manual setup and configuration compared to Watermill's abstraction layer
  • Less flexibility in switching between different messaging systems without code changes

Code Comparison

nats.go:

nc, _ := nats.Connect(nats.DefaultURL)
nc.Publish("foo", []byte("Hello World"))
nc.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

Watermill:

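// Here nats is github.com/ThreeDotsLabs/watermill-nats/v2/pkg/nats (assumed module version), not nats.go.
logger := watermill.NewStdLogger(false, false)
publisher, _ := nats.NewPublisher(nats.PublisherConfig{URL: "nats://127.0.0.1:4222", Marshaler: &nats.GobMarshaler{}}, logger)
subscriber, _ := nats.NewSubscriber(nats.SubscriberConfig{URL: "nats://127.0.0.1:4222", Unmarshaler: &nats.GobMarshaler{}}, logger)
messages, _ := subscriber.Subscribe(context.Background(), "foo")
publisher.Publish("foo", message.NewMessage(watermill.NewUUID(), []byte("Hello World")))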

Both libraries provide publish-subscribe functionality, but Watermill offers a more abstracted approach that can work with multiple message brokers, while nats.go is specifically tailored for NATS usage.
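
To make the "abstracted approach" concrete, here is a minimal sketch of application code that depends only on a publisher interface. The Message type, RecordingPublisher, and NotifyOrderPlaced are hypothetical stand-ins written for this illustration; only the shape of the Publisher interface follows the one quoted in the Watermill README.

```go
package main

import "fmt"

// Message is a local stand-in for a broker message (assumption: only the
// fields this sketch needs).
type Message struct {
	UUID    string
	Payload []byte
}

// Publisher mirrors the shape of the interface quoted in the README.
type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

// RecordingPublisher is a hypothetical backend that records what it was
// asked to publish instead of talking to a real broker.
type RecordingPublisher struct {
	Name string
	Log  []string
}

func (p *RecordingPublisher) Publish(topic string, messages ...*Message) error {
	for _, m := range messages {
		p.Log = append(p.Log, fmt.Sprintf("%s -> %s: %s", p.Name, topic, m.Payload))
	}
	return nil
}

func (p *RecordingPublisher) Close() error { return nil }

// NotifyOrderPlaced is application code. It knows nothing about the broker
// behind the Publisher it receives.
func NotifyOrderPlaced(pub Publisher, orderID string) error {
	msg := &Message{UUID: orderID, Payload: []byte("order " + orderID + " placed")}
	return pub.Publish("orders.placed", msg)
}

func main() {
	// Swapping the backend does not change NotifyOrderPlaced.
	backends := []*RecordingPublisher{{Name: "nats-like"}, {Name: "kafka-like"}}
	for _, b := range backends {
		if err := NotifyOrderPlaced(b, "42"); err != nil {
			panic(err)
		}
		fmt.Println(b.Log[0])
	}
}
```

With a broker-specific client, the equivalent of NotifyOrderPlaced would typically call that client's API directly, so changing brokers means touching the application code.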

Confluent's Apache Kafka Golang client

Pros of confluent-kafka-go

  • Thin Go wrapper around librdkafka (the C client library, used via cgo), offering high performance
  • Comprehensive support for Confluent-specific features and extensions
  • Robust error handling and detailed logging capabilities

Cons of confluent-kafka-go

  • Steeper learning curve due to lower-level API
  • Limited to Kafka-specific messaging, less flexibility for other message brokers
  • Requires more boilerplate code for basic operations

Code Comparison

Watermill (Publishing a message):

publisher.Publish("topic", message.NewMessage(
    watermill.NewUUID(),
    []byte("Hello, Watermill!"),
))

confluent-kafka-go (Publishing a message):

producer.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte("Hello, Kafka!"),
}, nil)

Watermill provides a more abstracted and unified API for various message brokers, while confluent-kafka-go offers a Kafka-specific, lower-level interface with more control over Kafka-specific features. Watermill is more suitable for projects requiring flexibility across different messaging systems, while confluent-kafka-go is ideal for Kafka-centric applications demanding high performance and advanced Kafka features.

11,712

Sarama is a Go library for Apache Kafka.

Pros of sarama

  • Specialized for Apache Kafka, offering deep integration and advanced features
  • High-performance and well-optimized for Kafka operations
  • Extensive community support and long-standing reputation in the Kafka ecosystem

Cons of sarama

  • Limited to Kafka, lacking support for other message brokers or pub/sub systems
  • Steeper learning curve due to its focus on Kafka-specific concepts and features
  • Requires more boilerplate code for basic operations compared to Watermill

Code Comparison

sarama:

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
message := &sarama.ProducerMessage{Topic: "example", Value: sarama.StringEncoder("test")}
partition, offset, err := producer.SendMessage(message)

Watermill:

publisher, err := kafka.NewPublisher(kafkaPublisherConfig, watermill.NewStdLogger(false, false))
err = publisher.Publish("example", message.NewMessage(watermill.NewUUID(), []byte("test")))

Summary

While sarama excels in Kafka-specific scenarios with its performance and deep integration, Watermill offers a more versatile approach supporting multiple message brokers. sarama provides fine-grained control over Kafka operations but requires more setup, whereas Watermill aims for simplicity and abstraction across different messaging systems.

Kafka library in Go

Pros of kafka-go

  • Focused specifically on Kafka, providing deep integration and optimized performance
  • Lightweight and efficient, with minimal dependencies
  • Supports both producer and consumer implementations

Cons of kafka-go

  • Limited to Kafka messaging, lacking support for other message brokers
  • Requires more manual configuration and setup compared to Watermill
  • Less abstraction and higher-level features than Watermill's unified API

Code Comparison

kafka-go:

writer := kafka.NewWriter(kafka.WriterConfig{
    Brokers: []string{"localhost:9092"},
    Topic:   "example-topic",
})
err := writer.WriteMessages(context.Background(),
    kafka.Message{Value: []byte("Hello, Kafka!")},
)

Watermill:

publisher, err := kafka.NewPublisher(
    kafka.PublisherConfig{
        Brokers:   []string{"localhost:9092"},
        Marshaler: kafka.DefaultMarshaler{},
    },
    watermill.NewStdLogger(false, false), // logger is a required second argument
)
err = publisher.Publish("example-topic", message.NewMessage(
    watermill.NewUUID(), []byte("Hello, Watermill!"),
))

The code comparison shows that kafka-go requires more direct configuration, while Watermill provides a higher-level abstraction with its unified messaging interface. Watermill's approach allows for easier switching between different message brokers, whereas kafka-go is specifically tailored for Kafka operations.

4,886

Go client for AMQP 0.9.1

Pros of amqp

  • Focused specifically on AMQP protocol, providing a low-level, efficient implementation
  • Lightweight and minimal dependencies, suitable for projects requiring only AMQP functionality
  • Well-established and widely used in production environments

Cons of amqp

  • Limited to AMQP protocol, lacking support for other messaging systems
  • Requires more boilerplate code for advanced messaging patterns and error handling
  • Less abstraction, potentially leading to more complex code for large-scale applications

Code Comparison

amqp:

conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
ch, _ := conn.Channel()
_ = ch.Publish("exchange", "routing_key", false, false, amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte("Hello, World!"),
})

Watermill:

publisher, _ := amqp.NewPublisher(amqp.NewDurableQueueConfig("amqp://guest:guest@localhost:5672/"), watermill.NewStdLogger(false, false))
_ = publisher.Publish("topic", message.NewMessage(watermill.NewUUID(), []byte("Hello, World!")))

Watermill provides a higher-level abstraction, supporting multiple messaging systems with a unified API, while amqp offers a more direct, AMQP-specific implementation. Watermill's approach may be preferable for larger, more complex applications, while amqp might be suitable for simpler, AMQP-focused projects.

An AMQP 0-9-1 Go client maintained by the RabbitMQ team. Originally by @streadway: `streadway/amqp`

Pros of amqp091-go

  • Focused specifically on RabbitMQ, providing a more specialized and optimized implementation
  • Lightweight and low-level, offering fine-grained control over AMQP 0.9.1 operations
  • Maintained by the RabbitMQ team, ensuring compatibility and up-to-date features

Cons of amqp091-go

  • Limited to RabbitMQ and AMQP 0.9.1, lacking support for other message brokers or protocols
  • Requires more boilerplate code for common messaging patterns and error handling
  • Less abstraction and higher learning curve for developers new to message queuing

Code Comparison

amqp091-go:

conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
ch, _ := conn.Channel()
_ = ch.Publish("exchange", "routing_key", false, false, amqp.Publishing{Body: []byte("message")})

Watermill:

publisher, _ := amqp.NewPublisher(amqp.NewDurableQueueConfig("amqp://guest:guest@localhost:5672/"), watermill.NewStdLogger(false, false))
_ = publisher.Publish("topic", message.NewMessage(watermill.NewUUID(), []byte("message")))

Watermill provides a higher-level abstraction, simplifying the publishing process and offering a more consistent API across different message brokers. amqp091-go offers more direct control over RabbitMQ-specific features but requires more setup code.

README

Watermill

Watermill is a Go library for working efficiently with message streams. It is intended for building event-driven applications, enabling event sourcing, RPC over messages, sagas and basically whatever else comes to your mind. You can use conventional pub/sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog if that fits your use case.

Goals

  • Easy to understand.
  • Universal - event-driven architecture, messaging, stream processing, CQRS - use it for whatever you need.
  • Fast (see Benchmarks).
  • Flexible with middlewares, plugins and Pub/Sub configurations.
  • Resilient - using proven technologies and passing stress tests (see Stability).

Getting Started

Pick whichever you like best, or go through them in order:

  1. Follow the Getting Started guide.
  2. See examples below.
  3. Read the full documentation: https://watermill.io/

Our online hands-on training

Examples

Background

Building distributed and scalable services is rarely as easy as some may suggest. There is a lot of hidden knowledge that comes with writing such systems. Just like you don't need to know the whole TCP stack to create an HTTP REST server, you shouldn't need to study all of this knowledge to start building message-driven applications.

Watermill's goal is to make communication with messages as easy to use as HTTP routers. It provides the tools needed to begin working with event-driven architecture and allows you to learn the details on the go.

At the heart of Watermill there is one simple interface:

func(*Message) ([]*Message, error)

Your handler receives a message and decides whether to publish new message(s) or return an error. What happens next is up to the middlewares you've chosen.
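
A minimal sketch of that idea follows. It uses a local stand-in Message type rather than Watermill's own, and the Middleware type, Retry, and NewFlakyHandler are hypothetical names written for this illustration. It shows a handler with the signature above and a retry middleware that decides what happens when the handler returns an error.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local stand-in for Watermill's message type (assumption:
// only the fields this sketch needs).
type Message struct {
	UUID    string
	Payload []byte
}

// HandlerFunc has the same shape as the signature quoted above.
type HandlerFunc func(*Message) ([]*Message, error)

// Middleware wraps a handler with extra behavior (hypothetical type).
type Middleware func(HandlerFunc) HandlerFunc

// Retry calls the wrapped handler again, up to maxAttempts times in total,
// for as long as it keeps returning an error.
func Retry(maxAttempts int) Middleware {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	return func(next HandlerFunc) HandlerFunc {
		return func(msg *Message) ([]*Message, error) {
			var lastErr error
			for attempt := 1; attempt <= maxAttempts; attempt++ {
				out, err := next(msg)
				if err == nil {
					return out, nil
				}
				lastErr = err
			}
			return nil, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
		}
	}
}

// NewFlakyHandler returns a handler that fails the given number of times and
// then produces one outgoing message that echoes the incoming payload.
func NewFlakyHandler(failures int) HandlerFunc {
	calls := 0
	return func(msg *Message) ([]*Message, error) {
		calls++
		if calls <= failures {
			return nil, errors.New("temporary failure")
		}
		return []*Message{{UUID: msg.UUID + "-out", Payload: msg.Payload}}, nil
	}
}

func main() {
	handler := Retry(3)(NewFlakyHandler(2))
	out, err := handler(&Message{UUID: "1", Payload: []byte("order placed")})
	fmt.Println(len(out), err)
}
```

The handler itself stays unaware of retries; changing the policy means swapping the middleware, not the handler.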

You can find more about our motivations in our Introducing Watermill blog post.

Pub/Subs

All publishers and subscribers have to implement an interface:

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	Close() error
}
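
As an illustration of how little these interfaces demand, here is a sketch of a toy in-memory implementation. MemoryPubSub and the local Message type are hypothetical and written only for this example; this is not how Watermill's own Pub/Subs are implemented, and it leaves out acknowledgements, context cancellation, and persistence.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Message is a local stand-in for Watermill's message type.
type Message struct {
	UUID    string
	Payload []byte
}

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	Close() error
}

// MemoryPubSub fans every published message out to the subscribers of a topic.
type MemoryPubSub struct {
	mu     sync.Mutex
	subs   map[string][]chan *Message
	closed bool
}

func NewMemoryPubSub() *MemoryPubSub {
	return &MemoryPubSub{subs: make(map[string][]chan *Message)}
}

// Subscribe registers a buffered channel for the topic. The context is
// ignored in this sketch.
func (p *MemoryPubSub) Subscribe(ctx context.Context, topic string) (<-chan *Message, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return nil, errors.New("pub/sub is closed")
	}
	ch := make(chan *Message, 16)
	p.subs[topic] = append(p.subs[topic], ch)
	return ch, nil
}

// Publish delivers each message to every current subscriber of the topic and
// fails instead of blocking when a subscriber's buffer is full.
func (p *MemoryPubSub) Publish(topic string, messages ...*Message) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return errors.New("pub/sub is closed")
	}
	for _, msg := range messages {
		for _, ch := range p.subs[topic] {
			select {
			case ch <- msg:
			default:
				return errors.New("subscriber buffer is full")
			}
		}
	}
	return nil
}

// Close closes all subscriber channels; buffered messages can still be read.
func (p *MemoryPubSub) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return nil
	}
	p.closed = true
	for _, chans := range p.subs {
		for _, ch := range chans {
			close(ch)
		}
	}
	return nil
}

// Compile-time checks that MemoryPubSub satisfies both interfaces.
var (
	_ Publisher  = (*MemoryPubSub)(nil)
	_ Subscriber = (*MemoryPubSub)(nil)
)

func main() {
	ps := NewMemoryPubSub()
	messages, err := ps.Subscribe(context.Background(), "orders")
	if err != nil {
		panic(err)
	}
	if err := ps.Publish("orders", &Message{UUID: "1", Payload: []byte("created")}); err != nil {
		panic(err)
	}
	ps.Close()
	for msg := range messages {
		fmt.Printf("%s: %s\n", msg.UUID, msg.Payload)
	}
}
```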

Documentation for all supported Pub/Sub implementations can be found in the documentation at https://watermill.io/.

Unofficial libraries

Can't find your favorite Pub/Sub or library integration? Check Awesome Watermill.

If you know another library or are an author of one, please add it to the list.

Contributing

Please check our contributing guide.

Stability

Watermill v1.0.0 has been released and is production-ready. The public API is stable and will not change without changing the major version.

To ensure that all Pub/Subs are stable and safe to use in production, we created a set of tests that need to pass for each of the implementations before merging to master. All tests are also executed in stress mode - that means that we are running all the tests 20x in parallel.

All tests are run with the race condition detector enabled (-race flag in tests).

For more information about debugging tests, see the tests troubleshooting guide.

Benchmarks

Initial tools for benchmarking Pub/Subs can be found in watermill-benchmark.

All benchmarks are being done on a single 16 CPU VM instance, running one binary and dependencies in Docker Compose.

These numbers are meant to serve as a rough estimate of how fast messages can be processed by different Pub/Subs. Keep in mind that the results can be vastly different, depending on the setup and configuration (both much lower and higher).

Here's the short version for message size of 16 bytes.

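| Pub/Sub                         | Publish (messages / s) | Subscribe (messages / s) |
|---------------------------------|------------------------|--------------------------|
| GoChannel                       | 315,776                | 138,743                  |
| Redis Streams                   | 59,158                 | 12,134                   |
| NATS Jetstream (16 Subscribers) | 50,668                 | 34,713                   |
| Kafka (one node)                | 41,492                 | 101,669                  |
| SQL (MySQL, batch size=100)     | 6,371                  | 2,794                    |
| SQL (PostgreSQL, batch size=1)  | 2,831                  | 9,460                    |
| Google Cloud Pub/Sub            | 3,027                  | 28,589                   |
| AMQP (RabbitMQ)                 | 2,770                  | 14,604                   |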
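
Because these rates are for 16-byte messages, they translate into fairly small payload volumes. The sketch below does that arithmetic for three of the rows. The Row type and PayloadBytesPerSecond are hypothetical helpers written for this illustration, and the result counts payload bytes only, ignoring metadata and protocol overhead.

```go
package main

import "fmt"

// Row holds one line of the benchmark summary (rates in messages per second).
type Row struct {
	PubSub    string
	Publish   int
	Subscribe int
}

// PayloadBytesPerSecond converts a message rate into payload throughput.
func PayloadBytesPerSecond(messagesPerSecond, messageSizeBytes int) int {
	return messagesPerSecond * messageSizeBytes
}

func main() {
	const messageSize = 16 // bytes, as stated for the summary figures

	rows := []Row{
		{"GoChannel", 315776, 138743},
		{"Kafka (one node)", 41492, 101669},
		{"AMQP (RabbitMQ)", 2770, 14604},
	}
	for _, r := range rows {
		fmt.Printf("%-18s publish %8d B/s, subscribe %8d B/s\n",
			r.PubSub,
			PayloadBytesPerSecond(r.Publish, messageSize),
			PayloadBytesPerSecond(r.Subscribe, messageSize),
		)
	}
}
```

For example, GoChannel's 315,776 published messages per second correspond to 5,052,416 payload bytes per second, roughly 5 MB/s. With larger messages the byte throughput and the message rate can both look very different.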

Support

If you didn't find the answer to your question in the documentation, feel free to ask us directly!

Please join us on the #watermill channel on the Three Dots Labs Discord.

Why the name?

It processes streams!

License

MIT License