前言
1、环境准备
- 启动Zookeeper和Kafka集群
- 导入依赖:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.4</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.2.4</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.2.4</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.10</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.14.2</version></dependency>
2、模拟生产数据
通过循环来不断生产随机数据、使用Kafka来发布订阅消息。
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.util.Random// 生产模拟数据
object MockData {def main(args: Array[String]): Unit = {// 生成模拟数据// 格式: timestamp area city userid adid// 含义: 时间戳 省份 城市 用户 广告// 生产数据 => Kafka => SparkStreaming => 分析处理// 设置Zookeeper属性val props = new Properties()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)while (true){mockData().foreach((data: String) => {// 向 Kafka 中生成数据val record = new ProducerRecord[String,String]("testTopic",data)producer.send(record)println(record)})Thread.sleep(2000)}}def mockData(): ListBuffer[String] = {val list = ListBuffer[String]()val areaList = ListBuffer[String]("华东","华南","华北","华南")val cityList = ListBuffer[String]("北京","西安","上海","广东")for (i <- 1 to 30){val area = areaList(new Random().nextInt(4))val city = cityList(new Random().nextInt(4))val userid = new Random().nextInt(6) + 1val adid = new Random().nextInt(6) + 1list.append(s"${System.currentTimeMillis()} ${area} ${city} ${userid} ${adid}")}list}}
3、模拟消费数据
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}// 消费数据
object Kafka_req1 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req1")val ssc = new StreamingContext(conf,Seconds(3))// 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化val kafkaPara: Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG ->"lyh",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])// 读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent, //优先位置ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数))// 将每条消息的KV取出val valueDStream: DStream[String] = kafkaDStream.map(_.value())// 计算WordCountvalueDStream.print()// 开启任务ssc.start()ssc.awaitTermination()}}
4、需求1 广告黑名单
实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑。(黑名单保存到 MySQL 中。)
先判断用户是否已经在黑名单中?过滤:判断用户点击是否超过阈值?拉入黑名单:更新用户的点击数量,并获取最新的点击数据再判断是否超过阈值?拉入黑名单:不做处理
需要两张表:黑名单、点击数量表。
create table black_list (userid char(1));
CREATE TABLE user_ad_count (
dt varchar(255),
userid CHAR (1),
adid CHAR (1),
count BIGINT,
PRIMARY KEY (dt, userid, adid)
);
JDBC工具类
import com.alibaba.druid.pool.DruidDataSourceFactoryimport java.sql.Connection
import java.util.Properties
import javax.sql.DataSourceobject JDBCUtil {var dataSource: DataSource = init()//初始化连接池def init(): DataSource = {val properties = new Properties()properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark-streaming?useUnicode=true&characterEncoding=UTF-8&useSSL=false")properties.setProperty("username", "root")properties.setProperty("password", "000000")properties.setProperty("maxActive", "50")DruidDataSourceFactory.createDataSource(properties)}//获取连接对象def getConnection(): Connection ={dataSource.getConnection}
}
需求实现:
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer// 消费数据
object Kafka_req1 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req1")val ssc = new StreamingContext(conf,Seconds(3))// 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化val kafkaPara: Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG ->"lyh",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])// 读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent, //优先位置ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数))val clickData: DStream[AdClickData] = kafkaDStream.map(kafkaData => {val data = kafkaData.value()val datas = data.split(" ")AdClickData(datas(0), datas(1), datas(2), datas(3),datas(4))})val ds: DStream[((String,String,String),Int)] = clickData.transform( //周期性地拿到 RDD 数据rdd => {// todo 周期性获取黑名单数据,就要周期性读取MySQL中的数据val black_list = ListBuffer[String]()val con: Connection = JDBCUtil.getConnection()val stmt = con.prepareStatement("select * from black_list")val rs = stmt.executeQuery()while (rs.next()) {black_list.append(rs.getString(1))}rs.close()stmt.close()con.close()// todo 判断用户是否在黑名单当中,在就过滤掉val filterRDD = rdd.filter(data => {!black_list.contains(data.user)})// todo 如果不在,那么统计点击数量filterRDD.map(data => {val sdf = new SimpleDateFormat("yyyy-MM-dd")val day = sdf.format(new Date(data.ts.toLong))val user = data.userval ad = data.ad((day, user, ad), 1) // 返回键值对}).reduceByKey(_ + _)})ds.foreachRDD(rdd => {rdd.foreach {case ((day, user, ad), count) => {println(s"$day $user $ad $count")if (count>=30){// todo 如果统计数量超过点击阈值(30),拉入黑名单val con = JDBCUtil.getConnection()val stmt = con.prepareStatement("""|insert into black_list values(?)|on duplicate key|update userid=?|""".stripMargin)stmt.setString(1,user)stmt.setString(2,user)stmt.executeUpdate()stmt.close()con.close()}else{// todo 如果没有超过阈值,更新到当天点击数量val con = JDBCUtil.getConnection()val stmt = con.prepareStatement("""|select *|from user_ad_count|where dt=? and userid=? and adid=?|""".stripMargin)stmt.setString(1,day)stmt.setString(2,user)stmt.setString(3,ad)val rs = stmt.executeQuery()if (rs.next()){ //如果存在数据val stmt1 = con.prepareStatement("""|update user_ad_count|set count=count+?|where dt=? and userid=? and adid=?|""".stripMargin)stmt1.setInt(1,count)stmt1.setString(2,day)stmt1.setString(3,user)stmt1.setString(4,ad)stmt1.executeUpdate()stmt1.close()// todo 如果更新后的点击数量超过阈值,拉入黑名单val stmt2 = con.prepareStatement("""|select *|from user_ad_count|where dt=? and userid=? and adid=?|""".stripMargin)stmt2.setString(1,day)stmt2.setString(2,user)stmt2.setString(3,ad)val rs1 = stmt2.executeQuery()if (rs1.next()){val stmt3 = con.prepareStatement("""|insert into black_list(userid) values(?)|on duplicate key|update userid=?|""".stripMargin)stmt3.setString(1,user)stmt3.setString(2,user)stmt3.executeUpdate()stmt3.close()}rs1.close()stmt2.close()}else{// todo 如果不存在数据,那么新增val stmt1 = con.prepareStatement("""|insert into user_ad_count(dt,userid,adid,count) values(?,?,?,?)|""".stripMargin)stmt1.setString(1,day)stmt1.setString(2,user)stmt1.setString(3,ad)stmt1.setInt(4,count)stmt1.executeUpdate()stmt1.close()}rs.close()stmt.close()con.close()}}}})// 开启任务ssc.start()ssc.awaitTermination()}// 广告点击数据case class AdClickData(ts: String,area: String,city: String,user: String,ad: String)}