Convert Figma logo to code with AI

apache logokafka

Mirror of Apache Kafka

28,601
13,892
28,601
1,089

Top Related Projects

Open source RabbitMQ: core server and tier 1 (built-in) plugins

14,185

Apache Pulsar - distributed pub-sub messaging system

21,362

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.

Mirror of Apache ActiveMQ

23,929

Apache Flink

Quick Overview

Apache Kafka is a distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. It's designed for high-throughput, fault-tolerant, and scalable data streaming applications.

Pros

  • High throughput and low latency for real-time data streaming
  • Scalable and fault-tolerant architecture
  • Supports stream processing with exactly-once semantics
  • Integrates well with big data ecosystems (Hadoop, Spark, Flink, etc.)

Cons

  • Complex setup and configuration for optimal performance
  • Requires careful capacity planning and monitoring
  • Potential for data loss if not configured properly
  • Steep learning curve for advanced features and operations

Code Examples

  1. Producing messages:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
producer.close();
  1. Consuming messages:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
  1. Creating a topic:
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (Admin admin = Admin.create(props)) {
    NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
    admin.createTopics(Collections.singleton(newTopic)).all().get();
}

Getting Started

  1. Download and extract Kafka:

    wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
    tar -xzf kafka_2.13-3.4.0.tgz
    cd kafka_2.13-3.4.0
    
  2. Start ZooKeeper and Kafka server:

    bin/zookeeper-server-start.sh config/zookeeper.properties &
    bin/kafka-server-start.sh config/server.properties &
    
  3. Create a topic:

    bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
    
  4. Write and read events:

    bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
    bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
    

Competitor Comparisons

Open source RabbitMQ: core server and tier 1 (built-in) plugins

Pros of RabbitMQ

  • Easier to set up and manage for smaller deployments
  • Supports multiple messaging protocols (AMQP, MQTT, STOMP)
  • More flexible routing options with exchanges and bindings

Cons of RabbitMQ

  • Lower throughput compared to Kafka, especially for high-volume scenarios
  • Less scalable for large-scale distributed systems
  • Limited built-in stream processing capabilities

Code Comparison

RabbitMQ (Erlang):

basic_publish(Channel, <<"my_exchange">>, <<"routing_key">>, <<"Hello, World!">>),
{#'basic.get_ok'{}, Content} = basic_get(Channel, <<"my_queue">>, no_ack),
io:format("Received: ~p~n", [Content]).

Kafka (Java):

producer.send(new ProducerRecord<>("my-topic", "Hello, World!"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.println("Received: " + record.value());
}

Both RabbitMQ and Kafka are popular message brokers, but they have different strengths. RabbitMQ is more suitable for complex routing scenarios and smaller deployments, while Kafka excels in high-throughput, large-scale distributed systems with built-in stream processing capabilities. The choice between them depends on specific use cases and requirements.

14,185

Apache Pulsar - distributed pub-sub messaging system

Pros of Pulsar

  • Multi-tenancy support with better isolation between tenants
  • Built-in support for multiple storage tiers (hot/warm/cold)
  • Native geo-replication capabilities

Cons of Pulsar

  • Steeper learning curve due to more complex architecture
  • Smaller community and ecosystem compared to Kafka
  • Higher resource requirements for deployment

Code Comparison

Kafka producer example:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);

Pulsar producer example:

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();
Producer<byte[]> producer = client.newProducer()
    .topic("my-topic")
    .create();

Both Kafka and Pulsar offer robust messaging capabilities, but they differ in architecture and features. Kafka is known for its simplicity and high throughput, while Pulsar provides more advanced features at the cost of increased complexity. The choice between them depends on specific use cases and requirements.

21,362

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.

Pros of RocketMQ

  • Simpler architecture and easier to deploy, especially for smaller-scale applications
  • Built-in support for message tracing and message filtering
  • More flexible message consumption models, including push and pull

Cons of RocketMQ

  • Less mature ecosystem and community support compared to Kafka
  • Lower throughput in high-volume scenarios
  • Limited language client support beyond Java

Code Comparison

RocketMQ producer example:

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);

Kafka producer example:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("topic", "Hello Kafka"));

Both RocketMQ and Kafka are distributed messaging and streaming platforms, but they have different strengths and use cases. RocketMQ is often preferred for its simplicity and built-in features, while Kafka is known for its high throughput and scalability in large-scale distributed systems.

Mirror of Apache ActiveMQ

Pros of ActiveMQ

  • Supports multiple protocols (AMQP, MQTT, STOMP) out of the box
  • Easier to set up and manage for smaller-scale deployments
  • Offers both persistent and non-persistent message delivery

Cons of ActiveMQ

  • Lower throughput and scalability compared to Kafka
  • Less suitable for big data and real-time streaming scenarios
  • More complex configuration for high availability setups

Code Comparison

ActiveMQ (Java):

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(destination);

Kafka (Java):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);

Both ActiveMQ and Kafka are popular message brokers, but they have different strengths. ActiveMQ is more versatile in terms of protocol support and easier to set up for smaller projects. Kafka, on the other hand, excels in high-throughput scenarios and is better suited for big data applications and real-time streaming.

23,929

Apache Flink

Pros of Flink

  • Supports both batch and stream processing in a unified framework
  • Offers lower latency and higher throughput for real-time data processing
  • Provides built-in support for event time processing and out-of-order events

Cons of Flink

  • Steeper learning curve due to more complex API and concepts
  • Smaller community and ecosystem compared to Kafka
  • Less mature and battle-tested in production environments

Code Comparison

Flink (Stream Processing):

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
stream.map(s -> s.toUpperCase())
      .filter(s -> s.startsWith("A"))
      .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));

Kafka Streams:

KStream<String, String> stream = builder.stream("topic");
stream.mapValues(String::toUpperCase)
      .filter((key, value) -> value.startsWith("A"))
      .to("output-topic");

Both examples demonstrate processing a stream of data, but Flink's API is more flexible for complex stream processing tasks, while Kafka Streams is more tightly integrated with Kafka itself.

Convert Figma logo designs to code with AI

Visual Copilot

Introducing Visual Copilot: A new AI model to turn Figma designs to high quality code using your components.

Try Visual Copilot

README

Apache Kafka

CI Flaky Test Report

See our web site for details on the project.

You need to have Java installed.

We build and test Apache Kafka with 17 and 23. The release parameter in javac and scalac is set to 11 for the clients and streams modules, and 17 for the broker and tools, ensuring compatibility with their respective minimum Java versions.

Scala 2.13 is the only supported version in Apache Kafka.

Build a jar and run it

./gradlew jar

Follow instructions in https://kafka.apache.org/quickstart

Build source jar

./gradlew srcJar

Build aggregated javadoc

./gradlew aggregatedJavadoc

Build javadoc and scaladoc

./gradlew javadoc
./gradlew javadocJar # builds a javadoc jar for each module
./gradlew scaladoc
./gradlew scaladocJar # builds a scaladoc jar for each module
./gradlew docsJar # builds both (if applicable) javadoc and scaladoc jars for each module

Run unit/integration tests

./gradlew test  # runs both unit and integration tests
./gradlew unitTest
./gradlew integrationTest
./gradlew quarantinedTest  # runs the quarantined tests

Force re-running tests without code change

./gradlew test --rerun-tasks
./gradlew unitTest --rerun-tasks
./gradlew integrationTest --rerun-tasks

Running a particular unit/integration test

./gradlew clients:test --tests RequestResponseTest

Repeatedly running a particular unit/integration test with specific times by setting N

N=500; I=0; while [ $I -lt $N ] && ./gradlew clients:test --tests RequestResponseTest --rerun --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done

Running a particular test method within a unit/integration test

./gradlew core:test --tests kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic
./gradlew clients:test --tests org.apache.kafka.clients.MetadataTest.testTimeToNextUpdate

Running a particular unit/integration test with log4j output

By default, there will be only small number of logs output while testing. You can adjust it by changing the log4j2.yml file in the module's src/test/resources directory.

For example, if you want to see more logs for clients project tests, you can modify the line in clients/src/test/resources/log4j2.yml to level: INFO and then run:

./gradlew cleanTest clients:test --tests NetworkClientTest   

And you should see INFO level logs in the file under the clients/build/test-results/test directory.

Specifying test retries

Retries are disabled by default, but you can set maxTestRetryFailures and maxTestRetries to enable retries.

The following example declares -PmaxTestRetries=1 and -PmaxTestRetryFailures=3 to enable a failed test to be retried once, with a total retry limit of 3.

./gradlew test -PmaxTestRetries=1 -PmaxTestRetryFailures=3

The quarantinedTest task also has no retries by default, but you can set maxQuarantineTestRetries and maxQuarantineTestRetryFailures to enable retries, similar to the test task.

./gradlew quarantinedTest -PmaxQuarantineTestRetries=3 -PmaxQuarantineTestRetryFailures=20

See Test Retry Gradle Plugin for and build.yml more details.

Generating test coverage reports

Generate coverage reports for the whole project:

./gradlew reportCoverage -PenableTestCoverage=true -Dorg.gradle.parallel=false

Generate coverage for a single module, i.e.:

./gradlew clients:reportCoverage -PenableTestCoverage=true -Dorg.gradle.parallel=false

Building a binary release gzipped tar ball

./gradlew clean releaseTarGz

The release file can be found inside ./core/build/distributions/.

Building auto generated messages

Sometimes it is only necessary to rebuild the RPC auto-generated message data when switching between branches, as they could fail due to code changes. You can just run:

./gradlew processMessages processTestMessages

Running a Kafka broker in KRaft mode

Using compiled files:

KAFKA_CLUSTER_ID="$(./bin/kafka-storage.sh random-uuid)"
./bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
./bin/kafka-server-start.sh config/kraft/reconfig-server.properties

Using docker image:

docker run -p 9092:9092 apache/kafka:3.7.0

Cleaning the build

./gradlew clean

Running a task for a specific project

This is for core, examples and clients

./gradlew core:jar
./gradlew core:test

Streams has multiple sub-projects, but you can run all the tests:

./gradlew :streams:testAll

Listing all gradle tasks

./gradlew tasks

Building IDE project

Note Please ensure that JDK17 is used when developing Kafka.

Note that this is not strictly necessary (IntelliJ IDEA has good built-in support for Gradle projects, for example).

./gradlew eclipse
./gradlew idea

The eclipse task has been configured to use ${project_dir}/build_eclipse as Eclipse's build directory. Eclipse's default build directory (${project_dir}/bin) clashes with Kafka's scripts directory and we don't use Gradle's build directory to avoid known issues with this configuration.

IntelliJ Language Level awareness:

IntelliJ will automatically check Java syntax and compatibility for each module, even if the Java version is not explicitly set in the Structure > Project Settings > Modules.

Publishing the streams quickstart archetype artifact to maven

For the Streams archetype project, one cannot use gradle to upload to maven; instead the mvn deploy command needs to be called at the quickstart folder:

cd streams/quickstart
mvn deploy

Please note for this to work you should create/update user maven settings (typically, ${USER_HOME}/.m2/settings.xml) to assign the following variables

<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
                       https://maven.apache.org/xsd/settings-1.0.0.xsd">
...                           
<servers>
   ...
   <server>
      <id>apache.snapshots.https</id>
      <username>${maven_username}</username>
      <password>${maven_password}</password>
   </server>
   <server>
      <id>apache.releases.https</id>
      <username>${maven_username}</username>
      <password>${maven_password}</password>
    </server>
    ...
 </servers>
 ...

Installing specific projects to the local Maven repository

./gradlew -PskipSigning=true :streams:publishToMavenLocal

Building the test jar

./gradlew testJar

Running code quality checks

There are two code quality analysis tools that we regularly run, spotbugs and checkstyle.

Checkstyle

Checkstyle enforces a consistent coding style in Kafka. You can run checkstyle using:

./gradlew checkstyleMain checkstyleTest spotlessCheck

The checkstyle warnings will be found in reports/checkstyle/reports/main.html and reports/checkstyle/reports/test.html files in the subproject build directories. They are also printed to the console. The build will fail if Checkstyle fails. For experiments (or regression testing purposes) add -PcheckstyleVersion=X.y.z switch (to override project-defined checkstyle version).

