StructuredStreaming (一)

一、sparkStreaming的不足

1.基于微批,延迟高不能做到真正的实时

2.DStream基于RDD,不直接支持SQL

3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)

4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件,一个是事件处理的时间)

5.数据的Exactly-Once(恰好一次语义)需要手动实现

二、StructuredStreaming 的介绍 

1、2016年Spark2.0版本中发布

2、基于SparkSQL引擎的可扩展、容错的全新的流处理引擎。

3、并不是对Spark Streaming的简单改进,而是重新开发的全新流式引擎

准实时技术:来一批处理一批 实时:来一条处理一条 离线:一般都是处理一些静止的数据

三、socket+console

1、在虚拟机中下载nc
yum install -y nc2、启动 nc -lk 9999

案例:wordcount

import osfrom pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
if __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()socketDf = spark.readStream.format("socket") \.option("host", "bigdata01") \.option("port", 9999) \.load()# 处理# 方式一:使用dsl语法splitDf = socketDf.select(explode(F.split(socketDf.value, " ")).alias("word"))resultDf1 = splitDf.groupBy("word").count()# 方式二:使用sqlsocketDf.createOrReplaceTempView("wordcount")resultDf2 = spark.sql("""with t1 as( select num from wordcount lateral view explode(split(value," ")) c as num)select num,count(*) counts from t1 group by num;""")# 下面的就是sink的写法 后续会写query1 = resultDf1.writeStream \.outputMode("complete") \.format("console") \.start()query2 = resultDf2.writeStream \.outputMode("complete") \.format("console") \.start() \.awaitTermination()spark.stop()

四、file+console

文件中的数据:
1;yuwen;43
1;shuxue;55
2;yuwen;77
2;shuxue;88
3;yuwen;98
3;shuxue;65
3;yingyu;88
import osfrom pyspark.sql import SparkSessionfrom pyspark.sql.types import StructField, StringType, DoubleType, LongType, IntegerType, StructTypeif __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()# score_schema = StructType([#     StructField(name="stu_id", dataType=IntegerType(), nullable=False),#     StructField(name="subject_name", dataType=StringType(), nullable=True),#     StructField(name="score", dataType=DoubleType(), nullable=True)# ])score_schema = StructType().add("stu_id", IntegerType()).add("subject_name", StringType()).add("score",DoubleType())socketDf = spark.readStream.format("csv") \.option("sep", ";") \.schema(score_schema) \.load("../../resources/input1")socketDf.writeStream \.outputMode("append") \.format("console") \.option("truncate", False) \.start() \.awaitTermination()spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/60651.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python_爬虫1_Requests库入门

目录 Requests库 7个主要方法 Requests库的get()方法 Response对象的属性 爬取网页的通用代码框架 理解requests库的异常 HTTP协议及Requests库方法 HTTP协议 HTTP协议采用URL作为定位网络资源的标识。 HTTP协议对资源的操作 理解PATCH和PUT的区别 HTTP协议与Requse…

万字长文解读深度学习——生成对抗网络GAN

🌺历史文章列表🌺 深度学习——优化算法、激活函数、归一化、正则化深度学习——权重初始化、评估指标、梯度消失和梯度爆炸深度学习——前向传播与反向传播、神经网络(前馈神经网络与反馈神经网络)、常见算法概要汇总万字长文解读…

学SQL,要安装什么软件?

先上结论,推荐MySQLDbeaver的组合。 学SQL需要安装软件吗? 记得几年前我学习SQL的时候,以为像Java、Python一样需要安装SQL软件包,后来知道并没有所谓SQL软件,因为SQL是一种查询语言,它用来对数据库进行操…

Leecode刷题C语言之统计好节点的数目

