当前位置:首页 > 通信资讯 > 正文

flink求uv(每日实时uv计算flink)

flink求uv(每日实时uv计算flink)

PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次。

UV(独立访客):即Unique Visitor,访问您网站的一台电脑客户端为一个访客。00:00-24:00内相同的客户端只被计算一次。

计算网站App的实时pv和uv,是很常见的统计需求,这里提供通用的计算方法,不同的业务需求只需要小改即可拿来即用。

需求

利用Flink实时统计,从0点到当前的pv、uv。

一、需求分析

从Kafka发送过来的数据含有:时间戳、时间、维度、用户id,需要从不同维度统计从0点到当前时间的pv和uv,第二天0点重新开始计数第二天的。

二、技术方案

Kafka数据可能会有延迟乱序,这里引入watermark;

通过keyBy分流进不同的滚动window,每个窗口内计算pv、uv;

由于需要保存一天的状态,process里面使用ValueState保存pv、uv;

使用BitMap类型ValueState,占内存很小,引入支持bitmap的依赖;

保存状态需要设置ttl过期时间,第二天把第一天的过期,避免内存占用过大。

三、数据准备

这里假设是用户订单数据,数据格式如下:

  1. {"time":"2021-10-3122:00:01","timestamp":"1635228001","product":"苹果手机","uid":255420}
  2. {"time":"2021-10-3122:00:02","timestamp":"1635228001","product":"MacBookPro","uid":255421}

四、代码实现

整个工程代码截图如下(抹去了一些不方便公开的信息):

flink求uv(每日实时uv计算flink)

pvuv-project

1. 环境

kafka:1.0.0;

Flink:1.11.0;

2. 发送测试数据

首先发送数据到kafka测试集群,maven依赖:

  1. org.apache.kafka
  2. kafka-clients
  3. 2.4.1

2.4.1

发送代码:

  1. importcom.alibaba.fastjson.JSON;
  2. importcom.alibaba.fastjson.JSONObject;
  3. importjodd.util.ThreadUtil;
  4. importorg.apache.commons.lang3.StringUtils;
  5. importorg.junit.Test;
  6. importjava.io.*;
  7. publicclassSendDataToKafka{
  8. @Test
  9. publicvoidsendData()throwsIOException{
  10. Stringinpath="E:\\我的文件\\click.txt";
  11. Stringtopic="click_test";
  12. intcnt=0;
  13. Stringline;
  14. InputStreaminputStream=newFileInputStream(inpath);
  15. Readerreader=newInputStreamReader(inputStream);
  16. LineNumberReaderlnr=newLineNumberReader(reader);
  17. while((line=lnr.readLine())!=null){
  18. //这里的KafkaUtil是个生产者、消费者工具类,可以自行实现
  19. KafkaUtil.sendDataToKafka(topic,String.valueOf(cnt),line);
  20. cnt=cnt+1;
  21. ThreadUtil.sleep(100);
  22. }
  23. }
  24. }

3. 主要程序

先定义个pojo:

  1. @NoArgsConstructor
  2. @AllArgsConstructor
  3. @Data
  4. @ToString
  5. publicclassUserClickModel{
  6. privateStringdate;
  7. privateStringproduct;
  8. privateintuid;
  9. privateintpv;
  10. privateintuv;
  11. }

接着就是使用Flink消费kafka,指定Watermark,通过KeyBy分流,进入滚动窗口函数通过状态保存pv和uv。

  1. publicclassUserClickMain{
  2. privatestaticfinalMapconfig=Configuration.initConfig("commons.xml");
  3. publicstaticvoidmain(String[]args)throwsException{
  4. //初始化环境,配置相关属性
  5. StreamExecutionEnvironmentsenv=StreamExecutionEnvironment.getExecutionEnvironment();
  6. senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  7. senv.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE);
  8. senv.setStateBackend(newFsStateBackend("hdfs://bigdata/flink/checkpoints/userClick"));
  9. //读取kafka
  10. PropertieskafkaProps=newProperties();
  11. kafkaProps.setProperty("bootstrap.servers",config.get("kafka-ipport"));
  12. kafkaProps.setProperty("group.id",config.get("kafka-groupid"));
  13. //kafkaProps.setProperty("auto.offset.reset","earliest");
  14. //watrmark允许数据延迟时间
  15. longmaxOutOfOrderness=5*1000L;
  16. SingleOutputStreamOperatordataStream=senv.addSource(
  17. newFlinkKafkaConsumer<>(
  18. config.get("kafka-topic"),
  19. newSimpleStringSchema(),
  20. kafkaProps
  21. ))
  22. //设置watermark
  23. .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(maxOutOfOrderness))
  24. .withTimestampAssigner((element,recordTimestamp)->{
  25. //时间戳须为毫秒
  26. returnLong.valueOf(JSON.parseObject(element).getString("timestamp"))*1000;
  27. })).map(newFCClickMapFunction()).returns(TypeInformation.of(newTypeHint(){
  28. }));
  29. //按照(date,product)分组
  30. dataStream.keyBy(newKeySelector
如果您对该产品感兴趣,请填写办理(客服微信:xiaoxiongyidong)

为您推荐:

发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。