
简介
Flink CDC Connector 是ApacheFlink的一组数据源连接器,使用变化数据捕获change data capture (CDC)从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它可以充分利用Debezium的功能。
特点
- 支持读取数据库快照,并且能够持续读取数据库的变更日志,即使发生故障,也支持exactly-once 的处理语义
- 对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中使用多个数据库和表上的变更数据。
- 对于Table/SQL API 的CDC connector,用户可以使用SQL DDL创建CDC数据源,来监视单个表上的数据变更。
使用场景
- 数据库之间的增量数据同步
- 审计日志
- 数据库之上的实时物化视图
- 基于CDC的维表join
- …
Flink提供的 table format
Flink提供了一系列可以用于table connector的table format,具体如下:
| Formats | Supported Connectors |
|---|---|
| CSV | Apache Kafka, Filesystem |
| JSON | Apache Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Filesystem |
| Debezium CDC | Apache Kafka |
| Canal CDC | Apache Kafka |
| Apache Parquet | Filesystem |
| Apache ORC | Filesystem |
使用过程中的注意点
使用MySQL CDC的注意点
如果要使用MySQL CDC connector,对于程序而言,需要添加如下依赖:
- <dependency>
- <groupId>com.alibaba.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>1.0.0</version>
- </dependency>
如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。
使用canal-json的注意点
如果要使用Kafka的canal-json,对于程序而言,需要添加如下依赖:
- <!--universal-->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_2.11</artifactId>
- <version>1.11.0</version>
- </dependency>
如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。由于Flink1.11的安装包 的lib目录下并没有提供该jar包,所以必须要手动添加依赖包,否则会报如下错误:
- [ERROR]CouldnotexecuteSQLstatement.Reason:
- org.apache.flink.table.api.ValidationException:Couldnotfindanyfactoryforidentifier'kafka'thatimplements'org.apache.flink.table.factories.DynamicTableSourceFactory'intheclasspath.
- Availablefactoryidentifiersare:
- datagen
- mysql-cdc
使用changelog-json的注意点
如果要使用Kafka的changelog-json Format,对于程序而言,需要添加如下依赖:
- <dependency>
- <groupId>com.alibaba.ververica</groupId>
- <artifactId>flink-format-changelog-json</artifactId>
- <version>1.0.0</version>
- </dependency>
如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。
mysql-cdc的操作实践
创建MySQL数据源表
在创建MySQL CDC表之前,需要先创建MySQL的数据表,如下:
- --MySQL
- /*Tablestructurefortable`order_info`*/
- DROPTABLEIFEXISTS`order_info`;
- CREATETABLE`order_info`(
- `id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'编号',
- `consignee`varchar(100)DEFAULTNULLCOMMENT'收货人',
- `consignee_tel`varchar(20)DEFAULTNULLCOMMENT'收件人电话',
- `total_amount`decimal(10,2)DEFAULTNULLCOMMENT'总金额',
- `order_status`varchar(20)DEFAULTNULLCOMMENT'订单状态,1表示下单,2表示支付',
- `user_id`bigint(20)DEFAULTNULLCOMMENT'用户id',
- `payment_way`varchar(20)DEFAULTNULLCOMMENT'付款方式',
- `delivery_address`varchar(1000)DEFAULTNULLCOMMENT'送货地址',
- `order_comment`varchar(200)DEFAULTNULLCOMMENT'订单备注',
- `out_trade_no`varchar(50)DEFAULTNULLCOMMENT'订单交易编号(第三方支付用)',
- `trade_body`varchar(200)DEFAULTNULLCOMMENT'订单描述(第三方支付用)',
- `create_time`datetimeDEFAULTNULLCOMMENT'创建时间',
- `operate_time`datetimeDEFAULTNULLCOMMENT'操作时间',
- `expire_time`datetimeDEFAULTNULLCOMMENT'失效时间',
- `tracking_no`varchar(100)DEFAULTNULLCOMMENT'物流单编号',
- `parent_order_id`bigint(20)DEFAULTNULLCOMMENT'父订单编号',
- `img_url`varchar(200)DEFAULTNULLCOMMENT'图片路径',
- `province_id`int(20)DEFAULTNULLCOMMENT'地区',
- PRIMARYKEY(`id`)
- )ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8COMMENT='订单表';
- ------------------------------
- --Recordsoforder_info
- ------------------------------
- INSERTINTO`order_info`
- VALUES(476,'lAXjcL','13408115089',433.00,'2',10,'2','OYyAdSdLxedceqovndCD','ihjAYsSjrgJMQVdFQnSy','8728720206','','2020-06-1802:21:38',NULL,NULL,NULL,NULL,NULL,9);
- INSERTINTO`order_info`
- VALUES(477,'QLiFDb','13415139984',772.00,'1',90,'2','OizYrQbKuWvrvdfpkeSZ','wiBhhqhMndCCgXwmWVQq','1679381473','','2020-06-1809:12:25',NULL,NULL,NULL,NULL,NULL,3);
- INSERTINTO`order_info`
- VALUES(478,'iwKjQD','13320383859',88.00,'1',107,'1','cbXLKtNHWOcWzJVBWdAs','njjsnknHxsxhuCCeNDDi','0937074290','','2020-06-1815:56:34',NULL,NULL,NULL,NULL,NULL,7);
- /*Tablestructurefortable`order_detail`*/
- CREATETABLE`order_detail`(
- `id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'编号',
- `order_id`bigint(20)DEFAULTNULLCOMMENT'订单编号',
- `sku_id`bigint(20)DEFAULTNULLCOMMENT'sku_id',
- `sku_name`varchar(200)DEFAULTNULLCOMMENT'sku名称(冗余)',
- `img_url`varchar(200)DEFAULTNULLCOMMENT'图片名称(冗余)',
- `order_price`decimal(10,2)DEFAULTNULLCOMMENT'购买价格(下单时sku价格)',
- `sku_num`varchar(200)DEFAULTNULLCOMMENT'购买个数',
- `create_time`datetimeDEFAULTNULLCOMMENT'创建时间',
- PRIMARYKEY(`id`)
- )ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8COMMENT='订单明细表';
- ------------------------------
- --Recordsoforder_detail
- ------------------------------
- INSERTINTO`order_detail`
- VALUES(1329,476,8,'AppleiPhoneXSMax(A2104)256GB深空灰色移动联通电信4G手机双卡双待','http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz',8900.00,'3','2020-06-1802:21:38');
- INSERTINTO`order_detail`
- VALUES(1330,477,9,'荣耀10GT游戏加速AIS手持夜景6GB+64GB幻影蓝全网通移动联通电信','http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne',2452.00,'4','2020-06-1809:12:25');
- INSERTINTO`order_detail`
- VALUES(1331,478,4,'小米Play流光渐变AI双摄4GB+64GB梦幻蓝全网通4G双卡双待小水滴全面屏拍照游戏智能手机','http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv',1442.00,'1','2020-06-1815:56:34');
- INSERTINTO`order_detail`
- VALUES(1332,478,8,'AppleiPhoneXSMax(A2104)256GB深空灰色移动联通电信4G手机双卡双待','http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV',8900.00,'3','2020-06-1815:56:34');
- INSERTINTO`order_detail`
- VALUES(1333,478,8,'AppleiPhoneXSMax(A2104)256GB深空灰色移动联通电信4G手机双卡双待','http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP',8900.00,'1','2020-06-1815:56:34');
Flink SQL Cli创建CDC数据源
启动 Flink 集群,再启动 SQL CLI,执行下面命令:
- --创建订单信息表
- CREATETABLEorder_info(
- idBIGINT,
- user_idBIGINT,
- create_timeTIMESTAMP(0),
- operate_timeTIMESTAMP(0),
- province_idINT,
- order_statusSTRING,
- total_amountDECIMAL(10,5)
- )WITH(
- 'connector'='mysql-cdc',
- 'hostname'='kms-1',
- 'port'='3306',
- 'username'='root',
- 'password'='123qwe',
- 'database-name'='mydw',
- 'table-name'='order_info'
- );
在Flink SQL Cli中查询该表的数据:result-mode: tableau,+表示数据的insert

在SQL CLI中创建订单详情表:
- CREATETABLEorder_detail(
- idBIGINT,
- order_idBIGINT,
- sku_idBIGINT,
- sku_nameSTRING,
- sku_numBIGINT,
- order_priceDECIMAL(10,5),
- create_timeTIMESTAMP(0)
- )WITH(
- 'connector'='mysql-cdc',
- 'hostname'='kms-1',
- 'port'='3306',
- 'username'='root',
- 'password'='123qwe',
- 'database-name'='mydw',
- 'table-name'='order_detail'
- );
查询结果如下:

执行JOIN操作:
- SELECT
- od.id,
- oi.idorder_id,
- oi.user_id,
- oi.province_id,
- od.sku_id,
- od.sku_name,
- od.sku_num,
- od.order_price,
- oi.create_time,
- oi.operate_time
- FROM
- (
- SELECT*
- FROMorder_info
- WHERE
- order_status='2'--已支付
- )oi
- JOIN
- (
- SELECT*
- FROMorder_detail
- )od
- ONoi.id=od.order_id;
canal-json的操作实践
关于cannal的使用方式,可以参考我的另一篇文章:基于Canal与Flink实现数据实时增量同步(一)。我已经将下面的表通过canal同步到了kafka,具体格式为:
- {
- "data":[
- {
- "id":"1",
- "region_name":"华北"
- },
- {
- "id":"2",
- "region_name":"华东"
- },
- {
- "id":"3",
- "region_name":"东北"
- },
- {
- "id":"4",
- "region_name":"华中"
- },
- {
- "id":"5",
- "region_name":"华南"
- },
- {
- "id":"6",
- "region_name":"西南"
- },
- {
- "id":"7",
- "region_name":"西北"
- }
- ],
- "database":"mydw",
- "es":1597128441000,
- "id":102,
- "isDdl":false,
- "mysqlType":{
- "id":"varchar(20)",
- "region_name":"varchar(20)"
- },
- "old":null,
- "pkNames":null,
- "sql":"",
- "sqlType":{
- "id":12,
- "region_name":12
- },
- "table":"base_region",
- "ts":1597128441424,
- "type":"INSERT"
- }
在SQL CLI中创建该canal-json格式的表:
- CREATETABLEregion(
- idBIGINT,
- region_nameSTRING
- )WITH(
- 'connector'='kafka',
- 'topic'='mydw.base_region',
- 'properties.bootstrap.servers'='kms-3:9092',
- 'properties.group.id'='testGroup',
- 'format'='canal-json',
- 'scan.startup.mode'='earliest-offset'
- );
查询结果如下:

changelog-json的操作实践
创建MySQL数据源
参见上面的order_info
Flink SQL Cli创建changelog-json表
- CREATETABLEorder_gmv2kafka(
- day_strSTRING,
- gmvDECIMAL(10,5)
- )WITH(
- 'connector'='kafka',
- 'topic'='order_gmv_kafka',
- 'scan.startup.mode'='earliest-offset',
- 'properties.bootstrap.servers'='kms-3:9092',
- 'format'='changelog-json'
- );
- INSERTINTOorder_gmv2kafka
- SELECTDATE_FORMAT(create_time,'yyyy-MM-dd')asday_str,SUM(total_amount)asgmv
- FROMorder_info
- WHEREorder_status='2'--订单已支付
- GROUPBYDATE_FORMAT(create_time,'yyyy-MM-dd');
查询表看一下结果:

再查一下kafka的数据:
- {"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"}
当将另外两个订单的状态order_status更新为2时,总金额=443+772+88=1293再观察数据:

再看kafka中的数据:

原文链接:https://mp.weixin.qq.com/s/uedL8HbtXgYi9xQk18QjDw








发表评论
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。