【数据挖掘】bytewax 与 ydata工具可实时了解您的数据

一、说明

在这篇博文中,我们将介绍如何将开源流式处理解决方案 bytewax 与 ydata 分析相结合并加以利用,以提高流式处理流的质量。

        STream 处理支持在传输中和存储之前对数据进行实时分析,并且可以是有状态的,也可以是无状态的。

        有状态流处理用于实时建议、模式检测或复杂事件处理,其中处理需要已发生事件的历史记录(窗口、按键连接等)。

        无状态流处理用于内联转换,不需要了解流中的其他数据点,例如屏蔽电子邮件或转换类型。        

        总体而言,数据流在工业中被广泛使用,并且可以应用于欺诈检测患者监控事件预测维护等用例。

二、 数据流必须考虑关键是数据的质量

        与通常在创建数据仓库或仪表板解决方案期间评估数据质量的传统模型不同,流数据需要持续监视

        在从收集到馈送下游应用程序的整个过程中保持数据质量至关重要。毕竟,对于组织来说,糟糕的数据质量的成本可能很高。

        在本文中,我们将向您展示如何结合以分析和提高流媒体流的质量!bytewaxydata-profiling

三、使用 Bytewax 为数据专业人员提供流处理

          是专门为Python开发人员设计的OSS流处理框架。

        它允许用户构建具有类似于Flink,Spark和Kafka Streams功能的流数据管道和实时应用程序,同时提供友好和熟悉的界面以及与Python生态系统的100%兼容性。

        使用内置连接器或现有的 Python 库,您可以连接到实时和流数据源(Kafka、RedPanda、WebSocket 等),并将转换后的数据写入各种下游系统(Kafka、拼花地板文件、数据湖等)。

        对于转换,Bytewax 通过映射窗口聚合方法促进有状态和无状态转换,并具有恢复和可伸缩性等熟悉的功能。

        Bytewax 促进了 Python 优先和以数据为中心的数据流体验,并且是为数据工程师和数据科学家构建的。它允许用户构建流数据管道和实时应用程序,并创建满足其需求所需的自定义项,而无需学习和维护基于 JVM 的流平台,如 Spark 或 Flink。

        Bytewax 非常适合许多用例,即为生成 AI 嵌入管道、处理数据流中的缺失值、在流上下文中使用语言模型来理解金融市场等等。有关用例灵感和更多信息,如文档、教程和指南,请随时查看字节蜡网站。

四、为什么要对数据流进行数据剖析?

        数据剖析是成功启动任何机器学习任务的关键,指的是彻底了解数据的步骤:其结构、行为和质量。

        简而言之,数据分析涉及分析与数据格式和基本描述符相关的方面(例如,样本数量、特征的数量/类型、重复值)、其内在特征(例如存在缺失数据或不平衡的特征)以及在数据收集或处理过程中可能出现的其他复杂因素(例如,错误值或不一致的特征)。

        确保高数据质量标准对所有领域和组织都至关重要,但对于使用输出连续数据的域运营的领域尤其重要,其中情况可能会快速变化,可能需要立即采取行动(例如,医疗保健监测、股票价值、空气质量政策)。

        对于许多领域,从探索性数据分析的角度使用数据分析,考虑存储在数据库中的历史数据。相反,对于数据流,数据分析对于沿流持续验证和质量控制变得至关重要,需要在流程的不同时间范围或阶段检查数据。

        通过将自动分析嵌入到我们的数据流中,我们可以立即获得有关数据当前状态的反馈,并收到任何潜在关键问题的警报 - 无论是与数据一致性和完整性有关(例如,损坏的值或更改格式),还是与短时间内发生的事件(例如,数据漂移, 偏离业务规则和结果)。

        在现实世界的领域——你只知道墨菲定律一定会发生,“一切都可能出错”——自动分析可能会让我们免于多个大脑难题和需要停止生产的系统!

        在涉及数据剖析方面,无论是表格数据还是时间序列数据,它一直是大众的最爱。难怪为什么 - 它是一组广泛的分析和见解的一行代码。ydata-profiling

        复杂且耗时的操作是在后台完成的:ydata 分析会自动检测数据中包含的特征类型,并根据特征类型(数字或分类)调整分析报告中显示的汇总统计数据和可视化效果。

        该软件包促进了以数据为中心的分析,还突出了特征之间的现有关系,重点关注它们的成对交互相关性,并提供了对数据质量警报的全面评估,从重复常量值到偏斜不平衡的特征。

        它实际上是我们数据质量的 360º 视图 - 只需最少的努力。

        分析报告:突出显示潜在的数据质量问题。图片由作者提供。

五、把所有东西放在一起:字节蜡和ydata-profile。

        在开始项目之前,我们需要先设置 python 依赖项并配置数据源。

        首先,让我们安装 和 软件包(您可能希望为此使用虚拟环境 - 如果您需要一些额外的指导,请查看这些说明!bytewaxydata-profiling

pip install bytewax==0.16.2 ydata-profiling==4.3.1

        然后,我们将上传环境传感器遥测数据集(许可证 — CC0:公共域),其中包含来自不同 IoT 设备的温度、湿度、一氧化碳液化石油气、烟雾、光线和运动的多个测量值:        在生产环境中,这些测量将由每个设备连续生成,输入看起来像我们在 Kafka 等流媒体平台中期望的。在本文中,为了模拟我们在流数据中找到的上下文,我们将一次一行地从 CSV 文件中读取数据,并使用字节蜡创建数据流。

        (作为快速旁注,数据流本质上是一个数据管道,可以描述为有向无环图 — DAG)

        首先,让我们进行一些必要的导入

from datetime import datetime, timedelta, timezonefrom bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.files import CSVInput
from bytewax.testing import run_main
        然后,我们定义数据流对象。之后,我们将使用无状态映射方法,在其中传入一个函数将字符串转换为 datetime 对象并将数据重组为格式(device_id,数据)。

        map 方法将以无状态的方式对每个数据点进行更改。我们修改数据形状的原因是,我们可以在接下来的步骤中轻松地对数据进行分组,以单独分析每个设备的数据,而不是同时分析所有设备的数据。

flow = Dataflow()
flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000"))# parse timestamp
def parse_time(reading_data):reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc)return reading_dataflow.map(parse_time)# remap format to tuple (device_id, reading_data)
flow.map(lambda reading_data: (reading_data['device'], reading_data))

        现在,我们将利用有状态功能,在我们定义的时间段内收集每个设备的数据。 需要一段时间内的数据快照,这使得窗口运算符成为执行此操作的完美方法。

        在bytewaxydata-profiling 中,我们能够为为特定上下文指定的数据帧生成汇总统计信息。例如,在我们的示例中,我们可以生成引用每个物联网设备或特定时间框架的数据快照:ydata-profiling

from bytewax.window import EventClockConfig, TumblingWindow# This is the accumulator function, and outputs a list of readings
def acc_values(acc, reading):acc.append(reading)return acc# This function instructs the event clock on how to retrieve the
# event's datetime from the input.
def get_time(reading):return reading["ts"]# Configure the `fold_window` operator to use the event time.
cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30))# And a tumbling window
align_to = datetime(2020, 1, 1, tzinfo=timezone.utc)
wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1))flow.fold_window("running_average", cc, wc, list, acc_values)flow.inspect(print)

        定义快照后,利用就像为我们要分析的每个数据帧调用 一样简单:ydata-profilingPorfileReport

import pandas as pd
from ydata_profiling import ProfileReportdef profile(device_id__readings):print(device_id__readings)device_id, readings = device_id__readingsstart_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')df = pd.DataFrame(readings)profile = ProfileReport(df,tsmode=True,sortby="ts",title=f"Sensor Readings - device: {device_id}")profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html")return f"device {device_id} profiled at hour {start_time}"flow.map(profile)

        在此示例中,我们将图像作为 map 方法中函数的一部分写入本地文件。这些可以通过消息传递工具报告出来,或者我们可以在将来将它们保存到一些远程存储中。配置文件完成后,数据流需要一些输出,因此我们可以使用内置设备打印已分析的设备,以及从映射步骤中的配置文件函数传递的配置文件时间:StdOutput

flow.output("out", StdOutput())

        有多种方法可以执行字节蜡数据流。在这个例子中,我们使用相同的本地机器,但 Bytewax 也可以在多个 Python 进程上运行,跨多个主机,在 Docker 容器中运行,使用 Kubernetes 集群等等。

        在本文中,我们将继续使用本地设置,但我们鼓励你查看我们的帮助程序工具 waxctl,该工具在管道准备好过渡到生产环境后管理 Kubernetes 数据流部署。

假设我们与具有数据流定义的文件位于同一目录中,则可以使用以下方法运行它:

python -m bytewax.run ydata-profiling-streaming:flow

        然后,我们可以使用分析报告来验证数据质量,检查模式或数据格式的更改,并比较不同设备或时间窗口之间的数据特征

        事实上,我们可以利用比较报告功能,以直接的方式突出显示两个数据配置文件之间的差异,从而更容易检测需要调查的重要模式或必须解决的问题:

snapshot_a_report = ProfileReport(df_a, title="Snapshot A")
snapshot_b_report = ProfileReport(df_b, title="Snapshot B")comparison_report =snapshot_a_report(snapshot_b_report)
comparison_report.to_file("comparison_report.html")

六、准备好探索您自己的数据流了吗?

        验证数据流对于连续识别数据质量问题并比较不同时间段的数据状态至关重要。

        对于医疗保健能源制造娱乐领域的组织(所有组织都在处理连续的数据流),分析是建立从质量评估到数据隐私的数据治理最佳实践的关键

        这需要对数据快照进行分析,如本文所示,可以通过组合 和 无缝实现数据快照。bytewaxydata-profiling

        Bytewax负责处理数据流并将其构建为快照所需的所有过程,然后可以通过数据特征的综合报告对其进行汇总并与ydata分析进行比较。

        能够适当地处理和分析传入的数据开启了跨不同领域的大量用例,从纠正数据架构和格式中的错误到突出显示和缓解实际活动产生的其他问题,例如异常检测(例如,欺诈或入侵/威胁检测)、设备故障以及其他偏离预期的事件(例如,数据偏移或与业务规则不一致)。

        现在,您就可以开始探索数据流了!让我们知道您发现了哪些其他用例,并一如既往地在评论中给我们留言,或在以数据为中心的 AI 社区中找到我们以获取进一步的问题和建议!再见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/5450.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【网络编程】网络套接字udp通用服务器和客户端

1.预备知识 认识端口号 端口号(port)是传输层协议的内容: 端口号是一个2字节16位的整数(uint16)端口号用来标识主机上的一个进程IP地址port能够标识网络上的某一台主机和某一个进程一个端口号只能被一个进程占用 认识TCP协议 此处我们先对TCP(Transmission Con…

Spring MVC异步上传、跨服务器上传和文件下载

一、异步上传 之前的上传方案,在上传成功后都会跳转页面。而在实际开发中,很多情况下上传后不进行跳转,而是进行页面的局部刷新,比如:上传头像成功后将头像显示在网页中。这时候就需要使用异步文件上传。 1.1 JSP页面 …

[golang gin框架] 41.Gin商城项目-微服务实战之后台Rbac微服务(用户登录 、Gorm数据库配置单独抽离、 Consul配置单独抽离)

上一节抽离了captcha验证码功能,集成了验证码微服务功能,这一节来看看后台Rbac功能,并抽离其中的用户登录,管理员管理,角色管理,权限管理等功能作为微服务来调用 一.引入 后台操作从登录到后台首页,然后其中的管理员管理,角色管理,权限管理等功能可以抽离出来作为 一个Rbac微服…

Python实战

官方文档 请点击下面工程名称,跳转到代码的仓库页面,将工程 下载下来 Demo Code 里有详细的注释 LearnPythonPython 实现功能点demo

OpenCV for Python 实战(一):获取图片拍摄GPS地址并自动添加水印

Hello 我们在OpenCV每天的基础博客当中已经更新了很多了,那么今天我们就来结合前几天的内容。做一个获取属性然后添加对应属性的水印。那让我们赶快开始吧~ 文章目录 图片EXIFPython 获取EXIFexifread库使用方法转换成文字地址 添加水印cv2.putText() 每日总结 图片…

【001 操作系统】什么是线程、进程?线程进程的区别是什么?

一、什么是线程、进程? 进程:进程是资源分配的基本单位,它是程序执行时的一个实例,在程序运行时创建。 在Linux环境下,每个进程有自己各自独立的 4G 地址空间,大家互不干扰对方,如果两个进程之间…

基于大模型的Text2SQL微调的实战教程

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

Homography单应性矩阵

1. Homography 单应性概念 考虑 同一个平面(比如书皮)的两张图片,红点表示同一个物理坐标点在两张图片上的各自位置。在 CV 术语中,我们称之为对应点。 Homography 就是将一张图像上的点映射到另一张图像上对应点的3x3变换矩阵. 因为 Homography 是一个 …

Python模块requests基本用法

简介 Python 的 requests 模块是一个流行的第三方库,用于发送HTTP请求。它提供了一组简洁且易于使用的API,使得在Python中进行网络通信变得更加简单和灵活。 目录 1. 基本概念 1.1. HTTP 协议 1.2. GET 请求 1.3. POST 请求 1.4. get 和 post 的区别…

java本地socket服务端暴露至公网访问【内网穿透】

前言 📕作者简介:热爱跑步的恒川,致力于C/C、Java、Python等多编程语言,热爱跑步,喜爱音乐的一位博主。 📗本文收录于恒川的日常汇报系列,大家有兴趣的可以看一看 📘相关专栏C语言初…

蓝桥杯专题-真题版含答案-【生命之树】【消除尾一】【密码脱落】【生日蜡烛】

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列点击跳转>蓝桥系列 👉关于作者 专注于Android/Unity和各种游…

【Java项目实战-牛客社区】--idea maven配置

第一 IDEA集成Maven插件,并配置Maven 以下步骤中,重点关注红色方框的配置 第二 IDEA 创建 Maven 项目 步骤一:创建模块,选择Maven,点击Next 步骤二:填写模块名称,坐标信息,点击finis…

【技术面试】Java八股文业余选手-下篇(持续更新)

文章目录 5. RocketMQ 消息中间件、RabbitMQ、ActiveMQ【√】5.1 RocketMQ 6. Kafka 大数据量消息中间件、ElasticSearch、ZooKeeper【√】6.1 Kafka【√】6.2 ElasticSearch 7. 分布式、研发提效、高并发、线程安全【√】7.1 分布式与集群【√】7.2 高并发、线程安全【】7.3 研…

【JavaScript】实现网页中的选项卡

一、简易选项卡 以下是实现一个简单选项卡的代码。代码中有注释。 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0"…

Airbnb 引入 HTTP Streaming,网页性能得到大幅度提升

Airbnb 通过引入HTTP Streaming来提升网站的页面加载性能。他们将测试的每个页面&#xff08;包括主页&#xff09;的首次内容绘制&#xff08;First Contentful Paint&#xff0c;FCP&#xff09;时间降低了大约 100 毫秒。他们还最小化了后端慢查询对加载时间的影响。 Airbn…

Docker概述 镜像-容器基本操作

Docker 概述 Docker是一个开源的应用容器引擎&#xff0c;基于go语言开发并遵循了apache2.0协议开源。 Docker是在Linux容器里运行应用的开源工具&#xff0c;是一种轻量级的“虚拟机”。 Docker 的容器技术可以在一台主机上轻松为任何应用创建一个轻量级的、可移植的、自给自足…

Docker 安装 和 GPU 支持

一、Docker安装过程&#xff08;ubuntu18.04环境&#xff09; 清华镜像 docker 安装&#xff1a;docker-ce | 镜像站使用帮助 | 清华大学开源软件镜像站 | Tsinghua Open Source Mirror 1、由于apt官方库里的docker版本可能比较旧&#xff0c;所以先卸载可能存在的旧版本&…

MySQL(一)基本架构、SQL语句操作、试图

MySQL系列文章 MySQL&#xff08;一&#xff09;基本架构、SQL语句操作、试图 MySQL&#xff08;二&#xff09;索引原理以及优化 MySQL&#xff08;三&#xff09;SQL优化、Buffer pool、Change buffer MySQL&#xff08;四&#xff09;事务原理及分析 MySQL&#xff08;五&a…

异步任务——CompletabelFuture

本专栏学习内容又是来自尚硅谷周阳老师的视频 有兴趣的小伙伴可以点击视频地址观看 在学习CompletableFuture之前&#xff0c;必须要先了解一下Future Future 概念 Future接口&#xff08;FutureTask实现类&#xff09;定义了操作异步任务执行的一些方法&#xff0c;如获取异…

前端学习——Vue (Day2)

指令补充 指令修饰符 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevi…