Top Related Projects
Change data capture for a variety of databases. Please log issues at https://issues.redhat.com/browse/DBZ.
Alibaba MySQL binlog incremental subscription & consumption component (Canal)
Maxwell's daemon, a mysql-to-json kafka producer
Flink CDC is a streaming data integration tool
The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
Quick Overview
Apache Flink CDC (Change Data Capture) is an open-source project that enables real-time data synchronization from various databases to Apache Flink. It allows users to capture and process database changes as a stream, facilitating real-time data integration, analytics, and event-driven applications.
Pros
- Supports multiple databases, including MySQL, PostgreSQL, Oracle, and SQL Server
- Provides low-latency, exactly-once processing of change events
- Integrates seamlessly with Apache Flink's ecosystem and powerful stream processing capabilities
- Offers both full snapshot and incremental change capture modes (see the startup-mode example under Code Examples below)
Cons
- Requires a good understanding of Apache Flink and stream processing concepts
- May have performance overhead for very high-volume change capture scenarios
- Limited support for certain advanced database features or custom data types
- Dependency on specific database configurations and permissions for optimal performance
Code Examples
- Creating a MySQL CDC source:
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("inventory")          // databases to capture
    .tableList("inventory.products")    // fully qualified tables to capture
    .username("user")
    .password("password")
    .deserializer(new JsonDebeziumDeserializationSchema()) // emit change events as JSON strings
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing underpins the source's exactly-once guarantees; enable it in
// real jobs, e.g. env.enableCheckpointing(3000);

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
    .print()
    .setParallelism(1);

env.execute("Print MySQL Snapshot + Binlog");
- Creating a PostgreSQL CDC source:
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("snapshot.mode", "initial"); // take a full snapshot before streaming changes

PostgreSQLSource<String> postgresSource = PostgreSQLSource.<String>builder()
    .hostname("localhost")
    .port(5432)
    .database("postgres")
    .schemaList("inventory")
    .tableList("inventory.products")
    .username("postgres")
    .password("postgres")
    .decodingPluginName("pgoutput")     // logical decoding plugin; requires wal_level = logical on the server
    .debeziumProperties(debeziumProperties)
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// PostgreSQLSource.builder() produces a legacy SourceFunction, so it is wired
// in with addSource rather than fromSource.
env.addSource(postgresSource)
    .print()
    .setParallelism(1);

env.execute("Print PostgreSQL Snapshot + WAL");
- Using a custom deserializer:
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class CustomDeserializationSchema implements DebeziumDeserializationSchema<MyCustomType> {

    @Override
    public void deserialize(SourceRecord record, Collector<MyCustomType> out) throws Exception {
        Struct value = (Struct) record.value();
        // "after" holds the row state following the change; it is null for deletes.
        Struct after = value.getStruct("after");
        if (after != null) {
            // MyCustomType and its fields are placeholders for your own domain type.
            out.collect(new MyCustomType(after.getInt32("id"), after.getString("name")));
        }
    }

    @Override
    public TypeInformation<MyCustomType> getProducedType() {
        return TypeInformation.of(MyCustomType.class);
    }
}

MySqlSource<MyCustomType> mySqlSource = MySqlSource.<MyCustomType>builder()
    // ... other configurations ...
    .deserializer(new CustomDeserializationSchema())
    .build();
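- Choosing a startup mode: the builder's StartupOptions control whether the source takes a full snapshot first or starts from the current binlog position. A short sketch, with the connection settings elided as in the first example:

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    // ... connection configuration as in the first example ...
    .startupOptions(StartupOptions.initial())   // full snapshot, then stream the binlog
    // .startupOptions(StartupOptions.latest()) // or: skip the snapshot and read only new changes
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();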
Getting Started
- Add the Flink CDC dependency to your project:

  <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.3.0</version>
  </dependency>

- Configure your database for CDC (e.g., enable binary logging for MySQL; see the sketch after this list).
- Create a Flink CDC source and add it to your Flink job (see the code examples above).
- Build and submit your Flink job to a Flink cluster.
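For the MySQL case, CDC needs row-based binary logging and a user with replication privileges. A minimal sketch; the server id, user name, and values are illustrative:

# my.cnf
server-id        = 223344   # any value unique within the replication topology
log_bin          = mysql-bin
binlog_format    = ROW      # row-level events are required for CDC
binlog_row_image = FULL

-- privileges the capture user needs (run as an administrator)
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'flinkuser'@'%';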
Competitor Comparisons
Change data capture for a variety of databases. Please log issues at https://issues.redhat.com/browse/DBZ.
Pros of Debezium
- Broader database support, including Oracle, SQL Server, and MongoDB
- More mature project with a larger community and extensive documentation
- Flexible deployment options, including standalone and Kafka Connect modes
Cons of Debezium
- Higher resource consumption and potential performance overhead
- More complex setup and configuration process
- Limited real-time analytics capabilities compared to Flink CDC
Code Comparison
Debezium configuration example:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054"
  }
}
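Unlike Flink CDC, Debezium typically runs inside a Kafka Connect cluster, and the JSON above is registered through Connect's REST API. A sketch, assuming the config is saved as register-mysql.json and Connect listens on its default port 8083:

curl -X POST -H "Content-Type: application/json" \
  --data @register-mysql.json \
  http://localhost:8083/connectors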
Flink CDC configuration example:
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("inventory")
    .tableList("inventory.products")
    .username("flinkuser")
    .password("flinkpw")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();
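Because Flink CDC embeds the Debezium engine internally, Debezium tuning options can usually be passed straight through the builder. A sketch; the property shown is a standard Debezium MySQL option and the value is illustrative:

Properties dbzProps = new Properties();
dbzProps.setProperty("decimal.handling.mode", "string"); // render DECIMAL columns as strings

MySqlSource<String> source = MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("inventory")
    .username("flinkuser")
    .password("flinkpw")
    .debeziumProperties(dbzProps)
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();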
Both projects offer robust CDC solutions, but Debezium provides broader database support and deployment flexibility, while Flink CDC excels in real-time processing and analytics integration within the Apache Flink ecosystem.
Alibaba MySQL binlog incremental subscription & consumption component (Canal)
Pros of Canal
- Mature and widely adopted in production environments, particularly in the Alibaba ecosystem
- Delivers change events to a variety of downstream consumers, including Kafka and RocketMQ
- Offers flexible deployment options, including standalone and cluster modes
Cons of Canal
- Primarily focused on MySQL, with less robust support for other databases
- Requires more manual configuration and setup compared to Flink CDC
- Limited integration with stream processing frameworks
Code Comparison
Canal configuration example:
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..*
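Canal is consumed through its own client API rather than through a stream-processing runtime. A minimal sketch using the Canal client library (com.alibaba.otter.canal); the address, destination, and batch size are illustrative:

// CanalConnector, CanalConnectors, and Message come from the Canal client library.
CanalConnector connector = CanalConnectors.newSingleConnector(
        new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
connector.connect();
connector.subscribe(".*\\..*");                 // same table filter as the config above
Message message = connector.getWithoutAck(100); // pull up to 100 binlog entries
// ... process message.getEntries() ...
connector.ack(message.getId());                 // acknowledge after successful processing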
Flink CDC configuration example:
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("mydatabase")
    .tableList("mydatabase.users")
    .username("flinkuser")
    .password("flinkpw")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();
Both Canal and Flink CDC are powerful change data capture tools, but they cater to different integration scenarios. Canal is more established in the MySQL ecosystem and ships its own client-server architecture, while Flink CDC offers seamless integration with Apache Flink for stream processing applications. The choice between the two depends on project requirements, existing infrastructure, and how tightly CDC should integrate with a stream processing framework.
Maxwell's daemon, a mysql-to-json kafka producer
Pros of Maxwell
- Lightweight and easy to set up, with minimal dependencies
- Supports multiple output formats (JSON, Avro, Protobuf)
- Can operate as a standalone application without additional infrastructure
Cons of Maxwell
- Limited to MySQL databases only
- Lacks advanced features like exactly-once processing and fault tolerance
- Not as scalable for high-volume data streams compared to Flink CDC
Code Comparison
Maxwell configuration example:
# config.properties — Maxwell is configured via a Java properties file
producer=kafka
kafka.bootstrap.servers=localhost:9092
kafka_topic=maxwell
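Maxwell then publishes each row change to Kafka as a JSON document along these lines (field values are illustrative):

{
  "database": "mydatabase",
  "table": "users",
  "type": "insert",
  "ts": 1477053217,
  "xid": 23396,
  "data": { "id": 1, "name": "alice" }
}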
Flink CDC configuration example:
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("mydatabase")
    .tableList("mydatabase.users")
    .username("user")
    .password("password")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();
Maxwell is more straightforward to configure, while Flink CDC offers more granular control and integration with the Flink ecosystem. Flink CDC provides better scalability and fault tolerance, making it more suitable for large-scale, production-grade change data capture scenarios. However, Maxwell's simplicity and standalone nature make it a good choice for smaller projects or quick prototyping.
Flink CDC is a streaming data integration tool
Pros of flink-cdc
- More comprehensive documentation and examples
- Wider range of supported databases and data sources
- Active community support and frequent updates
Cons of flink-cdc
- Potentially higher resource consumption
- Steeper learning curve for beginners
- May require more configuration for complex setups
Code Comparison
flink-cdc:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("inventory")
    .tableList("inventory.products")
    .username("root")
    .password("123456")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").print();
env.execute("MySQL CDC Example");
Both repositories are essentially the same project, as apache/flink-cdc is the main repository for Apache Flink CDC Connectors. The comparison provided above is based on the features and characteristics of the Flink CDC project itself, rather than comparing two distinct repositories.
The code example demonstrates how to set up a MySQL source in Flink CDC, which is a common use case for change data capture in Apache Flink applications. This code snippet showcases the ease of configuration and the flexibility of the Flink CDC connectors.
The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
Pros of Airbyte
- Broader data integration support with 300+ connectors for various data sources and destinations
- User-friendly UI for configuration and monitoring of data pipelines
- Extensible architecture allowing easy creation of custom connectors
Cons of Airbyte
- Potentially higher resource consumption due to its comprehensive feature set
- Steeper learning curve for advanced customizations and optimizations
Code Comparison
Flink CDC (Java):
public class MySqlSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("inventory")
                .tableList("inventory.products")
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }
}
Airbyte (Python):
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
    try:
        with MySQLConnection(**config) as connection:
            connection.cursor().execute("SELECT 1")
            return True, None
    except Exception as e:
        return False, f"An exception occurred: {str(e)}"
The code snippets demonstrate the configuration of a MySQL source in both projects. Flink CDC uses a builder pattern in Java, while Airbyte implements a connection check method in Python.
README
Flink CDC is a distributed data integration tool for real-time and batch data. It brings the simplicity and elegance of data integration via YAML, describing the data movement and transformation of a data pipeline declaratively.
Flink CDC prioritizes efficient end-to-end data integration and offers enhanced functionality such as full database synchronization, sharding table synchronization, schema evolution, and data transformation.
Getting Started
- Prepare an Apache Flink cluster and set the FLINK_HOME environment variable.
- Download the Flink CDC tar, unzip it, and put the jars of the pipeline connectors into the Flink lib directory.
- Create a YAML file to describe the data source and data sink. The following example synchronizes all tables under the MySQL app_db database to Doris (a sketch of the user-defined functions it references appears after this list):
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""

transform:
  - source-table: app_db.web_order01
    projection: \*, format('%S', product_name) as product_name
    filter: addone(id) > 10 AND order_id > 100
    description: project fields and filter
  - source-table: app_db.web_order02
    projection: \*, format('%S', product_name) as product_name
    filter: addone(id) > 20 AND order_id > 200
    description: project fields and filter

route:
  - source-table: app_db.orders
    sink-table: ods_db.ods_orders
  - source-table: app_db.shipments
    sink-table: ods_db.ods_shipments
  - source-table: app_db.products
    sink-table: ods_db.ods_products

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
  user-defined-function:
    - name: addone
      classpath: com.example.functions.AddOneFunctionClass
    - name: format
      classpath: com.example.functions.FormatFunctionClass
- Submit the pipeline job using the flink-cdc.sh script:

  bash bin/flink-cdc.sh /path/mysql-to-doris.yaml

- View the job execution status through the Flink WebUI or in the downstream database.
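The YAML above references two user-defined functions by classpath. As promised, here is a minimal sketch of what com.example.functions.AddOneFunctionClass might look like, assuming the pipeline UDF contract of a public eval method (check the UDF documentation for your Flink CDC version for the exact interface):

package com.example.functions;

// Sketch of the addone UDF used in the transform filters above.
public class AddOneFunctionClass {
    public Integer eval(Integer id) {
        return id == null ? null : id + 1;
    }
}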
Try it out yourself with our more detailed tutorial. You can also see the connector overview for a comprehensive catalog of the currently provided connectors and their detailed configuration options.
Join the Community
There are many ways to participate in the Apache Flink CDC community. The mailing lists are the primary place where all Flink committers are present. For user support and questions, use the user mailing list. If you've found a problem with Flink CDC, please create a Flink JIRA ticket and tag it with the Flink CDC tag.
Bugs and feature requests can be discussed on the dev mailing list or on Jira.
Contributing
Contributions to Flink CDC are welcome; please see our Developer Guide and APIs Guide.
License
Flink CDC is released under the Apache License, Version 2.0.
Special Thanks
The Flink CDC community welcomes everyone willing to contribute, whether by submitting bug reports, enhancing the documentation, or contributing code for bug fixes, test additions, or new feature development. Thanks to all contributors for their enthusiastic contributions.