Top Related Projects
Open source RabbitMQ: core server and tier 1 (built-in) plugins
Apache Pulsar - distributed pub-sub messaging system
Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
Mirror of Apache ActiveMQ
Apache Flink
Quick Overview
Apache Kafka is a distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. It's designed for high-throughput, fault-tolerant, and scalable data streaming applications.
Pros
- High throughput and low latency for real-time data streaming
- Scalable and fault-tolerant architecture
- Supports stream processing with exactly-once semantics (see the sketch after this list)
- Integrates well with big data ecosystems (Hadoop, Spark, Flink, etc.)
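The exactly-once point above refers to the processing guarantee offered by Kafka Streams. A minimal sketch of enabling it follows; the configuration keys and constants come from the public Streams API, while the application ID and topic names are illustrative:
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// exactly_once_v2 turns on exactly-once processing (requires brokers 2.5+)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Copy records from one topic to another with exactly-once semantics
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").to("output-topic");
new KafkaStreams(builder.build(), props).start();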
Cons
- Complex setup and configuration for optimal performance
- Requires careful capacity planning and monitoring
- Potential for data loss if not configured properly
- Steep learning curve for advanced features and operations
Code Examples
- Producing messages:
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Send a single record to "my-topic" and flush pending records on close
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
producer.close();
- Consuming messages:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
// Poll in a loop and print each record's offset, key and value
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
- Creating a topic:
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.*;
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Create "my-new-topic" with 1 partition and a replication factor of 1
try (Admin admin = Admin.create(props)) {
    NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
    admin.createTopics(Collections.singleton(newTopic)).all().get();
}
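- Configuring stronger delivery guarantees (a minimal sketch, not part of the original examples; acks and enable.idempotence are standard producer settings, shown with illustrative values):
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Wait for all in-sync replicas to acknowledge each write
props.put("acks", "all");
// Idempotent writes prevent duplicates when the producer retries
props.put("enable.idempotence", "true");
Producer<String, String> producer = new KafkaProducer<>(props);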
Getting Started
- Download and extract Kafka:
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzf kafka_2.13-3.4.0.tgz
cd kafka_2.13-3.4.0
- Start ZooKeeper and Kafka server:
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
- Create a topic:
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
- Write and read events:
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
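- Optionally, verify the topic was created (an extra check, not part of the quickstart steps above):
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092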
Competitor Comparisons
Open source RabbitMQ: core server and tier 1 (built-in) plugins
Pros of RabbitMQ
- Easier to set up and manage for smaller deployments
- Supports multiple messaging protocols (AMQP, MQTT, STOMP)
- More flexible routing options with exchanges and bindings
Cons of RabbitMQ
- Lower throughput compared to Kafka, especially for high-volume scenarios
- Less scalable for large-scale distributed systems
- Limited built-in stream processing capabilities
Code Comparison
RabbitMQ (Erlang):
% Publish and consume one message using the official Erlang AMQP client (amqp_client)
amqp_channel:cast(Channel, #'basic.publish'{exchange = <<"my_exchange">>, routing_key = <<"routing_key">>}, #amqp_msg{payload = <<"Hello, World!">>}),
{#'basic.get_ok'{}, Content} = amqp_channel:call(Channel, #'basic.get'{queue = <<"my_queue">>, no_ack = true}),
io:format("Received: ~p~n", [Content]).
Kafka (Java):
producer.send(new ProducerRecord<>("my-topic", "Hello, World!"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received: " + record.value());
}
Both RabbitMQ and Kafka are popular message brokers, but they have different strengths. RabbitMQ is more suitable for complex routing scenarios and smaller deployments, while Kafka excels in high-throughput, large-scale distributed systems with built-in stream processing capabilities. The choice between them depends on specific use cases and requirements.
Apache Pulsar - distributed pub-sub messaging system
Pros of Pulsar
- Multi-tenancy support with better isolation between tenants
- Built-in support for multiple storage tiers (hot/warm/cold)
- Native geo-replication capabilities
Cons of Pulsar
- Steeper learning curve due to more complex architecture
- Smaller community and ecosystem compared to Kafka
- Higher resource requirements for deployment
Code Comparison
Kafka producer example:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
Pulsar producer example:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.create();
Both Kafka and Pulsar offer robust messaging capabilities, but they differ in architecture and features. Kafka is known for its simplicity and high throughput, while Pulsar provides more advanced features at the cost of increased complexity. The choice between them depends on specific use cases and requirements.
Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
Pros of RocketMQ
- Simpler architecture and easier to deploy, especially for smaller-scale applications
- Built-in support for message tracing and message filtering
- More flexible message consumption models, including push and pull
Cons of RocketMQ
- Less mature ecosystem and community support compared to Kafka
- Lower throughput in high-volume scenarios
- Limited language client support beyond Java
Code Comparison
RocketMQ producer example:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// Point the producer at a RocketMQ name server (9876 is the default port) before starting it
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
Kafka producer example:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("topic", "Hello Kafka"));
Both RocketMQ and Kafka are distributed messaging and streaming platforms, but they have different strengths and use cases. RocketMQ is often preferred for its simplicity and built-in features, while Kafka is known for its high throughput and scalability in large-scale distributed systems.
Mirror of Apache ActiveMQ
Pros of ActiveMQ
- Supports multiple protocols (AMQP, MQTT, STOMP) out of the box
- Easier to set up and manage for smaller-scale deployments
- Offers both persistent and non-persistent message delivery
Cons of ActiveMQ
- Lower throughput and scalability compared to Kafka
- Less suitable for big data and real-time streaming scenarios
- More complex configuration for high availability setups
Code Comparison
ActiveMQ (Java):
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(destination);
Kafka (Java):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
Both ActiveMQ and Kafka are popular message brokers, but they have different strengths. ActiveMQ is more versatile in terms of protocol support and easier to set up for smaller projects. Kafka, on the other hand, excels in high-throughput scenarios and is better suited for big data applications and real-time streaming.
Apache Flink
Pros of Flink
- Supports both batch and stream processing in a unified framework
- Offers lower latency and higher throughput for real-time data processing
- Provides built-in support for event time processing and out-of-order events
Cons of Flink
- Steeper learning curve due to more complex API and concepts
- Smaller community and ecosystem compared to Kafka
- Less mature and battle-tested in production environments
Code Comparison
Flink (Stream Processing):
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
stream.map(s -> s.toUpperCase())
.filter(s -> s.startsWith("A"))
.addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));
Kafka Streams:
KStream<String, String> stream = builder.stream("topic");
stream.mapValues(String::toUpperCase)
.filter((key, value) -> value.startsWith("A"))
.to("output-topic");
Both examples demonstrate processing a stream of data, but Flink's API is more flexible for complex stream processing tasks, while Kafka Streams is more tightly integrated with Kafka itself.
README
Apache Kafka
See our web site for details on the project.
You need to have Java installed.
We build and test Apache Kafka with Java 8, 11, 17 and 21. We set the release parameter in javac and scalac to 8 to ensure the generated binaries are compatible with Java 8 or higher (independently of the Java version used for compilation). Java 8 support project-wide has been deprecated since Apache Kafka 3.0, Java 11 support for the broker and tools has been deprecated since Apache Kafka 3.7, and removal of both is planned for Apache Kafka 4.0 (see KIP-750 and KIP-1013 for more details).
Scala 2.12 and 2.13 are supported and 2.13 is used by default. Scala 2.12 support has been deprecated since Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see KIP-751 for more details). See below for how to use a specific Scala version or all of the supported Scala versions.
Build a jar and run it
./gradlew jar
Follow instructions in https://kafka.apache.org/quickstart
Build source jar
./gradlew srcJar
Build aggregated javadoc
./gradlew aggregatedJavadoc
Build javadoc and scaladoc
./gradlew javadoc
./gradlew javadocJar # builds a javadoc jar for each module
./gradlew scaladoc
./gradlew scaladocJar # builds a scaladoc jar for each module
./gradlew docsJar # builds both (if applicable) javadoc and scaladoc jars for each module
Run unit/integration tests
./gradlew test # runs both unit and integration tests
./gradlew unitTest
./gradlew integrationTest
Force re-running tests without code change
./gradlew test --rerun
./gradlew unitTest --rerun
./gradlew integrationTest --rerun
Running a particular unit/integration test
./gradlew clients:test --tests RequestResponseTest
Repeatedly running a particular unit/integration test
I=0; while ./gradlew clients:test --tests RequestResponseTest --rerun --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
Running a particular test method within a unit/integration test
./gradlew core:test --tests kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic
./gradlew clients:test --tests org.apache.kafka.clients.MetadataTest.testTimeToNextUpdate
Running a particular unit/integration test with log4j output
By default, only a small amount of log output is produced while testing. You can adjust it by changing the log4j.properties file in the module's src/test/resources directory.
For example, if you want to see more logs for clients project tests, you can modify the line in clients/src/test/resources/log4j.properties to log4j.logger.org.apache.kafka=INFO and then run:
./gradlew cleanTest clients:test --tests NetworkClientTest
And you should see INFO level logs in the file under the clients/build/test-results/test directory.
Specifying test retries
By default, each failed test is retried once up to a maximum of five retries per test run. Tests are retried at the end of the test task. Adjust these parameters in the following way:
./gradlew test -PmaxTestRetries=1 -PmaxTestRetryFailures=5
See Test Retry Gradle Plugin for more details.
Generating test coverage reports
Generate coverage reports for the whole project:
./gradlew reportCoverage -PenableTestCoverage=true -Dorg.gradle.parallel=false
Generate coverage for a single module, i.e.:
./gradlew clients:reportCoverage -PenableTestCoverage=true -Dorg.gradle.parallel=false
Building a binary release gzipped tar ball
./gradlew clean releaseTarGz
The release file can be found inside ./core/build/distributions/.
Building auto generated messages
Sometimes it is only necessary to rebuild the RPC auto-generated message data when switching between branches, as they could fail due to code changes. You can just run:
./gradlew processMessages processTestMessages
Running a Kafka broker in KRaft mode
Using compiled files:
KAFKA_CLUSTER_ID="$(./bin/kafka-storage.sh random-uuid)"
./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
./bin/kafka-server-start.sh config/kraft/server.properties
Using docker image:
docker run -p 9092:9092 apache/kafka:3.7.0
Running a Kafka broker in ZooKeeper mode
Using compiled files:
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
Since ZooKeeper mode is already deprecated and planned to be removed in Apache Kafka 4.0, the docker image only supports running in KRaft mode.
Cleaning the build
./gradlew clean
Running a task with one of the Scala versions available (2.12.x or 2.13.x)
Note that if building the jars with a version other than 2.13.x, you need to set the SCALA_VERSION variable or change it in bin/kafka-run-class.sh to run the quick start.
You can pass either the major version (e.g. 2.12) or the full version (e.g. 2.12.7):
./gradlew -PscalaVersion=2.12 jar
./gradlew -PscalaVersion=2.12 test
./gradlew -PscalaVersion=2.12 releaseTarGz
Running a task with all the scala versions enabled by default
Invoke the gradlewAll script followed by the task(s):
./gradlewAll test
./gradlewAll jar
./gradlewAll releaseTarGz
Running a task for a specific project
This is for core, examples and clients:
./gradlew core:jar
./gradlew core:test
Streams has multiple sub-projects, but you can run all the tests:
./gradlew :streams:testAll
Listing all gradle tasks
./gradlew tasks
Building IDE project
Note that this is not strictly necessary (IntelliJ IDEA has good built-in support for Gradle projects, for example).
./gradlew eclipse
./gradlew idea
The eclipse task has been configured to use ${project_dir}/build_eclipse as Eclipse's build directory. Eclipse's default build directory (${project_dir}/bin) clashes with Kafka's scripts directory and we don't use Gradle's build directory to avoid known issues with this configuration.
Publishing the jar for all versions of Scala and for all projects to maven
The recommended command is:
./gradlewAll publish
For backwards compatibility, the following also works:
./gradlewAll uploadArchives
Please note for this to work you should create/update ${GRADLE_USER_HOME}/gradle.properties (typically, ~/.gradle/gradle.properties) and assign the following variables:
mavenUrl=
mavenUsername=
mavenPassword=
signing.keyId=
signing.password=
signing.secretKeyRingFile=
Publishing the streams quickstart archetype artifact to maven
For the Streams archetype project, one cannot use gradle to upload to maven; instead the mvn deploy command needs to be called at the quickstart folder:
cd streams/quickstart
mvn deploy
Please note for this to work you should create/update user maven settings (typically, ${USER_HOME}/.m2/settings.xml) to assign the following variables:
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
https://maven.apache.org/xsd/settings-1.0.0.xsd">
...
<servers>
...
<server>
<id>apache.snapshots.https</id>
<username>${maven_username}</username>
<password>${maven_password}</password>
</server>
<server>
<id>apache.releases.https</id>
<username>${maven_username}</username>
<password>${maven_password}</password>
</server>
...
</servers>
...
Installing ALL the jars to the local Maven repository
The recommended command to build for both Scala 2.12 and 2.13 is:
./gradlewAll publishToMavenLocal
For backwards compatibility, the following also works:
./gradlewAll install
Installing specific projects to the local Maven repository
./gradlew -PskipSigning=true :streams:publishToMavenLocal
If needed, you can specify the Scala version with -PscalaVersion=2.13.
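For instance, the two properties shown above can be combined in a single invocation (an illustrative example, not from the upstream README):
./gradlew -PscalaVersion=2.13 -PskipSigning=true :streams:publishToMavenLocal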
Building the test jar
./gradlew testJar
Running code quality checks
There are two code quality analysis tools that we regularly run, spotbugs and checkstyle.
Checkstyle
Checkstyle enforces a consistent coding style in Kafka. You can run checkstyle using:
./gradlew checkstyleMain checkstyleTest spotlessCheck
The checkstyle warnings will be found in reports/checkstyle/reports/main.html and reports/checkstyle/reports/test.html files in the subproject build directories. They are also printed to the console. The build will fail if Checkstyle fails.
Please note that ./gradlew spotlessCheck currently has an issue with Java 21 (see https://github.com/diffplug/spotless/pull/1920), so make sure to run this with JDK 11 or 17.
Spotless
The import order is part of the static checks. Please run spotlessApply (requires JDK 11+) to optimize the imports of Java code before filing a pull request.
./gradlew spotlessApply
Please note that ./gradlew spotlessApply currently has an issue with Java 21 (see https://github.com/diffplug/spotless/pull/1920), so make sure to run this with JDK 11 or 17.
Spotbugs
Spotbugs uses static analysis to look for bugs in the code. You can run spotbugs using:
./gradlew spotbugsMain spotbugsTest -x test
The spotbugs warnings will be found in reports/spotbugs/main.html and reports/spotbugs/test.html files in the subproject build directories. Use -PxmlSpotBugsReport=true to generate an XML report instead of an HTML one.
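For example, to produce the XML report instead of the HTML one, the flag can be combined with the same task invocation (an illustrative combination):
./gradlew -PxmlSpotBugsReport=true spotbugsMain spotbugsTest -x test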
JMH microbenchmarks
We use JMH to write microbenchmarks that produce reliable results in the JVM.
See jmh-benchmarks/README.md for details on how to run the microbenchmarks.
Dependency Analysis
The gradle dependency debugging documentation mentions using the dependencies or dependencyInsight tasks to debug dependencies for the root project or individual subprojects.
Alternatively, use the allDeps or allDepInsight tasks for recursively iterating through all subprojects:
./gradlew allDeps
./gradlew allDepInsight --configuration runtimeClasspath --dependency com.fasterxml.jackson.core:jackson-databind
These take the same arguments as the builtin variants.
Determining if any dependencies could be updated
./gradlew dependencyUpdates
Common build options
The following options should be set with a -P switch, for example ./gradlew -PmaxParallelForks=1 test.
- commitId: sets the build commit ID as .git/HEAD might not be correct if there are local commits added for build purposes.
- mavenUrl: sets the URL of the maven deployment repository (file://path/to/repo can be used to point to a local repository).
- maxParallelForks: maximum number of test processes to start in parallel. Defaults to the number of processors available to the JVM.
- maxScalacThreads: maximum number of worker threads for the scalac backend. Defaults to the lowest of 8 and the number of processors available to the JVM. The value must be between 1 and 16 (inclusive).
- ignoreFailures: ignore test failures from junit.
- showStandardStreams: shows standard out and standard error of the test JVM(s) on the console.
- skipSigning: skips signing of artifacts.
- testLoggingEvents: unit test events to be logged, separated by comma. For example ./gradlew -PtestLoggingEvents=started,passed,skipped,failed test.
- xmlSpotBugsReport: enable XML reports for spotBugs. This also disables HTML reports as only one can be enabled at a time.
- maxTestRetries: maximum number of retries for a failing test case.
- maxTestRetryFailures: maximum number of test failures before retrying is disabled for subsequent tests.
- enableTestCoverage: enables test coverage plugins and tasks, including bytecode enhancement of classes required to track said coverage. Note that this introduces some overhead when running tests and hence why it's disabled by default (the overhead varies, but 15-20% is a reasonable estimate).
- keepAliveMode: configures the keep alive mode for the Gradle compilation daemon - reuse improves start-up time. The values should be one of daemon or session (the default is daemon). daemon keeps the daemon alive until it's explicitly stopped while session keeps it alive until the end of the build session. This currently only affects the Scala compiler, see https://github.com/gradle/gradle/pull/21034 for a PR that attempts to do the same for the Java compiler.
- scalaOptimizerMode: configures the optimizing behavior of the scala compiler, the value should be one of none, method, inline-kafka or inline-scala (the default is inline-kafka). none is the scala compiler default, which only eliminates unreachable code. method also includes method-local optimizations. inline-kafka adds inlining of methods within the kafka packages. Finally, inline-scala also includes inlining of methods within the scala library (which avoids lambda allocations for methods like Option.exists). inline-scala is only safe if the Scala library version is the same at compile time and runtime. Since we cannot guarantee this for all cases (for example, users may depend on the kafka jar for integration tests where they may include a scala library with a different version), we don't enable it by default. See https://www.lightbend.com/blog/scala-inliner-optimizer for more details.
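Several of these options can be combined in a single invocation; an illustrative example using only the flags documented above (the values are arbitrary):
./gradlew -PmaxParallelForks=4 -PmaxTestRetries=1 -PtestLoggingEvents=failed test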
Running system tests
See tests/README.md.
Running in Vagrant
See vagrant/README.md.
Contribution
Apache Kafka is interested in building the community; we would welcome any thoughts or patches. You can reach us on the Apache mailing lists.
To contribute, follow the instructions on the Apache Kafka contributing page.