SparkStreaming【实例演示】

前言

1、环境准备

  1. 启动Zookeeper和Kafka集群
  2. 导入依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.4</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.2.4</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.2.4</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.10</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.14.2</version></dependency>

2、模拟生产数据

通过循环来不断生产随机数据、使用Kafka来发布订阅消息。

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.util.Random// 生产模拟数据
object MockData {def main(args: Array[String]): Unit = {// 生成模拟数据// 格式: timestamp area city userid adid// 含义:   时间戳    省份  城市   用户  广告// 生产数据 => Kafka => SparkStreaming => 分析处理// 设置Zookeeper属性val props = new Properties()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)while (true){mockData().foreach((data: String) => {// 向 Kafka 中生成数据val record = new ProducerRecord[String,String]("testTopic",data)producer.send(record)println(record)})Thread.sleep(2000)}}def mockData(): ListBuffer[String] = {val list = ListBuffer[String]()val areaList = ListBuffer[String]("华东","华南","华北","华南")val cityList = ListBuffer[String]("北京","西安","上海","广东")for (i <- 1 to 30){val area = areaList(new Random().nextInt(4))val city = cityList(new Random().nextInt(4))val userid = new Random().nextInt(6) + 1val adid = new Random().nextInt(6) + 1list.append(s"${System.currentTimeMillis()} ${area} ${city} ${userid} ${adid}")}list}}

3、模拟消费数据

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}// 消费数据
object Kafka_req1 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req1")val ssc = new StreamingContext(conf,Seconds(3))// 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化val kafkaPara: Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG ->"lyh",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])// 读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,  //优先位置ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数))// 将每条消息的KV取出val valueDStream: DStream[String] = kafkaDStream.map(_.value())// 计算WordCountvalueDStream.print()// 开启任务ssc.start()ssc.awaitTermination()}}

4、需求1 广告黑名单

实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑。(黑名单保存到 MySQL 中。)

先判断用户是否已经在黑名单中?过滤:判断用户点击是否超过阈值?拉入黑名单:更新用户的点击数量,并获取最新的点击数据再判断是否超过阈值?拉入黑名单:不做处理

需要两张表:黑名单、点击数量表。

create table black_list (userid char(1));
CREATE TABLE user_ad_count (
dt varchar(255),
userid CHAR (1),
adid CHAR (1),
count BIGINT,
PRIMARY KEY (dt, userid, adid)
);

JDBC工具类

import com.alibaba.druid.pool.DruidDataSourceFactoryimport java.sql.Connection
import java.util.Properties
import javax.sql.DataSourceobject JDBCUtil {var dataSource: DataSource = init()//初始化连接池def init(): DataSource = {val properties = new Properties()properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark-streaming?useUnicode=true&characterEncoding=UTF-8&useSSL=false")properties.setProperty("username", "root")properties.setProperty("password", "000000")properties.setProperty("maxActive", "50")DruidDataSourceFactory.createDataSource(properties)}//获取连接对象def getConnection(): Connection ={dataSource.getConnection}
}

需求实现:
 

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer// 消费数据
object Kafka_req1 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req1")val ssc = new StreamingContext(conf,Seconds(3))// 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化val kafkaPara: Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG ->"lyh",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])// 读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,  //优先位置ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数))val clickData: DStream[AdClickData] = kafkaDStream.map(kafkaData => {val data = kafkaData.value()val datas = data.split(" ")AdClickData(datas(0), datas(1), datas(2), datas(3),datas(4))})val ds: DStream[((String,String,String),Int)] = clickData.transform( //周期性地拿到 RDD 数据rdd => {// todo 周期性获取黑名单数据,就要周期性读取MySQL中的数据val black_list = ListBuffer[String]()val con: Connection = JDBCUtil.getConnection()val stmt = con.prepareStatement("select * from black_list")val rs = stmt.executeQuery()while (rs.next()) {black_list.append(rs.getString(1))}rs.close()stmt.close()con.close()// todo 判断用户是否在黑名单当中,在就过滤掉val filterRDD = rdd.filter(data => {!black_list.contains(data.user)})// todo 如果不在,那么统计点击数量filterRDD.map(data => {val sdf = new SimpleDateFormat("yyyy-MM-dd")val day = sdf.format(new Date(data.ts.toLong))val user = data.userval ad = data.ad((day, user, ad), 1) // 返回键值对}).reduceByKey(_ + _)})ds.foreachRDD(rdd => {rdd.foreach {case ((day, user, ad), count) => {println(s"$day $user $ad $count")if (count>=30){// todo 如果统计数量超过点击阈值(30),拉入黑名单val con = JDBCUtil.getConnection()val stmt = con.prepareStatement("""|insert into black_list values(?)|on duplicate key|update userid=?|""".stripMargin)stmt.setString(1,user)stmt.setString(2,user)stmt.executeUpdate()stmt.close()con.close()}else{// todo 如果没有超过阈值,更新到当天点击数量val con = JDBCUtil.getConnection()val stmt = con.prepareStatement("""|select *|from user_ad_count|where dt=? and userid=? and adid=?|""".stripMargin)stmt.setString(1,day)stmt.setString(2,user)stmt.setString(3,ad)val rs = stmt.executeQuery()if (rs.next()){ //如果存在数据val stmt1 = con.prepareStatement("""|update user_ad_count|set count=count+?|where dt=? and userid=? and adid=?|""".stripMargin)stmt1.setInt(1,count)stmt1.setString(2,day)stmt1.setString(3,user)stmt1.setString(4,ad)stmt1.executeUpdate()stmt1.close()// todo 如果更新后的点击数量超过阈值,拉入黑名单val stmt2 = con.prepareStatement("""|select *|from user_ad_count|where dt=? and userid=? and adid=?|""".stripMargin)stmt2.setString(1,day)stmt2.setString(2,user)stmt2.setString(3,ad)val rs1 = stmt2.executeQuery()if (rs1.next()){val stmt3 = con.prepareStatement("""|insert into black_list(userid) values(?)|on duplicate key|update userid=?|""".stripMargin)stmt3.setString(1,user)stmt3.setString(2,user)stmt3.executeUpdate()stmt3.close()}rs1.close()stmt2.close()}else{// todo 如果不存在数据,那么新增val stmt1 = con.prepareStatement("""|insert into user_ad_count(dt,userid,adid,count) values(?,?,?,?)|""".stripMargin)stmt1.setString(1,day)stmt1.setString(2,user)stmt1.setString(3,ad)stmt1.setInt(4,count)stmt1.executeUpdate()stmt1.close()}rs.close()stmt.close()con.close()}}}})// 开启任务ssc.start()ssc.awaitTermination()}// 广告点击数据case class AdClickData(ts: String,area: String,city: String,user: String,ad: String)}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/120597.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ESP32网络开发实例-HTTP-POST请求

HTTP-POST请求 文章目录 HTTP-POST请求1、HTTP POST2、软件准备3、硬件准备4、代码实现在本文中,我们将介绍如何使用 ESP32向 ThingSpeak等常用 API 发出 HTTP POST 请求。 1、HTTP POST 超文本传输协议 (HTTP) 用作服务器和客户端之间的请求-响应协议。 它使它们之间的通信顺…

2023年第四届MathorCup大数据挑战赛(B题)|电商零售商家需求预测及库存优化问题|数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2021年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题。 希望这些想法对大家的做题有一定的启发和借鉴意义。 让我们来…

数据结构与算法之排序: 归并排序 (Javascript版)

排序 排序&#xff1a;把某个乱序的数组变成升序或降序的数组 (这里用数组来做举例) 归并排序 该排序属于 分治 策略将一个问题分解为两个问题来计算&#xff0c;计算完成之后&#xff0c;就会得到子任务的解&#xff0c;这些解不是最终问题的解&#xff0c;还需要merge起来…

高级路由配置

目录 路由协议认证 Ripv2的认证配置 OSPF认证 BGP认证 OSPF特殊区域 BGP的选路规则 路由策略&#xff08;route-policy和filter-policy&#xff09; IP-Prefix List:前缀列表 Filter-Policy 路由引入&#xff08;import-route&#xff09; Filter-policy和route-pol…

数据结构与算法——分治法

分治算法&#xff08;Divide and Conquer Algorithm&#xff09;是一种算法设计策略&#xff0c;它将一个大问题分割成多个相同或相似的子问题&#xff0c;然后递归地解决这些子问题&#xff0c;最后将它们的解合并在一起&#xff0c;得到原始问题的解。分治算法通常包含三个关…

模型调参优化

模型超参数 参考链接&#xff1a;https://www.bilibili.com/video/BV1xw411q7cH/?buvidXU0E30D0C6006B7F1EE1425156434CFEC440F&from_spmidtm.recommend.0.0&is_story_h5false&midfMtk7pz9LsVpSyGt0Mcizg%3D%3D&p1&plat_id116&share_fromugc&sha…

目标跟踪ZoomTrack: Target-aware Non-uniform Resizing for Efficient Visual Tracking

论文作者&#xff1a;Yutong Kou,Jin Gao,Bing Li,Gang Wang,Weiming Hu,Yizheng Wang,Liang Li 作者单位&#xff1a;CASIA; University of Chinese Academy of Sciences; ShanghaiTech University; Beijing Institute of Basic Medical Sciences; People AI, Inc 论文链接&…

Java 反射机制详解

目录 一. 前言 二. 反射基础 2.1. Class 类 2.2. 类加载 三. 反射的使用 3.1. Class类对象的获取 3.2. Constructor类及其用法 3.3. Field类及其用法 3.4. Method类及其用法 四. 反射机制执行的流程 4.1. 反射获取类实例 4.2. 获取构造器的过程 4.3. 反射获取方法…

吐血整理,Jmeter服务端性能测试-线程阻塞问题案例分析(超细)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、Jstack打印快照…

【微信小程序】数字化会议OA系统之投票模块(附源码)

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《微信小程序开发实战》。&#x1f3af;&#x1f3a…

Linux CentOS 本地yum配置

本地开发用虚拟机&#xff0c;因为有光盘镜像在手&#xff0c;并不需要联网安装组件&#xff0c;只需要设置一下就可以让yum从本地获取源。 将安装盘挂载在合适位置 mount /dev/cdrom /mnt 进入目录 cd /etc/yum.repos.d/ 修改CentOS-Media.repo里面的挂载路径 …

在项目中同时使用SpringCloud和Dubbo,注册中心选用Eureka?

文章目录 一、前置知识1、在Spring Boot中使用Dubbo&#xff1f;1&#xff09;配置服务提供者2&#xff09;配置服务消费者 2、在Spring Boot中使用Eureka&#xff1f;1&#xff09;Eureka服务2&#xff09;Eureka客户端 二、项目代码分析1、dubbo服务提供者1&#xff09;启动类…

FoneDog iOS Unlocker(ios解锁工具) 适用macos电脑

FoneDog iOS Unlocker是一款专业的iOS设备解锁工具&#xff0c;旨在帮助用户解决iOS设备上的解锁问题。该软件支持解锁各种锁定类型&#xff0c;如数字密码锁、手势密码锁、Touch ID和Face ID等&#xff0c;可以解除iPhone、iPad和iPod Touch等设备的锁定状态。FoneDog iOS Unl…

react-组件间的通讯

一、父传子 父组件在使用子组件时&#xff0c;提供要传递的数据子组件通过props接收数据 class Parent extends React.Component {render() {return (<div><div>我是父组件</div><Child name"张" age{16} /></div>)} }const Child …

NEWCC:新时代的区块链生态新币私募造势平台

在当今区块链领域&#xff0c;这项技术已经为金融资产注入了全新的生机&#xff0c;同时也为初创企业提供了新的商业模式和融资机会。通过代币的金融属性&#xff0c;企业和项目方得以实现资本的初期积累&#xff0c;同时在区块链空间以更低成本和更高效率进行交易和服务创新。…

【广州华锐互动】VR公司工厂消防逃生演练带来沉浸式的互动体验

在工业生产过程中&#xff0c;安全问题始终是我们不能忽视的重要环节。特别是火灾事故&#xff0c;不仅会造成重大的经济损失&#xff0c;更会威胁到员工的生命安全。传统的消防安全训练方法&#xff0c;如讲座、实地演练等&#xff0c;虽然具有一定的效果&#xff0c;但是无法…

ZooKeeper中节点的操作命令(查看、创建、删除节点)

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

docker部署prometheus+grafana服务器监控(二) - 安装数据收集器 node-exporter

在目标服务器安装数据收集器 node-exporter 1. 安装数据收集器 node-exporter wget https://github.com/prometheus/node_exporter/releases/download/v1.6.1/node_exporter-1.6.1.linux-amd64.tar.gztar xvf node_exporter-1.6.1.linux-amd64.tar.gzmv node_exporter-1.6.1…

短视频矩阵系统搭建/源头----源码

一、智能剪辑、矩阵分发、无人直播、爆款文案于一体独立应用开发 抖去推----主要针对本地生活的----移动端(小程序软件系统&#xff0c;目前是全国源头独立开发)&#xff0c;开发功能大拆解分享&#xff0c;功能大拆解&#xff1a; 7大模型剪辑法&#xff08;数学阶乘&#xff…

SQL Server Management Studio (SSMS)的安装教程

文章目录 SQL Server Management Studio (SSMS)的安装教程从Microsoft官网下载SQL Server Management Studio安装程序。选中安装程序右键并选择“以管理员的身份运行”选项选择安装目录&#xff0c;单击“安装”按钮开始安装过程安装成功界面安装完成后&#xff0c;您可以启动S…