Spotless

The import order is a part of static check. please call spotlessApply to optimize the imports of Java codes before filing pull request.

./gradlew spotlessApply

Spotbugs

Spotbugs uses static analysis to look for bugs in the code. You can run spotbugs using:

./gradlew spotbugsMain spotbugsTest -x test

The spotbugs warnings will be found in reports/spotbugs/main.html and reports/spotbugs/test.html files in the subproject build directories. Use -PxmlSpotBugsReport=true to generate an XML report instead of an HTML one.

JMH microbenchmarks

We use JMH to write microbenchmarks that produce reliable results in the JVM.

See jmh-benchmarks/README.md for details on how to run the microbenchmarks.

Dependency Analysis

The gradle dependency debugging documentation mentions using the dependencies or dependencyInsight tasks to debug dependencies for the root project or individual subprojects.

Alternatively, use the allDeps or allDepInsight tasks for recursively iterating through all subprojects:

./gradlew allDeps

./gradlew allDepInsight --configuration runtimeClasspath --dependency com.fasterxml.jackson.core:jackson-databind

These take the same arguments as the builtin variants.

Determining if any dependencies could be updated

./gradlew dependencyUpdates

Common build options

The following options should be set with a -P switch, for example ./gradlew -PmaxParallelForks=1 test.

  • commitId: sets the build commit ID as .git/HEAD might not be correct if there are local commits added for build purposes.
  • mavenUrl: sets the URL of the maven deployment repository (file://path/to/repo can be used to point to a local repository).
  • maxParallelForks: maximum number of test processes to start in parallel. Defaults to the number of processors available to the JVM.
  • maxScalacThreads: maximum number of worker threads for the scalac backend. Defaults to the lowest of 8 and the number of processors available to the JVM. The value must be between 1 and 16 (inclusive).
  • ignoreFailures: ignore test failures from junit
  • showStandardStreams: shows standard out and standard error of the test JVM(s) on the console.
  • skipSigning: skips signing of artifacts.
  • testLoggingEvents: unit test events to be logged, separated by comma. For example ./gradlew -PtestLoggingEvents=started,passed,skipped,failed test.
  • xmlSpotBugsReport: enable XML reports for spotBugs. This also disables HTML reports as only one can be enabled at a time.
  • maxTestRetries: maximum number of retries for a failing test case.
  • maxTestRetryFailures: maximum number of test failures before retrying is disabled for subsequent tests.
  • enableTestCoverage: enables test coverage plugins and tasks, including bytecode enhancement of classes required to track said coverage. Note that this introduces some overhead when running tests and hence why it's disabled by default (the overhead varies, but 15-20% is a reasonable estimate).
  • keepAliveMode: configures the keep alive mode for the Gradle compilation daemon - reuse improves start-up time. The values should be one of daemon or session (the default is daemon). daemon keeps the daemon alive until it's explicitly stopped while session keeps it alive until the end of the build session. This currently only affects the Scala compiler, see https://github.com/gradle/gradle/pull/21034 for a PR that attempts to do the same for the Java compiler.
  • scalaOptimizerMode: configures the optimizing behavior of the scala compiler, the value should be one of none, method, inline-kafka or inline-scala (the default is inline-kafka). none is the scala compiler default, which only eliminates unreachable code. method also includes method-local optimizations. inline-kafka adds inlining of methods within the kafka packages. Finally, inline-scala also includes inlining of methods within the scala library (which avoids lambda allocations for methods like Option.exists). inline-scala is only safe if the Scala library version is the same at compile time and runtime. Since we cannot guarantee this for all cases (for example, users may depend on the kafka jar for integration tests where they may include a scala library with a different version), we don't enable it by default. See https://www.lightbend.com/blog/scala-inliner-optimizer for more details.

Running system tests

See tests/README.md.

Running in Vagrant

See vagrant/README.md.

Contribution

Apache Kafka is interested in building the community; we would welcome any thoughts or patches. You can reach us on the Apache mailing lists.

To contribute follow the instructions here: