当前位置:首页 > 通信资讯 > 正文

实时数仓方案(实时数仓案例)

实时数仓方案(实时数仓案例)

简介

Flink CDC Connector 是ApacheFlink的一组数据源连接器,使用变化数据捕获change data capture (CDC)从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它可以充分利用Debezium的功能。

特点

  • 支持读取数据库快照,并且能够持续读取数据库的变更日志,即使发生故障,也支持exactly-once 的处理语义
  • 对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中使用多个数据库和表上的变更数据。
  • 对于Table/SQL API 的CDC connector,用户可以使用SQL DDL创建CDC数据源,来监视单个表上的数据变更。

使用场景

  • 数据库之间的增量数据同步
  • 审计日志
  • 数据库之上的实时物化视图
  • 基于CDC的维表join

Flink提供的 table format

Flink提供了一系列可以用于table connector的table format,具体如下:

Formats Supported Connectors
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro Apache Kafka, Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

使用过程中的注意点

使用MySQL CDC的注意点

如果要使用MySQL CDC connector,对于程序而言,需要添加如下依赖:

  1. <dependency>
  2. <groupId>com.alibaba.ververica</groupId>
  3. <artifactId>flink-connector-mysql-cdc</artifactId>
  4. <version>1.0.0</version>
  5. </dependency>

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。

使用canal-json的注意点

如果要使用Kafka的canal-json,对于程序而言,需要添加如下依赖:

  1. <!--universal-->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-connector-kafka_2.11</artifactId>
  5. <version>1.11.0</version>
  6. </dependency>

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。由于Flink1.11的安装包 的lib目录下并没有提供该jar包,所以必须要手动添加依赖包,否则会报如下错误:

  1. [ERROR]CouldnotexecuteSQLstatement.Reason:
  2. org.apache.flink.table.api.ValidationException:Couldnotfindanyfactoryforidentifier'kafka'thatimplements'org.apache.flink.table.factories.DynamicTableSourceFactory'intheclasspath.
  3. Availablefactoryidentifiersare:
  4. datagen
  5. mysql-cdc

使用changelog-json的注意点

如果要使用Kafka的changelog-json Format,对于程序而言,需要添加如下依赖:

  1. <dependency>
  2. <groupId>com.alibaba.ververica</groupId>
  3. <artifactId>flink-format-changelog-json</artifactId>
  4. <version>1.0.0</version>
  5. </dependency>

如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。

mysql-cdc的操作实践

创建MySQL数据源表

在创建MySQL CDC表之前,需要先创建MySQL的数据表,如下:

  1. --MySQL
  2. /*Tablestructurefortable`order_info`*/
  3. DROPTABLEIFEXISTS`order_info`;
  4. CREATETABLE`order_info`(
  5. `id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'编号',
  6. `consignee`varchar(100)DEFAULTNULLCOMMENT'收货人',
  7. `consignee_tel`varchar(20)DEFAULTNULLCOMMENT'收件人电话',
  8. `total_amount`decimal(10,2)DEFAULTNULLCOMMENT'总金额',
  9. `order_status`varchar(20)DEFAULTNULLCOMMENT'订单状态,1表示下单,2表示支付',
  10. `user_id`bigint(20)DEFAULTNULLCOMMENT'用户id',
  11. `payment_way`varchar(20)DEFAULTNULLCOMMENT'付款方式',
  12. `delivery_address`varchar(1000)DEFAULTNULLCOMMENT'送货地址',
  13. `order_comment`varchar(200)DEFAULTNULLCOMMENT'订单备注',
  14. `out_trade_no`varchar(50)DEFAULTNULLCOMMENT'订单交易编号(第三方支付用)',
  15. `trade_body`varchar(200)DEFAULTNULLCOMMENT'订单描述(第三方支付用)',
  16. `create_time`datetimeDEFAULTNULLCOMMENT'创建时间',
  17. `operate_time`datetimeDEFAULTNULLCOMMENT'操作时间',
  18. `expire_time`datetimeDEFAULTNULLCOMMENT'失效时间',
  19. `tracking_no`varchar(100)DEFAULTNULLCOMMENT'物流单编号',
  20. `parent_order_id`bigint(20)DEFAULTNULLCOMMENT'父订单编号',
  21. `img_url`varchar(200)DEFAULTNULLCOMMENT'图片路径',
  22. `province_id`int(20)DEFAULTNULLCOMMENT'地区',
  23. PRIMARYKEY(`id`)
  24. )ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8COMMENT='订单表';
  25. ------------------------------
  26. --Recordsoforder_info
  27. ------------------------------
  28. INSERTINTO`order_info`
  29. VALUES(476,'lAXjcL','13408115089',433.00,'2',10,'2','OYyAdSdLxedceqovndCD','ihjAYsSjrgJMQVdFQnSy','8728720206','','2020-06-1802:21:38',NULL,NULL,NULL,NULL,NULL,9);
  30. INSERTINTO`order_info`
  31. VALUES(477,'QLiFDb','13415139984',772.00,'1',90,'2','OizYrQbKuWvrvdfpkeSZ','wiBhhqhMndCCgXwmWVQq','1679381473','','2020-06-1809:12:25',NULL,NULL,NULL,NULL,NULL,3);
  32. INSERTINTO`order_info`
  33. VALUES(478,'iwKjQD','13320383859',88.00,'1',107,'1','cbXLKtNHWOcWzJVBWdAs','njjsnknHxsxhuCCeNDDi','0937074290','','2020-06-1815:56:34',NULL,NULL,NULL,NULL,NULL,7);
  34. /*Tablestructurefortable`order_detail`*/
  35. CREATETABLE`order_detail`(
  36. `id`bigint(20)NOTNULLAUTO_INCREMENTCOMMENT'编号',
  37. `order_id`bigint(20)DEFAULTNULLCOMMENT'订单编号',
  38. `sku_id`bigint(20)DEFAULTNULLCOMMENT'sku_id',
  39. `sku_name`varchar(200)DEFAULTNULLCOMMENT'sku名称(冗余)',
  40. `img_url`varchar(200)DEFAULTNULLCOMMENT'图片名称(冗余)',
  41. `order_price`decimal(10,2)DEFAULTNULLCOMMENT'购买价格(下单时sku价格)',
  42. `sku_num`varchar(200)DEFAULTNULLCOMMENT'购买个数',
  43. `create_time`datetimeDEFAULTNULLCOMMENT'创建时间',
  44. PRIMARYKEY(`id`)
  45. )ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8COMMENT='订单明细表';
  46. ------------------------------
  47. --Recordsoforder_detail
  48. ------------------------------
  49. INSERTINTO`order_detail`
  50. VALUES(1329,476,8,'AppleiPhoneXSMax(A2104)256GB深空灰色移动联通电信4G手机双卡双待','http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz',8900.00,'3','2020-06-1802:21:38');
  51. INSERTINTO`order_detail`
  52. VALUES(1330,477,9,'荣耀10GT游戏加速AIS手持夜景6GB+64GB幻影蓝全网通移动联通电信','http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne',2452.00,'4','2020-06-1809:12:25');
  53. INSERTINTO`order_detail`
  54. VALUES(1331,478,4,'小米Play流光渐变AI双摄4GB+64GB梦幻蓝全网通4G双卡双待小水滴全面屏拍照游戏智能手机','http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv',1442.00,'1','2020-06-1815:56:34');
  55. INSERTINTO`order_detail`
  56. VALUES(1332,478,8,'AppleiPhoneXSMax(A2104)256GB深空灰色移动联通电信4G手机双卡双待','http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV',8900.00,'3','2020-06-1815:56:34');
  57. INSERTINTO`order_detail`
  58. VALUES(1333,478,8,'AppleiPhoneXSMax(A2104)256GB深空灰色移动联通电信4G手机双卡双待','http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP',8900.00,'1','2020-06-1815:56:34');

Flink SQL Cli创建CDC数据源

启动 Flink 集群,再启动 SQL CLI,执行下面命令:

  1. --创建订单信息表
  2. CREATETABLEorder_info(
  3. idBIGINT,
  4. user_idBIGINT,
  5. create_timeTIMESTAMP(0),
  6. operate_timeTIMESTAMP(0),
  7. province_idINT,
  8. order_statusSTRING,
  9. total_amountDECIMAL(10,5)
  10. )WITH(
  11. 'connector'='mysql-cdc',
  12. 'hostname'='kms-1',
  13. 'port'='3306',
  14. 'username'='root',
  15. 'password'='123qwe',
  16. 'database-name'='mydw',
  17. 'table-name'='order_info'
  18. );

在Flink SQL Cli中查询该表的数据:result-mode: tableau,+表示数据的insert

实时数仓方案(实时数仓案例)

在SQL CLI中创建订单详情表:

  1. CREATETABLEorder_detail(
  2. idBIGINT,
  3. order_idBIGINT,
  4. sku_idBIGINT,
  5. sku_nameSTRING,
  6. sku_numBIGINT,
  7. order_priceDECIMAL(10,5),
  8. create_timeTIMESTAMP(0)
  9. )WITH(
  10. 'connector'='mysql-cdc',
  11. 'hostname'='kms-1',
  12. 'port'='3306',
  13. 'username'='root',
  14. 'password'='123qwe',
  15. 'database-name'='mydw',
  16. 'table-name'='order_detail'
  17. );

查询结果如下:

实时数仓方案(实时数仓案例)

执行JOIN操作:

  1. SELECT
  2. od.id,
  3. oi.idorder_id,
  4. oi.user_id,
  5. oi.province_id,
  6. od.sku_id,
  7. od.sku_name,
  8. od.sku_num,
  9. od.order_price,
  10. oi.create_time,
  11. oi.operate_time
  12. FROM
  13. (
  14. SELECT*
  15. FROMorder_info
  16. WHERE
  17. order_status='2'--已支付
  18. )oi
  19. JOIN
  20. (
  21. SELECT*
  22. FROMorder_detail
  23. )od
  24. ONoi.id=od.order_id;

canal-json的操作实践

关于cannal的使用方式,可以参考我的另一篇文章:基于Canal与Flink实现数据实时增量同步(一)。我已经将下面的表通过canal同步到了kafka,具体格式为:

  1. {
  2. "data":[
  3. {
  4. "id":"1",
  5. "region_name":"华北"
  6. },
  7. {
  8. "id":"2",
  9. "region_name":"华东"
  10. },
  11. {
  12. "id":"3",
  13. "region_name":"东北"
  14. },
  15. {
  16. "id":"4",
  17. "region_name":"华中"
  18. },
  19. {
  20. "id":"5",
  21. "region_name":"华南"
  22. },
  23. {
  24. "id":"6",
  25. "region_name":"西南"
  26. },
  27. {
  28. "id":"7",
  29. "region_name":"西北"
  30. }
  31. ],
  32. "database":"mydw",
  33. "es":1597128441000,
  34. "id":102,
  35. "isDdl":false,
  36. "mysqlType":{
  37. "id":"varchar(20)",
  38. "region_name":"varchar(20)"
  39. },
  40. "old":null,
  41. "pkNames":null,
  42. "sql":"",
  43. "sqlType":{
  44. "id":12,
  45. "region_name":12
  46. },
  47. "table":"base_region",
  48. "ts":1597128441424,
  49. "type":"INSERT"
  50. }

在SQL CLI中创建该canal-json格式的表:

  1. CREATETABLEregion(
  2. idBIGINT,
  3. region_nameSTRING
  4. )WITH(
  5. 'connector'='kafka',
  6. 'topic'='mydw.base_region',
  7. 'properties.bootstrap.servers'='kms-3:9092',
  8. 'properties.group.id'='testGroup',
  9. 'format'='canal-json',
  10. 'scan.startup.mode'='earliest-offset'
  11. );

查询结果如下:

实时数仓方案(实时数仓案例)

changelog-json的操作实践

创建MySQL数据源

参见上面的order_info

Flink SQL Cli创建changelog-json表

  1. CREATETABLEorder_gmv2kafka(
  2. day_strSTRING,
  3. gmvDECIMAL(10,5)
  4. )WITH(
  5. 'connector'='kafka',
  6. 'topic'='order_gmv_kafka',
  7. 'scan.startup.mode'='earliest-offset',
  8. 'properties.bootstrap.servers'='kms-3:9092',
  9. 'format'='changelog-json'
  10. );
  11. INSERTINTOorder_gmv2kafka
  12. SELECTDATE_FORMAT(create_time,'yyyy-MM-dd')asday_str,SUM(total_amount)asgmv
  13. FROMorder_info
  14. WHEREorder_status='2'--订单已支付
  15. GROUPBYDATE_FORMAT(create_time,'yyyy-MM-dd');

查询表看一下结果:

实时数仓方案(实时数仓案例)

再查一下kafka的数据:

  1. {"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"}

当将另外两个订单的状态order_status更新为2时,总金额=443+772+88=1293再观察数据:

实时数仓方案(实时数仓案例)

再看kafka中的数据:

实时数仓方案(实时数仓案例)

原文链接:https://mp.weixin.qq.com/s/uedL8HbtXgYi9xQk18QjDw

如果您对该产品感兴趣,请填写办理(客服微信:xiaoxiongyidong)

为您推荐:

发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。