
apache/flink-cdc

Flink CDC is a streaming data integration tool


Top Related Projects

10,544

Change data capture for a variety of databases. Please log issues at https://issues.redhat.com/browse/DBZ.

28,667

Alibaba's MySQL binlog incremental subscription and consumption component

4,062

Maxwell's daemon, a mysql-to-json kafka producer

Flink CDC is a streaming data integration tool

15,871

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.

Quick Overview

Apache Flink CDC (Change Data Capture) is an open-source project that enables real-time data synchronization from various databases to Apache Flink. It allows users to capture and process database changes as a stream, facilitating real-time data integration, analytics, and event-driven applications.

Pros

  • Supports multiple databases, including MySQL, PostgreSQL, Oracle, and SQL Server
  • Provides low-latency, exactly-once processing of change events
  • Integrates seamlessly with Apache Flink's ecosystem and powerful stream processing capabilities
  • Offers both full snapshot and incremental change capture modes
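
The last point, a full snapshot followed by incremental changes, can be pictured with a small stand-alone sketch. It does not use the Flink CDC API. The `Change` class and the `replay` method are hypothetical names introduced only for illustration, and the operation codes (`r` for a snapshot read, `c`, `u`, `d` for create, update, delete) are assumed to follow Debezium's convention.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: replays snapshot rows and later changes into an in-memory table. */
final class ChangeReplaySketch {

    /** Simplified change event (hypothetical type); op is "r", "c", "u" or "d". */
    static final class Change {
        final String op;
        final int id;
        final String name;

        Change(String op, int id, String name) {
            this.op = op;
            this.id = id;
            this.name = name;
        }
    }

    /** Applies the events in order and returns the table keyed by primary key. */
    static Map<Integer, String> replay(List<Change> events) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (Change e : events) {
            switch (e.op) {
                case "r": // row read during the full snapshot
                case "c": // insert seen in the change log
                case "u": // update seen in the change log
                    table.put(e.id, e.name);
                    break;
                case "d": // delete seen in the change log
                    table.remove(e.id);
                    break;
                default:
                    throw new IllegalArgumentException("unknown op: " + e.op);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> events = List.of(
            new Change("r", 1, "scooter"),   // snapshot phase
            new Change("r", 2, "hammer"),
            new Change("u", 1, "e-scooter"), // incremental phase
            new Change("d", 2, null),
            new Change("c", 3, "jacket"));
        System.out.println(replay(events));
    }
}
```

A consumer that applies snapshot reads and later changes in order ends up with the same rows as the source table, which is why the two modes are usually combined.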

Cons

  • Requires a good understanding of Apache Flink and stream processing concepts
  • May have performance overhead for very high-volume change capture scenarios
  • Limited support for certain advanced database features or custom data types
  • Dependency on specific database configurations and permissions for optimal performance

Code Examples

  1. Creating a MySQL CDC source:
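// Imports assume the com.ververica 2.x artifacts listed under Getting Started.
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("inventory")        // databases to capture
    .tableList("inventory.products")  // tables to capture, written as database.table
    .username("user")
    .password("password")
    .deserializer(new JsonDebeziumDeserializationSchema()) // emits each change record as a JSON string
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000); // checkpoint every 3 seconds

env
    .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
    .print().setParallelism(1); // a single printing task keeps the output in order

env.execute("Print MySQL Snapshot + Binlog");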
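  2. Creating a PostgreSQL CDC source:
// In the com.ververica 2.x line, PostgreSQLSource builds a legacy SourceFunction,
// so it is attached with addSource rather than fromSource.
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Properties;

Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("snapshot.mode", "initial"); // snapshot first, then stream changes

SourceFunction<String> postgresSource = PostgreSQLSource.<String>builder()
    .hostname("localhost")
    .port(5432)
    .database("postgres")
    .schemaList("inventory")
    .tableList("inventory.products")
    .username("postgres")
    .password("postgres")
    .decodingPluginName("pgoutput")
    .debeziumProperties(debeziumProperties)
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env
    .addSource(postgresSource)
    .print().setParallelism(1);

env.execute("Print PostgreSQL Snapshot + WAL");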
  3. Using a custom deserializer:
public class CustomDeserializationSchema implements DebeziumDeserializationSchema<MyCustomType> {
    @Override
    public void deserialize(SourceRecord record, Collector<MyCustomType> out) throws Exception {
        // The record value is the Debezium change-event envelope.
        Struct value = (Struct) record.value();
        // MyCustomType.fromStruct is a hypothetical helper that you would write yourself.
        MyCustomType result = MyCustomType.fromStruct(value);
        out.collect(result);
    }

    @Override
    public TypeInformation<MyCustomType> getProducedType() {
        return TypeInformation.of(MyCustomType.class);
    }
}

MySqlSource<MyCustomType> mySqlSource = MySqlSource.<MyCustomType>builder()
    // ... other configurations ...
    .deserializer(new CustomDeserializationSchema())
    .build();
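
What the conversion step inside a custom deserializer typically has to decide is which row image to read. The following stand-alone sketch uses a plain `Map` in place of the Kafka Connect `Struct`, so it runs without Flink CDC on the classpath. `EnvelopeSketch` and `rowImage` are hypothetical names, and the field names `op`, `before` and `after` are assumed to follow the Debezium change-event envelope.

```java
import java.util.Map;

/** Illustrative only: picks the relevant row image from a Debezium-style envelope. */
final class EnvelopeSketch {

    /** Returns "before" for deletes and "after" for every other operation. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> rowImage(Map<String, Object> envelope) {
        String op = (String) envelope.get("op");
        // A delete carries only the old row; creates, updates and snapshot reads carry the new row.
        String field = "d".equals(op) ? "before" : "after";
        Object image = envelope.get(field);
        if (image == null) {
            throw new IllegalStateException("envelope has no '" + field + "' image for op " + op);
        }
        return (Map<String, Object>) image;
    }

    public static void main(String[] args) {
        Map<String, Object> update = Map.of(
            "op", "u",
            "before", Map.of("id", 1, "name", "scooter"),
            "after", Map.of("id", 1, "name", "e-scooter"));
        System.out.println(rowImage(update).get("name"));
    }
}
```

In a real `DebeziumDeserializationSchema`, the same decision would be made on the `Struct` returned by `record.value()` before building the custom type.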

Getting Started

  1. Add Flink CDC dependency to your project:

    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.3.0</version>
    </dependency>
    
  2. Configure your database for CDC (e.g., enable binary logging for MySQL).

  3. Create a Flink CDC source and add it to your Flink job (see code examples above).

  4. Build and submit your Flink job to a Flink cluster.

Competitor Comparisons

10,544

Change data capture for a variety of databases. Please log issues at https://issues.redhat.com/browse/DBZ.

Pros of Debezium

  • Broader database support, including Oracle, SQL Server, and MongoDB
  • More mature project with a larger community and extensive documentation
  • Flexible deployment options, including standalone and Kafka Connect modes

Cons of Debezium

  • Higher resource consumption and potential performance overhead
  • More complex setup and configuration process
  • Limited real-time analytics capabilities compared to Flink CDC

Code Comparison

Debezium configuration example:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054"
  }
}

Flink CDC configuration example:

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("inventory")
    .tableList("inventory.products")
    .username("flinkuser")
    .password("flinkpw")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

Both projects offer robust CDC solutions, but Debezium provides broader database support and deployment flexibility, while Flink CDC excels in real-time processing and analytics integration within the Apache Flink ecosystem.

28,667

Alibaba's MySQL binlog incremental subscription and consumption component

Pros of Canal

  • Mature and widely adopted in production environments
  • Purpose-built for MySQL binlog subscription and consumption
  • Offers flexible deployment options, including standalone and cluster modes

Cons of Canal

  • Primarily focused on MySQL, with less robust support for other databases
  • Requires more manual configuration and setup compared to Flink CDC
  • Limited integration with stream processing frameworks

Code Comparison

Canal configuration example:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..*

Flink CDC configuration example:

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("mydatabase")
    .tableList("mydatabase.users")
    .username("flinkuser")
    .password("flinkpw")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

Both Canal and Flink CDC are powerful tools for change data capture, but they cater to different use cases and integration scenarios. Canal is more established for MySQL binlog subscription, while Flink CDC offers seamless integration with Apache Flink for stream processing applications. The choice between the two depends on specific project requirements, existing infrastructure, and the desired level of integration with stream processing frameworks.
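
Both configurations above select tables by qualified name. The Canal value `.*\\..*` is written in a properties file, where `\\` unescapes to a single backslash, so the effective regular expression is `.*\..*` (any schema, a literal dot, any table). The sketch below checks such patterns with `java.util.regex`. This is an assumption made for illustration: Canal's own matcher may differ in detail, and `TableFilterSketch` is a hypothetical name.

```java
import java.util.regex.Pattern;

/** Illustrative only: tests qualified table names against a filter regex. */
final class TableFilterSketch {

    /** Returns true when the whole "schema.table" name matches the regex. */
    static boolean matches(String regex, String qualifiedTable) {
        return Pattern.matches(regex, qualifiedTable);
    }

    public static void main(String[] args) {
        String everything = ".*\\..*";           // Java literal for the regex .*\..*
        String oneTable = "mydatabase\\.users";  // Java literal for the regex mydatabase\.users

        System.out.println(matches(everything, "mydatabase.users"));
        System.out.println(matches(oneTable, "mydatabase.users"));
        System.out.println(matches(oneTable, "mydatabase.orders"));
    }
}
```

Escaping the dot matters: an unescaped `.` matches any character, so `mydatabase.users` used as a regex would also match a name such as `mydatabaseXusers`.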

4,062

Maxwell's daemon, a mysql-to-json kafka producer

Pros of Maxwell

  • Lightweight and easy to set up, with minimal dependencies
  • Supports multiple output formats (JSON, Avro, Protobuf)
  • Can operate as a standalone application without additional infrastructure

Cons of Maxwell

  • Limited to MySQL databases only
  • Lacks advanced features like exactly-once processing and fault tolerance
  • Not as scalable for high-volume data streams compared to Flink CDC

Code Comparison

Maxwell configuration example:

producer=kafka
kafka.bootstrap.servers=localhost:9092
kafka_topic=maxwell

Flink CDC configuration example:

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("mydatabase")
    .tableList("mydatabase.users")
    .username("user")
    .password("password")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

Maxwell is more straightforward to configure, while Flink CDC offers more granular control and integration with the Flink ecosystem. Flink CDC provides better scalability and fault tolerance, making it more suitable for large-scale, production-grade change data capture scenarios. However, Maxwell's simplicity and standalone nature make it a good choice for smaller projects or quick prototyping.

Flink CDC is a streaming data integration tool

Pros of flink-cdc

  • More comprehensive documentation and examples
  • Wider range of supported databases and data sources
  • Active community support and frequent updates

Cons of flink-cdc

  • Potentially higher resource consumption
  • Steeper learning curve for beginners
  • May require more configuration for complex setups

Code Comparison

flink-cdc:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("inventory")
    .tableList("inventory.products")
    .username("root")
    .password("123456")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

Both repositories are essentially the same project, as apache/flink-cdc is the main repository for Apache Flink CDC Connectors. The comparison provided above is based on the features and characteristics of the Flink CDC project itself, rather than comparing two distinct repositories.

The code example demonstrates how to set up a MySQL source in Flink CDC, which is a common use case for change data capture in Apache Flink applications. This code snippet showcases the ease of configuration and the flexibility of the Flink CDC connectors.

15,871

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.

Pros of Airbyte

  • Broader data integration support with 300+ connectors for various data sources and destinations
  • User-friendly UI for configuration and monitoring of data pipelines
  • Extensible architecture allowing easy creation of custom connectors

Cons of Airbyte

  • Potentially higher resource consumption due to its comprehensive feature set
  • Steeper learning curve for advanced customizations and optimizations

Code Comparison

Flink CDC (Java):

public class MySqlSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("inventory")
            .tableList("inventory.products")
            .username("root")
            .password("123456")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();
    }
}

Airbyte (Python):

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
    try:
        with MySQLConnection(**config) as connection:
            connection.cursor().execute("SELECT 1")
        return True, None
    except Exception as e:
        return False, f"An exception occurred: {str(e)}"

The two snippets are not directly equivalent: the Flink CDC snippet configures a MySQL source with a builder pattern in Java, while the Airbyte snippet implements a connection check method in Python.


README

Flink CDC


Flink CDC is a distributed data integration tool for real-time data and batch data. Flink CDC brings simplicity and elegance to data integration by using YAML to describe the data movement and transformation in a Data Pipeline.

Flink CDC prioritizes efficient end-to-end data integration and offers enhanced functionalities such as full database synchronization, sharding table synchronization, schema evolution, and data transformation.

Flink CDC framework design

Getting Started

  1. Prepare an Apache Flink cluster and set the FLINK_HOME environment variable.
  2. Download the Flink CDC tar, extract it, and put the pipeline connector jars into the Flink lib directory.
  3. Create a YAML file that describes the data source and data sink. The following example synchronizes all tables in the MySQL app_db database to Doris:
 source:
   type: mysql
   hostname: localhost
   port: 3306
   username: root
   password: 123456
   tables: app_db.\.*

 sink:
   type: doris
   fenodes: 127.0.0.1:8030
   username: root
   password: ""

 transform:
   - source-table: adb.web_order01
     projection: \*, format('%S', product_name) as product_name
     filter: addone(id) > 10 AND order_id > 100
     description: project fields and filter
   - source-table: adb.web_order02
     projection: \*, format('%S', product_name) as product_name
     filter: addone(id) > 20 AND order_id > 200
     description: project fields and filter

 route:
   - source-table: app_db.orders
     sink-table: ods_db.ods_orders
   - source-table: app_db.shipments
     sink-table: ods_db.ods_shipments
   - source-table: app_db.products
     sink-table: ods_db.ods_products

 pipeline:
   name: Sync MySQL Database to Doris
   parallelism: 2
   user-defined-function:
     - name: addone
       classpath: com.example.functions.AddOneFunctionClass
     - name: format
       classpath: com.example.functions.FormatFunctionClass
  4. Submit the pipeline job using the flink-cdc.sh script.
 bash bin/flink-cdc.sh /path/mysql-to-doris.yaml
  5. View the job execution status through the Flink WebUI or the downstream database.
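
To make the `transform` and `route` sections of the YAML above more concrete, here is a stand-alone sketch of what the first filter rule and the route rules express. It is not Flink CDC code. `PipelineRuleSketch` is a hypothetical name, `addOne` assumes that the `addone` user-defined function returns its argument plus one (the README does not define it), and passing unrouted tables through under their own name is likewise an assumption.

```java
import java.util.Map;

/** Illustrative only: mimics one filter rule and the route rules from the YAML example. */
final class PipelineRuleSketch {

    /** Source-to-sink table mapping copied from the route section. */
    static final Map<String, String> ROUTES = Map.of(
        "app_db.orders", "ods_db.ods_orders",
        "app_db.shipments", "ods_db.ods_shipments",
        "app_db.products", "ods_db.ods_products");

    /** Assumed behaviour of the addone user-defined function. */
    static long addOne(long x) {
        return x + 1;
    }

    /** Mirrors the filter "addone(id) > 10 AND order_id > 100". */
    static boolean passesFilter(long id, long orderId) {
        return addOne(id) > 10 && orderId > 100;
    }

    /** Returns the sink table for a source table; unrouted tables keep their name (assumption). */
    static String route(String sourceTable) {
        return ROUTES.getOrDefault(sourceTable, sourceTable);
    }

    public static void main(String[] args) {
        System.out.println(passesFilter(10, 101));
        System.out.println(route("app_db.orders"));
    }
}
```

In the actual pipeline these rules are declared in YAML and evaluated by Flink CDC, so no such Java code needs to be written by hand.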

Try it out yourself with our more detailed tutorial. You can also see the connector overview for a comprehensive catalog of the connectors currently provided and for more detailed configuration options.

Join the Community

There are many ways to participate in the Apache Flink CDC community. The mailing lists are the primary place where all Flink committers are present. For user support and questions, use the user mailing list. If you have found a problem with Flink CDC, please create a Flink Jira issue and tag it with the Flink CDC tag.
Bugs and feature requests can be discussed either on the dev mailing list or on Jira.

Contributing

Contributions to Flink CDC are welcome; please see our Developer Guide and APIs Guide.

License

Apache 2.0 License.

Special Thanks

The Flink CDC community welcomes everyone who is willing to contribute, whether it's through submitting bug reports, enhancing the documentation, or submitting code contributions for bug fixes, test additions, or new feature development.
Thanks to all contributors for their enthusiastic contributions.