Flink RoaringBitmap去重

1、RoaringBitmap的依赖

  <!-- 去重大哥-->
<dependency><groupId>org.roaringbitmap</groupId><artifactId>RoaringBitmap</artifactId><version>0.9.21</version>
</dependency>

2、Demo去重

package com.gwm.driver;import com.alibaba.fastjson.JSON;
import com.alibaba.flink.connectors.datahub.datastream.source.DatahubSourceFunction;
import com.aliyun.datahub.client.model.RecordEntry;
import com.gwm.pojo.EventSuccessInfo;
import com.gwm.utils.TimeToStampUtil;
import com.gwm.utils.getString;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import scala.Tuple4;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.UUID;/*** @author yangyingchun* @version 1.0* @date 2022/11/14 16:26*/
public class EventOrderSuccessRoaringBitmap {private static String endPoint = "endPoint ";//private static String endPoint ="public endpoint";//公网访问(填写内网Endpoint,就不用填写公网Endpoint)。private static String projectName = "projectName ";private static String topicSourceName =  "topicSourceName ";
//    private static String topicSourceName =  "topicSourceName ";private static String accessId = "accessId ";private static String accessKey = "accessKey ";//设置消费的启动位点对应的时间。TimeToStampUtil.timeToStamp("2021-12-21") 此时间至少为当前时间
//    private static Long datahubStartInMs = TimeToStampUtil.timeToStamp("2023-02-23");private static Long datahubStartInMs = System.currentTimeMillis();private static Long datahubEndInMs=Long.MAX_VALUE;private static SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");private static SimpleDateFormat sd1 = new SimpleDateFormat("yyyy-MM-dd");private static Date startDate;static {try {startDate = sd1.parse(sd.format(new Date()));} catch (ParseException e) {e.printStackTrace();}};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.enableCheckpointing(3600000L);
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));env.setParallelism(1);DataStreamSource<List<RecordEntry>> aedata =  env.addSource(new DatahubSourceFunction(endPoint,projectName,topicSourceName,accessId,accessKey,datahubStartInMs,datahubEndInMs,20L,1000L,1000));DataStream<Tuple4<String, EventSuccessInfo, String, Long>> aecollectordataDataStream = aedata.flatMap(new FlatMapFunction<List<RecordEntry>, Tuple4<String, EventSuccessInfo, String, Long>>() {@Overridepublic void flatMap(List<RecordEntry> value, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {for (RecordEntry recordEntry : value) {String phone = getString.getString(recordEntry, "customer_phone");Long order_sn = Long.parseLong(getString.getString(recordEntry, "order_no"));String brand = getString.getString(recordEntry, "brand");String car_model = getString.getString(recordEntry, "car_model");String action_time = "null".equals(getString.getString(recordEntry, "paid_at"))||"".equals(getString.getString(recordEntry, "paid_at"))?null:sd.format(new Date(Long.parseLong(getString.getString(recordEntry, "paid_at"))/1000));Double paid_amount = "null".equals(getString.getString(recordEntry, "paid_amount"))?null:Double.parseDouble(getString.getString(recordEntry, "paid_amount"));String name = getString.getString(recordEntry, "customer_name");String operation_flag = getString.getString(recordEntry, "new_dts_sync_dts_after_flag");String order_time = "null".equals(getString.getString(recordEntry, "order_time"))||"".equals(getString.getString(recordEntry, "order_time"))?null:sd.format(new Date(Long.parseLong(getString.getString(recordEntry, "order_time"))/1000));String order_state = getString.getString(recordEntry, "order_state"); //'订购成功'Date add_time ="null".equals(getString.getString(recordEntry, "order_time"))||"".equals(getString.getString(recordEntry, "order_time"))?null:new Date(Long.parseLong(getString.getString(recordEntry, "order_time")) / 1000);
//                    startDate = sd1.parse(sd.format(new Date()));System.out.println(order_state+"====startDate:"+startDate+"====paid_at:"+order_time+"=====phone+order_sn:"+phone+"--"+order_sn);//这里有三个问题,// 1、技术+业务:因为获取的是数据库操作日志,所以数据是重复的,(已经做了重复校验,确保不会重复发且无时效性)// 2、技术:如果操作了历史数据,且用户的订单状态恰好还是订购成功时,也会触达,是不是要加限制,加的话加什么合适,//    新增且当天(很多数据是获取不到时间的)?还是所有时间都推,再ma测加一个时间的控制条件//    结论:空的也要,// 3、业务:需要明确订购成功的规则,否则极易造成异常, order_state=12当前是订购成功 能复用吗if (
//                            "12".equals(order_state)&&"Y".equals(operation_flag)
//                             &&!StringUtils.isNullOrWhitespaceOnly(order_time)
//                             &&add_time.after(startDate)){EventSuccessInfo eventSuccessInfo = new EventSuccessInfo(phone, order_sn, brand, car_model, action_time, paid_amount, name, operation_flag,order_time,order_state);//                    System.out.println(eventSuccessInfo);Tuple4<String, EventSuccessInfo, String, Long> tuple4= new Tuple4<String, EventSuccessInfo, String, Long>("test_event_order_success",eventSuccessInfo,UUID.randomUUID().toString().replace("-",""),System.currentTimeMillis());out.collect(tuple4);}}}});KeyedStream<Tuple4<String, EventSuccessInfo, String, Long>, String> tuple4StringKeyedStream= aecollectordataDataStream.keyBy(x -> x._2().getPhone());//        StateTtlConfig ttlConfig = StateTtlConfig
//                .newBuilder(Time.days(2))
//                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
//                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
//                .build();//create StateDescriptor//这里进行状态注册通过bitmap高效存储实现去重,当然bitmap去重只适合bigint场景ValueStateDescriptor<Roaring64Bitmap> bitmapDescriptor = new ValueStateDescriptor("Roaring64Bitmap",TypeInformation.of(new TypeHint<Roaring64Bitmap>() {}));//手机号去重逻辑 通过Roaring64BitmapSingleOutputStreamOperator<Tuple4<String, EventSuccessInfo, String, Long>> map = tuple4StringKeyedStream.filter(new RichFilterFunction<Tuple4<String, EventSuccessInfo, String, Long>>() {//1.定义状态 进行手机号去重private transient ValueState<Roaring64Bitmap> bitmapState;@Overridepublic void open(Configuration parameters) throws Exception {// 设置状态生命周期
//                StateTtlConfig stateTtlConfig = new StateTtlConfig
//                        .Builder(Time.days(1)) // 周期为1天
//                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 创建或者更新状态时重新刷新生命周期
//                        .build();bitmapState = getRuntimeContext().getState(bitmapDescriptor);;}@Overridepublic boolean filter(Tuple4<String, EventSuccessInfo, String, Long> value) throws Exception {//由于本程序只筛选订购成功的,所以每个手机号+每个订单唯一确认一条数据(订单状态已经在上游过滤过了)Roaring64Bitmap bitmap = bitmapState.value();if (bitmap == null) {bitmap = new Roaring64Bitmap();}if (!bitmap.contains(value._2().getOrder_sn())) {bitmap.addLong(value._2().getOrder_sn());bitmapState.update(bitmap);return true;}return false;}});//因为是binlog,但需求只要数据时间是当天的 :通过flink定时器 定义每天零晨更新比较时间SingleOutputStreamOperator<Tuple4<String, EventSuccessInfo, String, Long>> process = map.keyBy(x -> x._2().getPhone()).process(new KeyedProcessFunction<String, Tuple4<String, EventSuccessInfo, String, Long>, Tuple4<String, EventSuccessInfo, String, Long>>() {//1.定义状态 进行手机号去重private ValueState<String> timeSate;@Overridepublic void processElement(Tuple4<String, EventSuccessInfo, String, Long> value, Context ctx, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {//获取格林威治标准时间的第二天00:00:00即获取北京时间的第二天08:00:00
//                long ts = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (1000 * 60 * 60 * 24);//获取北京时间的第二天00:00:00long ts = ( ctx.timerService().currentProcessingTime()/(1000*60*60*24) + 1) * (1000*60*60*24)- 8 * 60 * 60 * 1000;//                long ts = 1677054000000L;//如果注册相同数据的TimeTimer,后面的会将前面的覆盖,即相同的timeTimer只会触发一次ctx.timerService().registerProcessingTimeTimer(ts);out.collect(value);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple4<String, EventSuccessInfo, String, Long>> out) throws Exception {//定时器质性,每天凌晨更新开始时间
//                System.out.println(timestamp);System.out.println("定时器执行了:" + timestamp);//状态初始化timeSate.clear();startDate = sd1.parse(sd.format(new Date()));System.out.println(startDate);
//                startDate = sd1.parse("2023-02-01");}});SingleOutputStreamOperator<Tuple4<String, String, String, Long>> jsonString = process.map(new MapFunction<Tuple4<String, EventSuccessInfo, String, Long>, Tuple4<String, String, String, Long>>() {@Overridepublic Tuple4<String, String, String, Long> map(Tuple4<String, EventSuccessInfo, String, Long> value) throws Exception {return new Tuple4<String, String, String, Long>(value._1(),JSON.toJSONString(value._2()),value._3(),value._4());}});jsonString.print();
//        jsonString.addSink(new EventOmsSuccessSink());env.execute("EventOrderSuccess===>");}
}

3、注意:Roaring64Bitmap 去重只适合去重整形情况

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/87756.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot-Druid

目录 1.什么是Druid 2.主要优点和原因 3.误区 4.Part代码 0.pom 1.Spring.datasource.type: com.alibaba.druid.pool.DruidDataSource 2.Druid用Jasypt加密任意内容 EnableEncryptableProperties开启加密注解 3.Druid监控平台 1.什么是Druid Druid 是一个开源的数据库…

C++之互斥锁、读写锁、互斥量、 信号量、原子锁机制总结(二百二十五)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

MAC word 如何并列排列两张图片

系统&#xff1a;MAC os 参考博客 https://baijiahao.baidu.com/s?id1700824516945958911&wfrspider&forpc 步骤1 新建一个word文档和表格 修改表格属性 去掉自动重调尺寸以适应内容 插入图片 在表格的位置插入对应的图片如下 去除边框 最终结果如下

十大排序算法的实现(C/C++)

以下是十大经典排序算法的简单 C 实现&#xff1a; 冒泡排序&#xff08;Bubble Sort&#xff09;&#xff1a; 思想&#xff1a;重复地遍历要排序的列表&#xff0c;比较相邻的两个元素&#xff0c;如果它们的顺序错误就交换它们。时间复杂度&#xff1a;最坏情况和平均情况…

CentOS安装kafka单机部署

一&#xff1a;保证机器上已经运行的有Java环境 服务器&#xff1a;centos7 kafka版本&#xff1a;3.5.1 二&#xff1a;下载kafka压缩包 下载地址 1.解压kafka压缩包 tar -zxvf kafka_2.13-3.5.1.tgz 2.我得是上传到了 /home目录下&#xff0c;配置文件server.propertie…

UE5 ChaosVehicles载具研究

一、基本组成 载具Actor类名称&#xff1a;WheeledVehiclePawn Actor最原始的结构 官方增加了两个摇臂相机&#xff0c;可以像驾驶游戏那样切换多机位、旋转观察 选择骨骼网格体、动画蓝图类、开启物理模拟 二、SportsCar_Pawn 角阻尼&#xff1a;物体旋转的阻力。数值越大…

云原生技术盛会KubeCon即将召开!亚马逊云科技作为钻石赞助商参会

KubeCon2023将于9月26-28日在上海跨国采购会展中心隆重召开。作为云原生领域最负盛名的技术大会之一&#xff0c;KubeConCloudNativeCon是连接全球开发者与云原生社区的最佳平台&#xff0c;此次还新增Open Source Summit环节&#xff0c;吸引了全球顶尖的云原生专家们汇聚其中…

ArrayBlockingQueue

ArrayBlockingQueue 是 一个并发队列实现,它基于数组的数据结构,提供了线程安全的队列操作。这个队列的容量是固定的,一旦达到容量上限,后续的插入操作将会被阻塞,直到有其他线程从队列中移除元素为止。以下是对 ArrayBlockingQueue 的详细解释: 容量限制:ArrayBlocking…

腾讯mini项目-【指标监控服务重构-会议记录】2023-08-04

组长会议记录 A组 调研 traefik 命名&#xff0c;大驼峰 Grafana metric 的配置还存在有些问题&#xff0c;待解决 完成了 trace 的上报 待办&#xff1a; entry.go : fiber log 重复 【完成】event.go : traceparent 变量&#xff0c;线程隔离&#xff0c;多线程并发问题…

数据链路层协议

文章目录 数据链路层协议0. 数据链路层解决的问题1. 以太网协议(1) 认识以太网(2) 以太网帧格式<1> 两个核心问题 (3) 认识MAC地址(4) 局域网通信原理(5) MTU<1> 认识MTU<2> MTU对IP协议的影响<3> MTU对UDP协议的影响<4> MTU对TCP协议的影响<…

Python开发与应用实验2 | Python基础语法应用

*本文是博主对学校专业课Python各种实验的再整理与详解&#xff0c;除了代码部分和解析部分&#xff0c;一些题目还增加了拓展部分&#xff08;⭐&#xff09;。拓展部分不是实验报告中原有的内容&#xff0c;而是博主本人自己的补充&#xff0c;以方便大家额外学习、参考。 &a…

Oracle判断函数

CASE WHEN 语法&#xff1a; CASE WHEN 条件1 THEN 返回值1WHEN 条件2 THEN 返回值2ELSE 默认值END -- 对 EMP 的 DEPTNO 字段进行判断,显示出对应的部门名称 SELECT E.*, CASE WHEN E.DEPTNO10 THEN ACCOUNTINGWHEN E.DEPTNO20 THEN RESEARCHWHEN E.DEPTNO30 THEN SALES…

安装Python3.x--Windows

1 下载安装包 确定安装是干什么&#xff0c;要下哪个版本&#xff08;如果是配置项目环境&#xff0c;最好按项目需求的版本来装&#xff09; 1.1 官网链接 https://www.python.org 最新版本 指定版本 2 安装说明 点击下载exe&#xff0c;运行自定义安装路径&#xff0c;下…

什么是泛型?

泛型&#xff08;Generics&#xff09;是一种编程语言特性&#xff0c;它允许在编写代码时使用未指定具体类型的变量、参数或返回值。泛型的作用是增加代码的灵活性和重用性&#xff0c;并提高代码的类型安全性。 泛型可以应用于各种数据结构和算法中&#xff0c;例如集合类&a…

TS编译选项——不允许使用隐式any类型、不明确类型的this、严格检查空值、编译后文件自动设置严格模式

一、不允许使用隐式any类型 在tsconfig.js文件中配置noImplicitAny属性 {"compilerOptions": {// 不允许使用隐式any类型"noImplicitAny": true} } 开启后即可禁止使用隐式的any类型 注意&#xff1a;显式的any类型并不会被禁止 二、不允许使用不明确类…

腾讯mini项目-【指标监控服务重构】2023-08-26

今日已办 Venus 的 Trace 无感化 定义 handler 函数 fiber.Handler 的主要处理逻辑返回处理中出现的 error返回处理中响应 json 的函数 // handler // Description: // Author xzx 2023-08-26 18:00:03 // Param c // Return error // Return func() error : function for …

和 Node.js 说拜拜,Deno零配置解决方案

不知道大家注意没有&#xff0c;在我们启动各种类型的 Node repo 时&#xff0c;root 目录很快就会被配置文件塞满。例如&#xff0c;在最新版本的 Next.js 中&#xff0c;我们就有 next.config.js、eslintrc.json、tsconfig.json 和 package.json。而在样式那边&#xff0c;还…

Spring面试题9:Spring的BeanFactory和FactoryBean的区别和联系

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:说一说Spring的BeanFactory和FactoryBean的区别和联系 区别:BeanFactory是一个工厂接口,主要负责管理和创建Bean实例。它是Spring提供的最底层的…

优维产品最佳实践:主机合规性检查

我们常常会感到这样的困惑&#xff0c;为什么这么多的无效主机记录&#xff0c;为什么这些主机很多信息空白&#xff0c;当许多人一起维护主机信息时&#xff0c;常常会出现信息错漏的情况。主机是运维最重要最基本的CMDB信息&#xff0c;而「合规性检查」为我们提供了更高效便…

Spring Cloud Alibaba Gateway 简单使用

文章目录 Spring Cloud Alibaba Gateway1.Gateway简介2. 流量网关和服务网关的区别3. Spring Cloud Gateway 网关的搭建3.1 Spring Cloud Gateway 配置项的说明3.2 依赖导入3.3 配置文件 Spring Cloud Alibaba Gateway 1.Gateway简介 Spring Cloud Gateway是一个基于Spring F…