执行结果:通过 执行用时和内存消耗如下: 题目:统计好节点的数目 现有一棵 无向 树,树中包含 n 个节点,按从 0 到 n - 1 标记。树的根节点是节点 0 。给你一个长度为 n - 1 的二维整数数组 edges,其中 edges[i] [ai,…

【代码审计】常见漏洞专项审计-业务逻辑漏洞审计

❤️博客主页: iknow181 🔥系列专栏: 网络安全、 Python、JavaSE、JavaWeb、CCNP 🎉欢迎大家点赞👍收藏⭐评论✍ 0x01 漏洞介绍 1、 原理 业务逻辑漏洞是一类特殊的安全漏洞,业务逻辑漏洞属于设计漏洞而非实…

【408】SDN重点笔记

总特征:数据平面(负责转发)与控制平面(负责控制)分离 控制平面: 由服务器和软件组成。控制平面完成转发表,并分发。 路由器不再需要路由选择协议,不再交换信息,只负责收到…

Redis的Zset在排行榜中应用

1.在pom文件导入&#xff1a; <!-- redis --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframew…

安全见闻1-5

涵盖了编程语言、软件程序类型、操作系统、网络通讯、硬件设备、web前后端、脚本语言、病毒种类、服务器程序、人工智能等基本知识&#xff0c;有助于全面了解计算机科学和网络技术的各个方面。 安全见闻1 1.编程语言简要概述 C语言&#xff1a;面向过程&#xff0c;适用于系统…

入门车载以太网(4) -- 传输层(TCP\UDP)

目录 1.ECU通信方式的变化 2.传输层概述 2.1 UDP 2.2 TCP 3. TCP和ISO 15765-2 1.ECU通信方式的变化 我们先回顾下两种通信方式&#xff1a;Signal-Based Messaging、Service-Based Messaging。 Signal-Based Messaging 基于信号的通信方式&#xff0c;例如CAN通信&…

Tofu AI视频处理模块视频输入配置方法

应用Tofu产品对网络视频进行获取做视频处理时&#xff0c;首先需要配置Tofu产品的硬件连接关系与设备IP地址、视频拉流地址。 步骤1 Tofu设备点对点直连或者通过交换机连接到电脑&#xff0c;电脑IP配置到与Tofu默认IP地址同一个网段。 打开软件 点击右上角系统设置 单击左侧…

QT<30> Qt中使鼠标变为转圈忙状态

前言&#xff1a;当我们在写软件时&#xff0c;在等待阻塞耗时操作时可以将鼠标变为忙状态&#xff0c;并在一段时间后恢复状态&#xff0c;可以用到GxtWaitCursor&#xff1a;Qt下基于RAII的鼠标等待光标类。 一、效果演示 二、详细代码 在项目中添加C文件&#xff0c;命名为…

什么是CRM系统?

越来越多的企业意识到&#xff1a;如何有效管理与客户的关系、提升客户满意度&#xff0c;并通过这些提升推动销售增长&#xff0c;已经成为许多公司亟待解决的问题。为此&#xff0c;客户关系管理&#xff08;Customer Relationship Management&#xff0c;简称CRM&#xff09…

【青牛科技】 GC6153——TMI8152 的不二之选,可应用于摇头机等产品中

在电子工程领域&#xff0c;不断寻求性能更优、成本更低的解决方案是工程师们的永恒追求。今天&#xff0c;我们要为广大电子工程师带来一款极具竞争力的产品 —— GC6153&#xff0c;它将成为 TMI8152 的完美替代之选。 一、产品背景 随着科技的飞速发展&#xff0c;电子设备…

JS 实现游戏流畅移动与按键立即响应

AWSD 按键移动 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title><style>.box1 {width: 400px;height: 400px;background: yellowgreen;margin: 0 auto;position: relative;}.box2 {width: 50px;height:…

服务器上安装Orcale数据库以及PL SQL工具(中文)

一、前期准备 1、oracle数据库安装包–>Oracle下载地址&#xff0c;版本根据当时情况就下最新的就行&#xff0c;下载时间可能有点长&#xff0c;耐心点。 2、PL SQL工具下载地址–>PL SQL下载地址&#xff0c;百度网盘可以共享【限速&#xff0c;没办法&#xff01;&am…

javaWeb小白项目--学生宿舍管理系统

目录 一、检查并关闭占用端口的进程 二、修改 Tomcat 的端口配置 三、重新启动 Tomcat 一、javaw.exe的作用 二、结束javaw.exe任务的影响 三、如何判断是否可以结束 结尾&#xff1a; 这个错误提示表明在本地启动 Tomcat v9.0 服务器时遇到了问题&#xff0c;原因是所需…

python爬虫(二)爬取国家博物馆的信息

import requests from bs4 import BeautifulSoup# 起始网址 url https://www.chnmuseum.cn/zx/xingnew/index_1.shtml # 用于存储所有数据 all_data [] page 1 global_index 1 # 定义全局序号变量并初始化为1 while True:html_url requests.get(url).textif requests.get…

2024 年(第 7 届)“泰迪杯”数据分析技能赛B 题 特殊医学用途配方食品数据分析 完整代码 结果 可视化分享

一、背景特殊医学用途配方食品简称特医食品&#xff0c;是指为满足进食受限、消化吸收障碍、代谢素乱或者特定疾病状态人群对营养素或者膳食的特殊需要&#xff0c;专门加工配置而成的配方食品&#xff0c;包括0月龄至12月龄的特殊医学用途婴儿配方食品和适用于1岁以上的特殊医…

TofuAI处理BT1120时序视频要求

时序要求 BT.1120视频用于1920x108030Hz数字视频输入。具体时序必须严格按照说明。BT.1120输入电平为1.8V。 BT1120数字视频采用YCbCr彩色格式输出&#xff0c;串行数据位宽为16bit&#xff0c;亮度在 高8bit&#xff0c;色度在低8bit&#xff0c;亮度和色度在同一个时钟周期输…

ASP.NET MVC宠物商城系统

该系统采用B/S架构&#xff0c;使用C#编程语言进行开发&#xff0c;以ASP.NET MVC框架为基础&#xff0c;以Visual Studio 2019为开发工具&#xff0c;数据库采用SQL Server进行保存数据。系统主要功能包括登录注册、宠物展示、个人中心、我的订单、购物车、用户管理、宠物类别…