Top Related Projects
Open source RabbitMQ: core server and tier 1 (built-in) plugins
Apache Pulsar - distributed pub-sub messaging system
Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
Mirror of Apache ActiveMQ
Apache Flink
Quick Overview
Apache Kafka is a distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. It's designed for high-throughput, fault-tolerant, and scalable data streaming applications.
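The "distributed commit log" abstraction can be illustrated with a small in-memory model. This is only a sketch: the class CommitLogSketch and its methods are hypothetical names chosen for illustration and are not part of Kafka. It shows two properties the abstraction relies on: records are appended in order and each receives a monotonically increasing offset, and reading from an offset does not remove anything, so a reader can replay from any point.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of an append-only log; not Kafka's implementation.
final class CommitLogSketch {
    private final List<String> records = new ArrayList<>();

    // Appends a record to the end of the log and returns the offset assigned to it.
    long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    // Returns a copy of every record from the given offset onward; the log itself is unchanged.
    List<String> readFrom(long offset) {
        return new ArrayList<>(records.subList((int) offset, records.size()));
    }

    public static void main(String[] args) {
        CommitLogSketch log = new CommitLogSketch();
        log.append("order-created");
        log.append("order-paid");
        long offset = log.append("order-shipped");
        System.out.println("last offset = " + offset);
        System.out.println("replay from 1 = " + log.readFrom(1));
    }
}
```

In Kafka itself the log is partitioned, replicated across brokers, and persisted to disk; none of that is modeled here.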
Pros
- High throughput and low latency for real-time data streaming
- Scalable and fault-tolerant architecture
- Supports stream processing with exactly-once semantics
- Integrates well with big data ecosystems (Hadoop, Spark, Flink, etc.)
Cons
- Complex setup and configuration for optimal performance
- Requires careful capacity planning and monitoring
- Potential for data loss if not configured properly
- Steep learning curve for advanced features and operations
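As an illustration of the data-loss point above, the sketch below builds producer properties with two durability-related client settings, acks and enable.idempotence. The class and method names are hypothetical, and the values are a starting point rather than a recommendation; broker and topic settings such as the replication factor also affect durability and are not covered here.

```java
import java.util.Properties;

// Hypothetical helper; only java.util.Properties is used, so it runs without the Kafka client jar.
final class DurableProducerConfigSketch {

    // Builds producer properties that favor durability over latency.
    static Properties durableProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for all in-sync replicas to acknowledge each write before treating it as sent.
        props.put("acks", "all");
        // Let the broker de-duplicate records that the client re-sends after a retry.
        props.put("enable.idempotence", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = durableProducerProps("localhost:9092");
        System.out.println("acks = " + props.getProperty("acks"));
        System.out.println("enable.idempotence = " + props.getProperty("enable.idempotence"));
    }
}
```

The returned Properties object can be passed to the KafkaProducer constructor in the same way as in the producer example in the Code Examples section.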
Code Examples
- Producing messages:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
producer.close();
- Consuming messages:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
- Creating a topic:
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (Admin admin = Admin.create(props)) {
    NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
    admin.createTopics(Collections.singleton(newTopic)).all().get();
}
Getting Started
- Download and extract Kafka:
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzf kafka_2.13-3.4.0.tgz
cd kafka_2.13-3.4.0
- Start ZooKeeper and Kafka server:
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
- Create a topic:
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
- Write and read events:
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
Competitor Comparisons
Open source RabbitMQ: core server and tier 1 (built-in) plugins
Pros of RabbitMQ
- Easier to set up and manage for smaller deployments
- Supports multiple messaging protocols (AMQP, MQTT, STOMP)
- More flexible routing options with exchanges and bindings
Cons of RabbitMQ
- Lower throughput compared to Kafka, especially for high-volume scenarios
- Less scalable for large-scale distributed systems
- Limited built-in stream processing capabilities
Code Comparison
RabbitMQ (Erlang):
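%% Assumes amqp_client.hrl is included and Channel is an open amqp_client channel.
amqp_channel:cast(Channel, #'basic.publish'{exchange = <<"my_exchange">>, routing_key = <<"routing_key">>},
                  #amqp_msg{payload = <<"Hello, World!">>}),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload}} =
    amqp_channel:call(Channel, #'basic.get'{queue = <<"my_queue">>, no_ack = true}),
io:format("Received: ~p~n", [Payload]).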
Kafka (Java):
producer.send(new ProducerRecord<>("my-topic", "Hello, World!"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.println("Received: " + record.value());
}
Both RabbitMQ and Kafka are popular message brokers, but they have different strengths. RabbitMQ is more suitable for complex routing scenarios and smaller deployments, while Kafka excels in high-throughput, large-scale distributed systems with built-in stream processing capabilities. The choice between them depends on specific use cases and requirements.
Apache Pulsar - distributed pub-sub messaging system
Pros of Pulsar
- Multi-tenancy support with better isolation between tenants
- Built-in support for multiple storage tiers (hot/warm/cold)
- Native geo-replication capabilities
Cons of Pulsar
- Steeper learning curve due to more complex architecture
- Smaller community and ecosystem compared to Kafka
- Higher resource requirements for deployment
Code Comparison
Kafka producer example:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
Pulsar producer example:
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();
Producer<byte[]> producer = client.newProducer()
    .topic("my-topic")
    .create();
Both Kafka and Pulsar offer robust messaging capabilities, but they differ in architecture and features. Kafka is known for its simplicity and high throughput, while Pulsar provides more advanced features at the cost of increased complexity. The choice between them depends on specific use cases and requirements.
Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
Pros of RocketMQ
- Simpler architecture and easier to deploy, especially for smaller-scale applications
- Built-in support for message tracing and message filtering
- More flexible message consumption models, including push and pull
Cons of RocketMQ
- Less mature ecosystem and community support compared to Kafka
- Lower throughput in high-volume scenarios
- Limited language client support beyond Java
Code Comparison
RocketMQ producer example:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
Kafka producer example:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("topic", "Hello Kafka"));
Both RocketMQ and Kafka are distributed messaging and streaming platforms, but they have different strengths and use cases. RocketMQ is often preferred for its simplicity and built-in features, while Kafka is known for its high throughput and scalability in large-scale distributed systems.
Mirror of Apache ActiveMQ
Pros of ActiveMQ
- Supports multiple protocols (AMQP, MQTT, STOMP) out of the box
- Easier to set up and manage for smaller-scale deployments
- Offers both persistent and non-persistent message delivery
Cons of ActiveMQ
- Lower throughput and scalability compared to Kafka
- Less suitable for big data and real-time streaming scenarios
- More complex configuration for high availability setups
Code Comparison
ActiveMQ (Java):
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(destination);
Kafka (Java):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
Both ActiveMQ and Kafka are popular message brokers, but they have different strengths. ActiveMQ is more versatile in terms of protocol support and easier to set up for smaller projects. Kafka, on the other hand, excels in high-throughput scenarios and is better suited for big data applications and real-time streaming.
Apache Flink
Pros of Flink
- Supports both batch and stream processing in a unified framework
- Offers lower latency and higher throughput for real-time data processing
- Provides built-in support for event time processing and out-of-order events
Cons of Flink
- Steeper learning curve due to more complex API and concepts
- Smaller community and ecosystem compared to Kafka
- Less mature and battle-tested in production environments
Code Comparison
Flink (Stream Processing):
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
stream.map(s -> s.toUpperCase())
    .filter(s -> s.startsWith("A"))
    .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties));
Kafka Streams:
KStream<String, String> stream = builder.stream("topic");
stream.mapValues(String::toUpperCase)
    .filter((key, value) -> value.startsWith("A"))
    .to("output-topic");
Both examples demonstrate processing a stream of data, but Flink's API is more flexible for complex stream processing tasks, while Kafka Streams is more tightly integrated with Kafka itself.
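The transformation that both snippets express (upper-case each value, then keep only values starting with "A") can be tried on an in-memory list with plain java.util.stream, without a running cluster. This is a sketch for checking the logic only: the class name is hypothetical and the code uses neither the Flink nor the Kafka Streams API.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the per-record logic of the two pipelines above.
final class UppercaseFilterSketch {

    // Upper-cases every value, then keeps only the values that start with "A".
    static List<String> transform(List<String> input) {
        return input.stream()
                .map(String::toUpperCase)
                .filter(s -> s.startsWith("A"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "banana" is dropped; the other two values pass the filter after upper-casing.
        System.out.println(transform(List.of("apple", "banana", "Avocado")));
    }
}
```

Because the filter runs after the map, a lower-case "apple" still passes; reversing the two steps would drop it.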
README
Apache Kafka
See our web site for details on the project.
You need to have Java installed.
We build and test Apache Kafka with Java 17 and 23. The release parameter in javac and scalac is set to 11 for the clients and streams modules, and 17 for the broker and tools, ensuring compatibility with their respective minimum Java versions.
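To check which of these minimums a local JVM satisfies, something like the following sketch could be used. The class, constant, and method names are hypothetical; the thresholds 11 and 17 are taken from the paragraph above, and Runtime.version().feature() requires Java 10 or newer.

```java
// Hypothetical helper that compares the running JVM against the release levels named above.
final class JavaVersionCheckSketch {
    static final int CLIENTS_MIN_RELEASE = 11; // clients and streams modules
    static final int BROKER_MIN_RELEASE = 17;  // broker and tools

    // Returns true when the given feature version (e.g. 17) is at least the required minimum.
    static boolean meetsMinimum(int featureVersion, int minimum) {
        return featureVersion >= minimum;
    }

    public static void main(String[] args) {
        int feature = Runtime.version().feature();
        System.out.println("Running on Java " + feature);
        System.out.println("clients/streams OK: " + meetsMinimum(feature, CLIENTS_MIN_RELEASE));
        System.out.println("broker/tools OK: " + meetsMinimum(feature, BROKER_MIN_RELEASE));
    }
}
```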
Scala 2.13 is the only supported version in Apache Kafka.
Build a jar and run it
./gradlew jar
Follow instructions in https://kafka.apache.org/quickstart
Build source jar
./gradlew srcJar
Build aggregated javadoc
./gradlew aggregatedJavadoc
Build javadoc and scaladoc
./gradlew javadoc
./gradlew javadocJar # builds a javadoc jar for each module
./gradlew scaladoc
./gradlew scaladocJar # builds a scaladoc jar for each module
./gradlew docsJar # builds both (if applicable) javadoc and scaladoc jars for each module
Run unit/integration tests
./gradlew test # runs both unit and integration tests
./gradlew unitTest
./gradlew integrationTest
./gradlew quarantinedTest # runs the quarantined tests
Force re-running tests without code change
./gradlew test --rerun-tasks
./gradlew unitTest --rerun-tasks
./gradlew integrationTest --rerun-tasks
Running a particular unit/integration test
./gradlew clients:test --tests RequestResponseTest
Repeatedly running a particular unit/integration test a specific number of times by setting N
N=500; I=0; while [ $I -lt $N ] && ./gradlew clients:test --tests RequestResponseTest --rerun --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
Running a particular test method within a unit/integration test
./gradlew core:test --tests kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic
./gradlew clients:test --tests org.apache.kafka.clients.MetadataTest.testTimeToNextUpdate
Running a particular unit/integration test with log4j output
By default, only a small number of log messages are output while testing. You can adjust this by changing the log4j2.yml file in the module's src/test/resources directory.
For example, if you want to see more logs for clients project tests, you can modify the line in clients/src/test/resources/log4j2.yml to level: INFO and then run:
./gradlew cleanTest clients:test --tests NetworkClientTest
You should then see INFO level logs in the file under the clients/build/test-results/test directory.
Specifying test retries
Retries are disabled by default, but you can set maxTestRetryFailures and maxTestRetries to enable retries.
The following example declares -PmaxTestRetries=1 and -PmaxTestRetryFailures=3 to enable a failed test to be retried once, with a total retry limit of 3.
./gradlew test -PmaxTestRetries=1 -PmaxTestRetryFailures=3
The quarantinedTest task also has no retries by default, but you can set maxQuarantineTestRetries and maxQuarantineTestRetryFailures to enable retries, similar to the test task.
./gradlew quarantinedTest -PmaxQuarantineTestRetries=3 -PmaxQuarantineTestRetryFailures=20
See Test Retry Gradle Plugin and build.yml for more details.
Generating test coverage reports
Generate coverage reports for the whole project:
./gradlew reportCoverage -PenableTestCoverage=true -Dorg.gradle.parallel=false
Generate coverage for a single module, e.g.:
./gradlew clients:reportCoverage -PenableTestCoverage=true -Dorg.gradle.parallel=false
Building a binary release gzipped tar ball
./gradlew clean releaseTarGz
The release file can be found inside ./core/build/distributions/.
Building auto generated messages
When switching between branches, it is sometimes enough to rebuild only the auto-generated RPC message data, since stale generated code can fail to build after code changes. You can just run:
./gradlew processMessages processTestMessages
Running a Kafka broker in KRaft mode
Using compiled files:
KAFKA_CLUSTER_ID="$(./bin/kafka-storage.sh random-uuid)"
./bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
./bin/kafka-server-start.sh config/kraft/reconfig-server.properties
Using docker image:
docker run -p 9092:9092 apache/kafka:3.7.0
Cleaning the build
./gradlew clean
Running a task for a specific project
This is for core, examples and clients
./gradlew core:jar
./gradlew core:test
Streams has multiple sub-projects, but you can run all the tests:
./gradlew :streams:testAll
Listing all gradle tasks
./gradlew tasks
Building IDE project
Note: Please ensure that JDK 17 is used when developing Kafka.
Generating IDE project files with the tasks below is not strictly necessary (IntelliJ IDEA has good built-in support for Gradle projects, for example).
./gradlew eclipse
./gradlew idea
The eclipse task has been configured to use ${project_dir}/build_eclipse as Eclipse's build directory. Eclipse's default build directory (${project_dir}/bin) clashes with Kafka's scripts directory and we don't use Gradle's build directory to avoid known issues with this configuration.
IntelliJ Language Level awareness:
IntelliJ will automatically check Java syntax and compatibility for each module, even if the Java version is not explicitly set in the Structure > Project Settings > Modules.
Publishing the streams quickstart archetype artifact to maven
For the Streams archetype project, one cannot use gradle to upload to maven; instead the mvn deploy command needs to be called at the quickstart folder:
cd streams/quickstart
mvn deploy
Please note for this to work you should create/update user maven settings (typically, ${USER_HOME}/.m2/settings.xml) to assign the following variables:
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
                              https://maven.apache.org/xsd/settings-1.0.0.xsd">
...
<servers>
   ...
   <server>
      <id>apache.snapshots.https</id>
      <username>${maven_username}</username>
      <password>${maven_password}</password>
   </server>
   <server>
      <id>apache.releases.https</id>
      <username>${maven_username}</username>
      <password>${maven_password}</password>
   </server>
   ...
</servers>
...
Installing specific projects to the local Maven repository
./gradlew -PskipSigning=true :streams:publishToMavenLocal
Building the test jar
./gradlew testJar
Running code quality checks
There are two code quality analysis tools that we regularly run, spotbugs and checkstyle.
Checkstyle
Checkstyle enforces a consistent coding style in Kafka. You can run checkstyle using:
./gradlew checkstyleMain checkstyleTest spotlessCheck
The checkstyle warnings will be found in reports/checkstyle/reports/main.html and reports/checkstyle/reports/test.html files in the subproject build directories. They are also printed to the console. The build will fail if Checkstyle fails.
For experiments (or regression testing purposes) add the -PcheckstyleVersion=X.y.z switch (to override the project-defined checkstyle version).
Spotless
The import order is part of the static checks. Please run spotlessApply to fix the import order of Java code before filing a pull request.
./gradlew spotlessApply
Spotbugs
Spotbugs uses static analysis to look for bugs in the code. You can run spotbugs using:
./gradlew spotbugsMain spotbugsTest -x test
The spotbugs warnings will be found in reports/spotbugs/main.html and reports/spotbugs/test.html files in the subproject build directories. Use -PxmlSpotBugsReport=true to generate an XML report instead of an HTML one.
JMH microbenchmarks
We use JMH to write microbenchmarks that produce reliable results in the JVM.
See jmh-benchmarks/README.md for details on how to run the microbenchmarks.
Dependency Analysis
The gradle dependency debugging documentation mentions using the dependencies or dependencyInsight tasks to debug dependencies for the root project or individual subprojects.
Alternatively, use the allDeps or allDepInsight tasks for recursively iterating through all subprojects:
./gradlew allDeps
./gradlew allDepInsight --configuration runtimeClasspath --dependency com.fasterxml.jackson.core:jackson-databind
These take the same arguments as the builtin variants.
Determining if any dependencies could be updated
./gradlew dependencyUpdates
Common build options
The following options should be set with a -P switch, for example ./gradlew -PmaxParallelForks=1 test.
- commitId: sets the build commit ID as .git/HEAD might not be correct if there are local commits added for build purposes.
- mavenUrl: sets the URL of the maven deployment repository (file://path/to/repo can be used to point to a local repository).
- maxParallelForks: maximum number of test processes to start in parallel. Defaults to the number of processors available to the JVM.
- maxScalacThreads: maximum number of worker threads for the scalac backend. Defaults to the lowest of 8 and the number of processors available to the JVM. The value must be between 1 and 16 (inclusive).
- ignoreFailures: ignore test failures from junit.
- showStandardStreams: shows standard out and standard error of the test JVM(s) on the console.
- skipSigning: skips signing of artifacts.
- testLoggingEvents: unit test events to be logged, separated by comma. For example ./gradlew -PtestLoggingEvents=started,passed,skipped,failed test.
- xmlSpotBugsReport: enable XML reports for spotBugs. This also disables HTML reports as only one can be enabled at a time.
- maxTestRetries: maximum number of retries for a failing test case.
- maxTestRetryFailures: maximum number of test failures before retrying is disabled for subsequent tests.
- enableTestCoverage: enables test coverage plugins and tasks, including bytecode enhancement of classes required to track said coverage. Note that this introduces some overhead when running tests and hence why it's disabled by default (the overhead varies, but 15-20% is a reasonable estimate).
- keepAliveMode: configures the keep alive mode for the Gradle compilation daemon - reuse improves start-up time. The values should be one of daemon or session (the default is daemon). daemon keeps the daemon alive until it's explicitly stopped while session keeps it alive until the end of the build session. This currently only affects the Scala compiler, see https://github.com/gradle/gradle/pull/21034 for a PR that attempts to do the same for the Java compiler.
- scalaOptimizerMode: configures the optimizing behavior of the scala compiler, the value should be one of none, method, inline-kafka or inline-scala (the default is inline-kafka). none is the scala compiler default, which only eliminates unreachable code. method also includes method-local optimizations. inline-kafka adds inlining of methods within the kafka packages. Finally, inline-scala also includes inlining of methods within the scala library (which avoids lambda allocations for methods like Option.exists). inline-scala is only safe if the Scala library version is the same at compile time and runtime. Since we cannot guarantee this for all cases (for example, users may depend on the kafka jar for integration tests where they may include a scala library with a different version), we don't enable it by default. See https://www.lightbend.com/blog/scala-inliner-optimizer for more details.
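The default and range rule described for maxScalacThreads above can be written out as a small sketch. This is not the logic from Kafka's build script; the class and method names are hypothetical, and only the numbers (a default of the lower of 8 and the processor count, and an allowed range of 1 to 16 inclusive) come from the option description.

```java
// Hypothetical illustration of the maxScalacThreads default and bounds.
final class ScalacThreadsSketch {

    // Default: the lower of 8 and the number of processors available to the JVM.
    static int defaultThreads(int availableProcessors) {
        return Math.min(8, availableProcessors);
    }

    // Accepts an explicit value only when it lies between 1 and 16 (inclusive).
    static int validate(int requested) {
        if (requested < 1 || requested > 16) {
            throw new IllegalArgumentException("maxScalacThreads must be between 1 and 16, got " + requested);
        }
        return requested;
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("default on this machine = " + defaultThreads(processors));
        System.out.println("explicit value 12 -> " + validate(12));
    }
}
```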
Running system tests
See tests/README.md.
Running in Vagrant
See vagrant/README.md.
Contribution
Apache Kafka is interested in building the community; we would welcome any thoughts or patches. You can reach us on the Apache mailing lists.
To contribute follow the instructions here: