大数据-玩转数据-Flink 海量数据实时去重

一、海量数据实时去重说明

借助redis的Set,需要频繁连接Redis,如果数据量过大, 对redis的内存也是一种压力;使用Flink的MapState,如果数据量过大, 状态后端最好选择 RocksDBStateBackend; 使用布隆过滤器,布隆过滤器可以大大减少存储的数据的数据量。

二、海里书实时去重为什么需要布隆过滤器

如果想判断一个元素是不是在一个集合里,一般想到的是将集合中所有元素保存起来,然后通过比较确定。链表、树、散列表(又叫哈希表,Hash table)等等数据结构都是这种思路。
但是随着集合中元素的增加,我们需要的存储空间越来越大。同时检索速度也越来越慢,上述三种结构的检索时间复杂度分别为。
布隆过滤器即可以解决存储空间的问题, 又可以解决时间复杂度的问题.
布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

三、布隆过滤基本概念

布隆过滤器(Bloom Filter,下文简称BF)由Burton Howard Bloom在1970年提出,是一种空间效率高的概率型数据结构。它专门用来检测集合中是否存在特定的元素。
它实际上是一个很长的二进制向量和一系列随机映射函数。

实现原理
布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。
BF是由一个长度为m比特的位数组(bit array)与k个哈希函数(hash function)组成的数据结构。位数组均初始化为0,所有哈希函数都可以分别把输入数据尽量均匀地散列。
当要插入一个元素时,将其数据分别输入k个哈希函数,产生k个哈希值。以哈希值作为位数组中的下标,将所有k个对应的比特置为1。
当要查询(即判断是否存在)一个元素时,同样将其数据输入哈希函数,然后检查对应的k个比特。如果有任意一个比特为0,表明该元素一定不在集合中。如果所有比特均为1,表明该集合有(较大的)可能性在集合中。为什么不是一定在集合中呢?因为一个比特被置为1有可能会受到其他元素的影响(hash碰撞),这就是所谓“假阳性”(false positive)。相对地,“假阴性”(false negative)在BF中是绝不会出现的。
下图示出一个m=18, k=3的BF示例。集合中的x、y、z三个元素通过3个不同的哈希函数散列到位数组中。当查询元素w时,因为有一个比特为0,因此w不在该集合中。
在这里插入图片描述

优点
1.不需要存储数据本身,只用比特表示,因此空间占用相对于传统方式有巨大的优势,并且能够保密数据;
2.时间效率也较高,插入和查询的时间复杂度均为, 所以他的时间复杂度实际是
3.哈希函数之间相互独立,可以在硬件指令层面并行计算。
缺点
1.存在假阳性的概率,不适用于任何要求100%准确率的情境;
2.只能插入和查询元素,不能删除元素,这与产生假阳性的原因是相同的。我们可以简单地想到通过计数(即将一个比特扩展为计数值)来记录元素数,但仍然无法保证删除的元素一定在集合中。
使用场景
所以,BF在对查准度要求没有那么苛刻,而对时间、空间效率要求较高的场合非常合适.
另外,由于它不存在假阴性问题,所以用作“不存在”逻辑的处理时有奇效,比如可以用来作为缓存系统(如Redis)的缓冲,防止缓存穿透。
假阳性概率的计算
假阳性的概率其实就是一个不在的元素,被k个函数函数散列到的k个位置全部都是1的概率。可以按照如下的步骤进行计算: p = f(m,n,k)
其中各个字母的含义:
1.n :放入BF中的元素的总个数;
2.m:BF的总长度,也就是bit数组的个数
3.k:哈希函数的个数;
4.p:表示BF将一个不在其中的元素错判为在其中的概率,也就是false positive的概率;
A.BF中的任何一个bit在第一个元素的第一个hash函数执行完之后为 0的概率是:

B.BF中的任何一个bit在第一个元素的k个hash函数执行完之后为 0的概率是:

C.BF中的任何一个bit在所有的n元素都添加完之后为 0的概率是:

D.BF中的任何一个bit在所有的n元素都添加完之后为 1的概率是:

E.一个不存在的元素被k个hash函数映射后k个bit都是1的概率是:

结论:在哈数函数个数k一定的情况下
1.比特数组m长度越大, p越小, 表示假阳性率越低
2.已插入的元素个数n越大, p越大, 表示假阳性率越大
经过各种数学推导:
对于给定的m和n,使得假阳性率(误判率)最小的k通过如下公式定义:

四、使用布隆过滤器实现去重

Flink已经内置了布隆过滤器的实现(使用的是google的Guava)

package com.lyh.flink12;import com.atguigu.flink.java.chapter_6.UserBehavior;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;public class Flink02_UV_BoomFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建WatermarkStrategyWatermarkStrategy<UserBehavior> wms = WatermarkStrategy.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {@Overridepublic long extractTimestamp(UserBehavior element, long recordTimestamp) {return element.getTimestamp() * 1000L;}});env.readTextFile("input/UserBehavior.csv").map(line -> { // 对数据切割, 然后封装到POJO中String[] split = line.split(",");return new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4]));}).filter(behavior -> "pv".equals(behavior.getBehavior())) //过滤出pv行为.assignTimestampsAndWatermarks(wms).keyBy(UserBehavior::getBehavior).window(TumblingEventTimeWindows.of(Time.minutes(60))).process(new ProcessWindowFunction<UserBehavior, String, String, TimeWindow>() {private ValueState<Long> countState;private ValueState<BloomFilter<Long>> bfState;@Overridepublic void open(Configuration parameters) throws Exception {countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class));bfState = getRuntimeContext().getState(new ValueStateDescriptor<BloomFilter<Long>>("bfState", TypeInformation.of(new TypeHint<BloomFilter<Long>>() {})));}@Overridepublic void process(String key,Context context,Iterable<UserBehavior> elements, Collector<String> out) throws Exception {countState.update(0L);// 在状态中初始化一个布隆过滤器// 参数1: 漏斗, 存储的类型// 参数2: 期望插入的元素总个数// 参数3: 期望的误判率(假阳性率)BloomFilter<Long> bf = BloomFilter.create(Funnels.longFunnel(), 1000000, 0.001);bfState.update(bf);for (UserBehavior behavior : elements) {// 查布隆if (!bfState.value().mightContain(behavior.getUserId())) {// 不存在 计数+1countState.update(countState.value() + 1L);// 记录这个用户di, 表示来过bfState.value().put(behavior.getUserId());}}out.collect("窗口: " + context.window() + " 的uv是: " + countState.value());}}).print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/97534.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql八股

1、请你说说mysql索引&#xff0c;以及它们的好处和坏处 检索效率、存储资源、索引 索引就像指向表行的指针&#xff0c;是一个允许查询操作快速确定哪些行符合WHERE子句中的条件&#xff0c;并检索到这些行的其他列值的数据结构索引主要有普通索引、唯一索引、主键索引、外键…

攻防世界-T1 Training-WWW-Robots

文章目录 步骤1步骤二结束语 步骤1 看到文本——>提取有效信息——>利用有效信息 文本&#xff1a;In this little training challenge, you are going to learn about the Robots_exclusion_standard. The robots.txt file is used by web crawlers to check if they …

策略模式与模板方法结合案例

一、背景 上周在迁移项目MQ工程的时候&#xff0c;重新Review代码&#xff0c;发现有一段代码综合使用了策略模式和模板方法&#xff0c;下面讲解一下具体场景应用的思路。 二、模板方法 策略模式前段时间有一个关于库存具体案例&#xff0c;详见 库存管理与策略模式。 模板…

修改npm全局安装的插件(下载目录指向)

我们先打开终端 然后执行 npm config get prefix查看npm 的下载地址 一般都会在C盘 但是 我们都知道 C盘下东西多了是很不好的 所以 我们可以执行 npm config set prefix “E:\npmfile”将 npm 的下载地址 改变成 E盘下的 npmfile目录 这样 以后 默认全局安装的插件就会都到…

中国34省区市三维地形图(直接保存)

吉林 ▼ 辽宁 ▼ 北京 ▼ 河北 ▼ 山东 ▼ 山西 ▼ 天津 ▼ 江苏 ▼ 福建 ▼ 上海 ▼ 台湾 ▼ 浙江 ▼ 广东 ▼ 广西 ▼ 海南 ▼ 香港和澳门 ▼ 安徽 ▼ 河南 ▼ 湖北 ▼ 湖南 ▼ 江西 ▼ 甘肃 ▼ 内蒙古 ▼ 宁夏 ▼ 青海 ▼ 陕西 ▼ 新疆 ▼ 贵州 …

016 Spring Boot + Vue 图书管理系统

Spring Boot Vue 图书馆管理系统&#xff08;library-system&#xff09; 本地快捷预览项目 第一步&#xff1a;运行 db 文件夹下的springboot-vue.sql(询问作者获取)&#xff0c;创建springboot-vue数据库 第二步&#xff1a;修改后端数据库配置文件&#xff0c;启动后端 …

计算机网络-计算机网络体系结构-概述,模型

目录 一、计算机网络概述 二、性能指标 速率 带宽 吞吐量 时延 往返时延RTT 利用率 三、计算机网络体系结构 分层结构 IOS模型 应用层-> 表示层-> 会话层-> 传输层-> 网络层-> 数据链路层-> 物理层-> TCP/IP模型 一、计算机网络概述 计…

浅谈OV SSL 证书的优势

随着网络威胁日益增多&#xff0c;保护网站和用户安全已成为每个企业和组织的重要任务。在众多SSL证书类型中&#xff0c;OV&#xff08;Organization Validation&#xff09;证书以其独特的优势备受关注。让我们深入探究OV证书的优势所在&#xff0c;为网站安全搭建坚实的防线…

10-Node.js模块化

01.模块化简介 目标 了解模块化概念和好处&#xff0c;以及 CommonJS 标准语法导出和导入 讲解 在 Node.js 中每个文件都被当做是一个独立的模块&#xff0c;模块内定义的变量和函数都是独立作用域的&#xff0c;因为 Node.js 在执行模块代码时&#xff0c;将使用如下所示的…

Vue中如何进行图像处理与图像滤镜

在Vue中进行图像处理与图像滤镜 图像处理和滤镜效果是现代Web应用程序中常见的功能之一。Vue.js作为一个流行的JavaScript框架&#xff0c;为实现这些功能提供了许多工具和库。本文将介绍如何使用Vue来进行图像处理与图像滤镜&#xff0c;包括使用HTML5 Canvas和CSS滤镜。 准备…

抖音账号矩阵系统开发源码----技术研发

一、技术自研框架开发背景&#xff1a; 抖音账号矩阵系统是一种基于数据分析和管理的全新平台&#xff0c;能够帮助用户更好地管理、扩展和营销抖音账号。 抖音账号矩阵系统开发源码 部分源码分享&#xff1a; ic function indexAction() { //面包屑 $breadc…

WEB 3D 技术,通过node环境创建一个three案例

好 文章 前端3D Three.js 在本地搭建一个官方网站 中我们 搭建了一个Three的官网 现在呢 我们就来创建第一个ThreeJs的资源 这里呢 我们还是选择一个脚手架的开发模式 因为现在基本所有的前端都在使用这样的开发方式 这里 我们创建一个文件夹目录 作为我们项目的存放目录 我们…

boost在不同平台下的编译(win、arm)

首先下载boost源码 下载完成之后解压 前提需要自行安装gcc等工具 window ./bootstrap.sh ./b2 ./b2 installarm &#xff08;linux&#xff09; sudo ./bootstrap.sh sudo ./b2 cxxflags-fPIC cflags-fPIC linkstatic -a threadingmulti sudo ./b2 installx86 (linux) su…

第8期ThreadX视频教程:应用实战,将裸机工程移植到RTOS的任务划分,驱动和应用层交互,中断DMA,C库和中间件处理等注意事项

视频教程汇总帖&#xff1a;【学以致用&#xff0c;授人以渔】2023视频教程汇总&#xff0c;DSP第12期&#xff0c;ThreadX第8期&#xff0c;BSP驱动第26期&#xff0c;USB实战第5期&#xff0c;GUI实战第3期&#xff08;2023-10-01&#xff09; - STM32F429 - 硬汉嵌入式论坛 …

HD-G2UL-GW高性能4G工业网关|介绍|参数

HD-G2UL-GW多功能型网关基于高性能低功耗 ARM 处理器设计&#xff0c;集成 4G、2路网口、4 路 RS-485、2路 RS-232&#xff08;与485有复用&#xff09;、WIFI等功能&#xff0c;在电力、环保、节能、消防、农业等领域有广泛应用。 HD-G2UL-GW板载的外设功能&#xff1a; 集成2…

10-Node.js入门

01.什么是 Node.js 目标 什么是 Node.js&#xff0c;有什么用&#xff0c;为何能独立执行 JS 代码&#xff0c;演示安装和执行 JS 文件内代码 讲解 Node.js 是一个独立的 JavaScript 运行环境&#xff0c;能独立执行 JS 代码&#xff0c;因为这个特点&#xff0c;它可以用来…

虚拟环境搭建、后台项目创建及目录调整、封装logger、封装全局异常、封装Response、后台数据库创建

1 虚拟环境搭建 #1 虚拟环境作用多个项目&#xff0c;自己有自己的环境&#xff0c;装的模块属于自己的# 2 使用pycharm创建-一般放在项目路径下&#xff1a;venv文件夹-lib文件夹---》site-package--》虚拟环境装的模块&#xff0c;都会放在这里-scripts--》python&#xff0…

Arcgis日常天坑问题(1)——将Revit模型转为slpk数据卡住不前

这段时间碰到这么一个问题&#xff0c;revit模型在arcgis pro里导出slpk的时候&#xff0c;卡在98%一直不动&#xff0c;大约有两个小时。 首先想到的是revit模型过大&#xff0c;接近300M。然后各种减小模型测试&#xff0c;还是一样的问题&#xff0c;大概花了两天的时间&am…

如何选择合适的智能工单管理系统?智能工单系统有什么优势?

在当今社会&#xff0c;单位的运营和管理面临着越来越多的挑战。其中&#xff0c;设备的维护和保养是单位日常运营中的重要环节。然而&#xff0c;传统的工单管理方式存在着诸多问题&#xff0c;如报修效率低下、人工成本高昂、维修品质参差不齐等。为了解决这些问题&#xff0…

自动驾驶传感器技术

自动驾驶传感器技术是自动驾驶系统的关键组成部分&#xff0c;它使车辆能够感知并理解周围环境。本文将深入探讨自动驾驶传感器技术&#xff0c;包括常见类型、工作原理以及它们在自动驾驶中的作用。 1. 摄像头 摄像头的工作原理 摄像头是基于光学原理的传感器&#xff0c;其…