Flink in Practice: MySQL CDC


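Introduction

Flink CDC captures data changes from source databases (such as MySQL and PostgreSQL) and delivers them to Flink jobs in real time. Its main strengths are low latency and consistency. By reading the database's incremental change records, Flink CDC streams each change into a Flink job as it happens, which enables low-latency processing and analysis. It also keeps the data accurate and complete while it is being processed.

To provide this, the Flink community developed the flink-cdc-connectors component, a set of source connectors that read both the full existing data and the subsequent incremental changes directly from MySQL, PostgreSQL, and other databases. Once the connector and its parameters are configured, a Flink job can connect to the source database and capture and process changes in real time.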


Enabling the MySQL binlog

The MySQL binlog must be enabled before CDC can be used.
Edit the my.cnf file and add:

server_id=1
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=15
binlog_do_db=testdb
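
As a quick sanity check on the settings above, the following minimal sketch parses a my.cnf-style fragment and verifies that it sets a server id, enables the binlog, and uses the ROW format. The class BinlogConfigCheck and its methods are hypothetical helpers written for this article, not part of MySQL or Flink CDC, and the parser only handles simple key=value lines.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of MySQL or Flink CDC): checks a my.cnf-style
// fragment for the binlog settings listed above.
final class BinlogConfigCheck {

    // Parses "key=value" lines, skipping blanks, comments and [section] headers.
    static Map<String, String> parse(String cnf) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String raw : cnf.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")
                    || line.startsWith(";") || line.startsWith("[")) {
                continue;
            }
            int eq = line.indexOf('=');
            // MySQL accepts '-' and '_' interchangeably in option names,
            // so normalize to '_' before storing the key.
            String key = (eq < 0 ? line : line.substring(0, eq)).trim().replace('-', '_');
            // Flag-style options without a value are stored with an empty value.
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            settings.put(key, value);
        }
        return settings;
    }

    // True when the fragment sets a server id, enables the binlog,
    // and selects the ROW binlog format.
    static boolean readyForCdc(String cnf) {
        Map<String, String> s = parse(cnf);
        return s.containsKey("server_id")
                && s.containsKey("log_bin")
                && "ROW".equalsIgnoreCase(s.get("binlog_format"));
    }

    public static void main(String[] args) {
        String cnf = String.join("\n",
                "[mysqld]",
                "server_id=1",
                "log_bin=mysql-bin",
                "binlog_format=ROW",
                "expire_logs_days=15",
                "binlog_do_db=testdb");
        System.out.println("ready for CDC: " + readyForCdc(cnf));
    }
}
```

This only inspects the text of the configuration. The server still has to be restarted for the file to take effect, and checking the live values on the running server remains the more reliable test.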

Adding the Maven dependencies

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.4.0</version>
        </dependency>

Reading the CDC stream with the DataStream API (Java)

Create the table

CREATE TABLE `userdemo` (
	`user_id` VARCHAR(50) NOT NULL COLLATE 'utf8mb4_general_ci',
	`user_name` VARCHAR(50) NULL DEFAULT NULL COLLATE 'utf8mb4_general_ci',
	`age` INT(11) NULL DEFAULT '0',
	PRIMARY KEY (`user_id`) USING BTREE
)
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB
;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Date: 2024/3/12 10:03
 * @Description DataStream API CDC
 **/
public class FlinkMysqlCdc {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("10.168.192.70")
                .port(3306)
                .username("root")
                .password("XXXXX")
                .databaseList("testdb")
                // Table names must be given in db.table form
                .tableList("testdb.userdemo")
                .serverTimeZone("GMT+8")
//                .deserializer(new StringDebeziumDeserializationSchema())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        dataStreamSource.print();
        env.execute("FlinkDSCDC");
    }
}

Running the program prints the following:

{"before":null,"after":{"user_id":"001","user_name":"sdaf","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710228835000,"snapshot":"true","db":"testdb","sequence":null,"table":"userdemo","server_id":0,"gtid":null,"file":"xxx.000002","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710228834716,"transaction":null}
{"before":null,"after":{"user_id":"002","user_name":"DSF","age":35},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710228835000,"snapshot":"last","db":"testdb","sequence":null,"table":"userdemo","server_id":0,"gtid":null,"file":"xxx.000002","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710228834720,"transaction":null}

Insert a row

INSERT INTO userdemo (user_id, user_name, age) VALUES ('004', 'wangwu', 26);

The output is:

{"before":null,"after":{"user_id":"004","user_name":"wangwu","age":26},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710235648000,"snapshot":"false","db":"testdb","sequence":null,"table":"userdemo","server_id":1,"gtid":null,"file":"xxx.000002","pos":649,"row":0,"thread":7,"query":null},"op":"c","ts_ms":1710235647380,"transaction":null}

Update a row in userdemo

UPDATE userdemo SET user_name='zhangsan' WHERE user_id='001';

The output is:

{"before":{"user_id":"001","user_name":"sdaf","age":23},"after":{"user_id":"001","user_name":"zhangsan","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710235526000,"snapshot":"false","db":"testdb","sequence":null,"table":"userdemo","server_id":1,"gtid":null,"file":"xxx.000002","pos":352,"row":0,"thread":7,"query":null},"op":"u","ts_ms":1710235525246,"transaction":null}
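
The events above differ in their "op" field: "r" marks a row read during the initial snapshot, "c" an insert, and "u" an update, with "d" used for deletes in the Debezium format. The sketch below shows one way to turn that code into a readable label. ChangeEventOp is a hypothetical helper, and it uses a regular expression only to stay dependency-free. It assumes the event contains no other key named "op", so a real job would be better served by a proper JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: maps the Debezium "op" code of a change-event JSON
// string (as printed above) to a readable label.
final class ChangeEventOp {

    // Matches an "op":"x" pair. Assumes no captured column is itself named "op".
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    static String describe(String eventJson) {
        Matcher m = OP.matcher(eventJson);
        if (!m.find()) {
            return "UNKNOWN";
        }
        switch (m.group(1)) {
            case "r": return "SNAPSHOT_READ"; // row read during the initial snapshot
            case "c": return "INSERT";
            case "u": return "UPDATE";
            case "d": return "DELETE";
            default:  return "UNKNOWN";
        }
    }

    public static void main(String[] args) {
        // Shortened version of the insert event shown above.
        String insert = "{\"before\":null,\"after\":{\"user_id\":\"004\"},\"op\":\"c\"}";
        System.out.println(describe(insert));
    }
}
```

In the DataStream job, a method like describe could be applied in a map step before print() to make the console output easier to scan.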

Reading the CDC stream with Flink SQL

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.types.Row;
  
public class MyFlinkCDCJob {  
    public static void main(String[] args) throws Exception {  
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);  
  
        // Configure the MySQL CDC source
        String sourceDDL = "CREATE TABLE my_table (" +  
                " id INT NOT NULL," +  
                " name STRING," +  
                " age INT," +  
                " PRIMARY KEY (id) NOT ENFORCED" +  
                ") WITH (" +  
                " 'connector' = 'mysql-cdc'," +  
                " 'hostname' = 'your_mysql_hostname'," +  
                " 'port' = '3306'," +  
                " 'username' = 'your_username'," +  
                " 'password' = 'your_password'," +  
                " 'database-name' = 'your_database_name'," +  
                " 'table-name' = 'your_table_name'" +  
                ")";  
        tableEnv.executeSql(sourceDDL);  
  
        // Define the job logic
        Table result = tableEnv.sqlQuery("SELECT * FROM my_table");  
        tableEnv.toRetractStream(result, Row.class).print();  
  
        // Run the job
        env.execute("My Flink CDC Job");  
    }  
}

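The code above creates a table named my_table that is connected to a MySQL database through the MySQL CDC connector. It then runs a SQL query that selects every row of that table and prints the result to the console. Because StreamTableEnvironment lives in the flink-table-api-java-bridge module, running this example also requires that module and a table planner on the classpath, in addition to the Maven dependencies listed earlier.

Replace 'your_mysql_hostname', 'your_username', 'your_password', 'your_database_name', and 'your_table_name' in the example with the details of your own MySQL database. The column definitions (id, name, age) must likewise match the columns of the table being captured.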
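
Rather than editing the placeholders by hand, the DDL string can be assembled from parameters. The following is a minimal sketch under that assumption. MySqlCdcDdl and its build method are hypothetical names introduced for illustration, and the column list is copied from the example above without change.

```java
// Hypothetical helper: fills in the mysql-cdc connection options of the
// CREATE TABLE statement shown above.
final class MySqlCdcDdl {

    // Wraps a value in single quotes and doubles any embedded single quote,
    // so it can be used as a SQL string literal.
    private static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    static String build(String host, int port, String user, String password,
                        String database, String table) {
        return "CREATE TABLE my_table ("
                + " id INT NOT NULL,"
                + " name STRING,"
                + " age INT,"
                + " PRIMARY KEY (id) NOT ENFORCED"
                + ") WITH ("
                + " 'connector' = 'mysql-cdc',"
                + " 'hostname' = " + quote(host) + ","
                + " 'port' = " + quote(String.valueOf(port)) + ","
                + " 'username' = " + quote(user) + ","
                + " 'password' = " + quote(password) + ","
                + " 'database-name' = " + quote(database) + ","
                + " 'table-name' = " + quote(table)
                + ")";
    }

    public static void main(String[] args) {
        // Placeholder values, as in the example above.
        String ddl = build("your_mysql_hostname", 3306, "your_username",
                "your_password", "your_database_name", "your_table_name");
        System.out.println(ddl);
    }
}
```

The resulting string can be passed to tableEnv.executeSql in place of sourceDDL. Keeping the quoting in one place avoids a broken statement when a password contains a single quote.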
