Spark【Spark Streaming】

1、基本数据源

1.1、文件流

在spark Shell 下运行:

[lyh@hadoop102 spark-yarn-3.2.4]$ spark-shell 
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-09-08 08:56:21,875 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://hadoop102:4040
Spark context available as 'sc' (master = local[*], app id = local-1662598583370).
Spark session available as 'spark'.
Welcome to____              __/ __/__  ___ _____/ /___\ \/ _ \/ _ `/ __/  '_//___/ .__/\_,_/_/ /_/\_\   version 3.2.4/_/Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
Type in expressions to have them evaluated.
Type :help for more information.scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._scala> val ssc = new StreamingContext(sc,Seconds(20))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@379899f4scala> val lines = ssc.textFileStream("file:///home/lyh/streaming/logfile")
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@531245fescala> val kv = lines.map((_,1)).reduceByKey(_+_)
kv: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@c207c10scala> kv.print()scala> ssc.start()------------------------------------------
Time: 1662598860000 ms
--------------------------------------------------------------------------------------
Time: 1662598880000 ms
--------------------------------------------------------------------------------------
Time: 1662598900000 ms
-------------------------------------------
(c#,1)
(hh,1)
(h,1)
(javafx,1)
(spark,1)
(hadoop,1)
(js,1)
(java,1)
(s,1)
(c,1)

执行后立即新建终端在  /home/lyh/streaming/logfile 目录下创建文件并写入数据

1.2、Socket 套接字流

// todo 创建环境对象val conf = new SparkConf()conf.setAppName("word count").setMaster("local[*]")val ssc = new StreamingContext(conf,Seconds(3))// todo 逻辑处理// 获取端口数据(Socket)val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)val words: DStream[String] = lines.flatMap(_.split(" "))val word: DStream[(String,Int)] = words.map((_, 1))val wordCount: DStream[(String,Int)] = word.reduceByKey(_ + _)wordCount.print()// todo 关闭环境// 由于SparkStreaming的采集器是长期运行的,所以不能直接关闭// 而且main方法的关闭也会使SparkStreaming的采集器关闭ssc.start()// 等待采集器关闭ssc.awaitTermination()

启动 NetCat

> nc -lp 9999 
> hello world
> hello spark
> ...

运行结果: 

1.3、自定义 Socket 数据源

通过自定义 Socket 实现数据源不断产生数据

import java.io.PrintWriter
import java.net.{ServerSocket, Socket}
import scala.io.Source/*** 通过自定义的Socket来不断给客户端发送数据*/
object MySocketReceiver {def index(length: Int): Int = {val rdm = new java.util.Random()rdm.nextInt(length)}def main(args: Array[String]): Unit = {val fileName = "input/1.txt"val lines: List[String] = Source.fromFile(fileName).getLines().toListval listener: ServerSocket = new ServerSocket(9999)while(true){val socket: Socket = listener.accept()new Thread(){override def run(){val out: PrintWriter = new PrintWriter(socket.getOutputStream,true)while (true){Thread.sleep(1000)val content = lines(index(lines.length)) // 源源不断,每次打印list的第(1~length)随机行println(content)out.write(content + '\n')out.flush()}socket.close()}}.start()}}
}

定义一个处理器接收自定义数据源端口发送过来的数据。

def main(args: Array[String]): Unit = {// todo 创建环境对象val conf = new SparkConf()conf.setAppName("word count").setMaster("local[*]")val ssc = new StreamingContext(conf,Seconds(3))// todo 逻辑处理// 获取端口数据(Socket)val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)val words: DStream[String] = lines.flatMap(_.split(" "))val word: DStream[(String,Int)] = words.map((_, 1))val wordCount: DStream[(String,Int)] = word.reduceByKey(_ + _)wordCount.print()// todo 关闭环境// 由于SparkStreaming的采集器是长期运行的,所以不能直接关闭ssc.start()// 等待采集器关闭ssc.awaitTermination()}

先运行我们的数据源,再运行处理器:

处理器:

1.4、RDD 队列流

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject SparkStreaming02_RDDStream {def main(args: Array[String]): Unit = {// 1. 初始化配置信息val conf = new SparkConf()conf.setAppName("rdd Stream").setMaster("local[*]")// 2.初始化SparkStreamingContextval ssc = new StreamingContext(conf,Seconds(4))// 3.创建RDD队列val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()// 4.创建QueueInputStream// oneAtATime = true 默认,一次读取队列里面的一个数据// oneAtATime = false, 按照设定的时间,读取队列里面数据val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue,oneAtATime = false)// 5. 处理队列中的RDD数据val sumStream: DStream[Int] = inputStream.reduce(_ + _)// 6. 打印结果sumStream.print()// 7.启动任务ssc.start()// 8.向队列中放入RDDfor(i <- 1 to 5){rddQueue += ssc.sparkContext.makeRDD(1 to 5)Thread.sleep(2000)}// 9. 等待数据源进程停止后关闭ssc.awaitTermination()}}

2、高级数据源

2.1、Kafka 数据源

2.1.1、消费者程序处理流数据

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkStreaming03_Kafka {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("kafka source")val ssc = new StreamingContext(conf,Seconds(3))// 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化val kafkaPara: Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG ->"lyh",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])// 读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,  //优先位置ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数))// 将每条消息的KV取出val valueDStream: DStream[String] = kafkaDStream.map(_.value())// 计算WordCountvalueDStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()// 开启任务ssc.start()ssc.awaitTermination()}}

2.1.2、生产者生产数据

(1)kafka 端生产数据

启动 Kafka 集群

创建 Topic(指定一个分区三个副本): 

kafka-topics.sh --bootstrap-server hadoop102:9092 --topic <topic名称> --create --partitions 1 --replication-factor 3 

 查看是否生成 Topic:

kafka-topics.sh --bootstrap-server hadoop102:9092 --list

生产者生产数据:

> kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic <topic名称>
> hello world
> hello spark
> ...
(2)编写生产者程序
package com.lyhimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkStreaming03_Kafka {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("kafka source")val ssc = new StreamingContext(conf,Seconds(3))// 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化val kafkaPara: Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG ->"lyh",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])// 读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,  //优先位置ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数))// 将每条消息的KV取出val valueDStream: DStream[String] = kafkaDStream.map(_.value())// 计算WordCountvalueDStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()// 开启任务ssc.start()ssc.awaitTermination()}}

3、转换操作

3.1、无状态转换操作

3.2、有状态转换操作

3.1.1、滑动窗口转换操作

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkStreaming05_Window {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming window")val ssc = new StreamingContext(conf,Seconds(3))val lines:DStream[String] = ssc.socketTextStream("localhost", 9999)val word_kv = lines.map((_, 1))/*** 收集器收集RDD合成DStream: 3s 窗口范围: 12s 窗口滑动间隔: 6s/次* 1. windowLength:表示滑动窗口的长度,即窗口内包含的数据的时间跨度。它是一个Duration对象,用于指定窗口的时间长度。* 2. slideInterval:表示滑动窗口的滑动间隔,即每隔多长时间将窗口向右滑动一次。同样是一个Duration对象。* 返回一个新的 DStream**/val wordToOneByWindow:DStream[(String,Int)] = word_kv.window(Seconds(12), Seconds(6))// 窗口每滑动一次(6s),对窗口内的数据进行一次聚合操作.val res: DStream[(String,Int)] = wordToOneByWindow.reduceByKey(_ + _)res.print()ssc.start()ssc.awaitTermination()}
}

3.1.2、updateStateByKey

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** DStream 有状态转换操作之 updateStateByKey(func) 转换操作*/
object SparkStreaming04_State {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("kafka state")val ssc = new StreamingContext(conf,Seconds(3))/*** 设置检查点目录的作用是为了确保Spark Streaming应用程序的容错性和可恢复性。* 在Spark Streaming应用程序运行过程中,它会将接收到的数据分成一批批进行处理。* 通过设置检查点目录,Spark Streaming会定期将当前的处理状态、接收到的数据偏移量等信息保存到可靠的存储系统中,* 比如分布式文件系统(如HDFS)或云存储服务(如Amazon S3)。* 一旦应用程序出现故障或崩溃,它可以从最近的检查点中恢复状态,并从上次处理的位置继续处理数据,从而确保数据的完整性和一致性。*///检查点的路径如果是本地路径要+ file:// 否则认为是 hdfs路径 / 开头ssc.checkpoint("file:///D://IdeaProject/SparkStudy/data/")  //设置检查点,检查点具有容错机制val lines: DStream[String] = ssc.socketTextStream("localhost",9999)val word_kv = lines.map((_, 1))val stateDStream: DStream[(String, Int)] = word_kv.updateStateByKey(/** 参数是一个函数1. Seq[Int]: 当前key对应的所有value值的集合,因为我们的value是Int,所以这里也是Int2. Option[Int]: 当前key的历史状态,对于wordCount,历史值就是上一个DStream中这个key的value计算结果(求和结果)Option 是 Scala 中用来表示可能存在或可能不存在的值的容器,是一种避免空引用(null reference)问题的模式。Option[Int] 有两个可能的实例:(1) Some(value: Int):表示一个包含 Int 类型值的 Option。(2) None:表示一个空的 Option,不包含任何值。**/(values: Seq[Int], state: Option[Int]) => {val currentCount = values.foldLeft(0)(_ + _)val previousCount = state.getOrElse(0)Option(currentCount + previousCount)})stateDStream.print()stateDStream.saveAsTextFiles("./out") //输出结果保存到 文本文件中ssc.start()ssc.awaitTermination()}
}

4、输出操作

4.1、输出到文本文件

上面 3.1.2 中就保存DStream输出到了本地:

stateDStream.saveAstextFiles("./out")

4.2、输出到MySQL数据库

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}import java.sql.{Connection, PreparedStatement}object NetWorkWordCountStateMySQL {def main(args: Array[String]): Unit = {val updateFunc = (values: Seq[Int],state: Option[Int]) => {val currentCount = values.foldLeft(0)(_+_)val previousCount = state.getOrElse(0)Some(currentCount + previousCount)}val conf = new SparkConf().setMaster("local[*]").setAppName("state mysql")val ssc = new StreamingContext(conf,Seconds(5))// file:\\ 代表本地文件系统 如果用的是 /user/... 这种形式是 HDFS 文件系统 需要启动Hadoop集群ssc.checkpoint("file:\\D:\\IdeaProjects\\SparkStudy\\data\\state")val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)val word_kv: DStream[(String, Int)] = lines.flatMap(_.split(" ").map((_, 1))).reduceByKey(_ + _)val stateDStream: DStream[(String, Int)] = word_kv.updateStateByKey[Int](updateFunc)stateDStream.print()stateDStream.foreachRDD( rdd=> {def func(records: Iterator[(String,Int)]): Unit ={var conn: Connection = nullvar stmt: PreparedStatement = nulltry{conn = DBUtils.getConnection("jdbc:mysql://127.0.0.1:3306/spark","root","Yan1029.")records.foreach(p=>{val sql = "insert into wordcount values (?,?)"stmt = conn.prepareStatement(sql)stmt.setString(1,p._1.trim)stmt.setInt(2,p._2)stmt.executeUpdate()    //不executeUpdate就不会写入数据库})}catch {case e: Exception => e.printStackTrace()}finally {
//          if (stmt!=null) stmt.close()
//          DBUtils.close()}}val repartitionedRDD: RDD[(String,Int)] = rdd.repartition(3)  //扩大分区用 repartitionrepartitionedRDD.foreachPartition(func)})ssc.start()ssc.awaitTermination()}}

运行结果:

5、优雅的关闭和恢复数据

5.1、关闭SparkStreaming

        流式任务通常都需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了。
       关闭方式:我们通常使用外部文件系统来控制内部程序关闭。

package com.lyhimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}import java.net.URIobject SparkStreaming06_Close {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming close")val ssc = new StreamingContext(conf,Seconds(3))val lines:DStream[String] = ssc.socketTextStream("localhost", 9999)val word_kv = lines.map((_, 1))word_kv.print()ssc.start()// 再创建一个线程去关闭new Thread(new MonitorStop(ssc)).start()ssc.awaitTermination()  //阻塞当前main线程}
}class MonitorStop(ssc: StreamingContext) extends Runnable{override def run(): Unit = {while (true){ // 一直轮询判断Thread.sleep(5000)  //每5s检查一遍val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:9000"),new Configuration(),"lyh")val exists: Boolean = fs.exists(new Path("hdfs://hadoop102:9000/stopSpark"))if (exists) { //如果比如(MySQL出现了一行数据、Zookeeper的某个节点出现变化、hdfs是否存在某个目录...)就关闭val state: StreamingContextState = ssc.getState()if (state == StreamingContextState.ACTIVE){// 优雅地关闭-处理完当前的数据再关闭// 计算节点不再接受新的数据,而是把现有的数据处理完毕,然后关闭ssc.stop(true,true)System.exit(0)}}}}
}

5.2、恢复检查点的数据

使用 getActiveOrCreate 的方法来对上一个失败的 Spark 任务进行数据恢复(通过检查点来进行恢复)

方法说明:

        若Application为首次重启,将创建一个新的StreamingContext实例;如果Application从失败中重启,从checkpoint目录导入checkpoint数据来重新创建StreamingContext实例。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}import java.net.URIobject SparkStreaming07_Resume {def main(args: Array[String]): Unit = {//好处:若Application为首次重启,将创建一个新的StreamingContext实例;如果Application从失败中重启,从checkpoint目录导入checkpoint数据来重新创建StreamingContext实例。val ssc: StreamingContext = StreamingContext.getActiveOrCreate("file:\\D:\\IdeaProjects\\SparkStudy\\data\\state", () => {val conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming resume")val ssc = new StreamingContext(conf, Seconds(3))val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)val word_kv = lines.map((_, 1))word_kv.print()ssc})// 依然设置检查点 防止application失败后丢失数据ssc.checkpoint("file:\\D:\\IdeaProjects\\SparkStudy\\data\\state")ssc.start()ssc.awaitTermination()  //阻塞当前main线程}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/118572.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTML笔记-狂神

1. 初识HTML 什么是HTML&#xff1f; Hyper Text Markup Language : 超文本标记语言 超文本包括&#xff1a;文字、图片、音频、视频、动画等 目前使用的是HTML5&#xff0c;使用 W3C标准 W3C标准包括&#xff1a; 结构化标准语言&#xff08;HTML、XML&#xff09; 表现标…

Kafak - 单机/集群快速安装指北(3.x版本)

文章目录 官方下载地址上传安装包解压安装包到指定目录修改解压包名为kafka修改config目录下的配置文件server.propertie配置环境变量其他机器同上 - 修改配置文件中的brokerid启动集群停止Kraft 方式部署集群----(不使用zookeeper) 官方下载地址 http://kafka.apache.org/dow…

【ROS入门】机器人系统仿真——相关组件以及URDF集成Rviz

文章结构 相关组件URDF(Unified Robot Description Format)——创建机器人模型Gazebo——搭建仿真环境Rviz(ROS Visualization Tool)——显示机器人各种传感器感知到的环境信息 URDF集成RvizURDF相关语法robotlinkjoint URDF优化——xacro相关语法属性与算数运算宏文件包含 实操…

工作:三菱伺服驱动器连接参数及其电机钢性参数配置与调整

工作&#xff1a;三菱伺服驱动器参数及电机钢性参数配置与调整 一、三菱PLC与伺服驱动器连接参数的设置 1. 伺服配置 单个JET伺服从站链接侧占用点数:Rx/Ry占用64点、RWw/RWr占用32点 图中配置了22个JET伺服从站&#xff0c;占用点数:Rx/Ry占用64222048‬点、RWw/RWr占用322…

基于YOLOv8模型和UA-DETRAC数据集的车辆目标检测系统(PyTorch+Pyside6+YOLOv8模型)

摘要&#xff1a;基于YOLOv8模型和UA-DETRAC数据集的车辆目标检测系统可用于日常生活中检测与定位汽车&#xff08;car&#xff09;、公共汽车&#xff08;bus&#xff09;、面包车&#xff08;vans&#xff09;等目标&#xff0c;利用深度学习算法可实现图片、视频、摄像头等方…

金融行业网络安全保护与三级等保合规实施方案

金融行业网络安全保护与三级等保合规实施方案旨在帮助金融机构实施符合三级等保标准的网络安全保护措施。以下是一个基本的实施方案概述&#xff1a; 评估和规划&#xff1a; 进行风险评估&#xff1a;评估网络系统的风险&#xff0c;确定安全威胁、弱点和潜在风险。 确定等级…

Node学习笔记之MongoDB

一、简介 1.1 Mongodb 是什么 MongoDB 是一个基于分布式文件存储的数据库&#xff0c;官方地址 MongoDB: The Developer Data Platform | MongoDB 1.2 为什么选择 Mongodb 操作语法与 JavaScript 类似&#xff0c;容易上手&#xff0c;学习成本低 二、核心概念 Mongodb 中…

css 三栏布局的实现?

目录 前言 用法 代码 理解 高质量图片 1. 左侧栏 - 导航菜单 2. 中间栏 - 主要内容 3. 右侧栏 - 小部件和广告 布局的响应式设计 三栏布局在前端页面设计中是一个常见的布局方式&#xff0c;通常包含左侧、中间和右侧三个部分。这种布局方式在多种场景中都很受欢迎&am…

node实战——搭建带swagger接口文档的后端koa项目(node后端就业储备知识)

文章目录 ⭐前言⭐初始化项目⭐配置router目录自动扫描路由⭐swagger文件配置自动生成json文件⭐封装扫描目录路由加入swagger⭐配置项目入口总文件⭐运行效果⭐总结⭐结束⭐前言 大家好,我是yma16,本文分享关于node实战——搭建带swagger接口文档的后端koa项目(node后端就…

使用Redis部署 PHP 留言板应用

使用Redis部署 PHP 留言板应用 启动 Redis 领导者&#xff08;Leader&#xff09;启动两个 Redis 跟随者&#xff08;Follower&#xff09;公开并查看前端服务清理 启动 Redis 数据库 创建 Redis Deployment apiVersion: apps/v1 kind: Deployment metadata:name: redis-le…

机器学习之ROC与AUC

文章目录 定义ROC曲线&#xff1a;AUC&#xff08;Area Under the ROC Curve&#xff09;&#xff1a; 定义 ROC&#xff08;Receiver Operating Characteristic&#xff09;曲线和AUC&#xff08;Area Under the ROC Curve&#xff09;是用于评估二分类模型性能的重要工具。 …

kuaishou web端did注册激活 学习记录

快手web端 did 注册激活的流程大概如下&#xff1a; 1.访问web端的接口&#xff0c;主动触发滑块&#xff0c;拿到滑块信息 2.然后滑块验证did 获取captchaToken 3.携带captchaToken访问接口 4.最后校验web端的did 是否激活 最后激活以后的效果如下&#xff1a; 经过测试&…

查找算法-斐波那契查找法(Fibonacci Search)

目录 查找算法-斐波那契查找法&#xff08;Fibonacci Search&#xff09; 1、说明 2、算法分析 3、C代码 查找算法-斐波那契查找法&#xff08;Fibonacci Search&#xff09; 1、说明 斐波那契查找法又称为斐氏查找法&#xff0c;此查找法和二分法一样都是以分割范围来进…

有六家机器视觉公司今年11月份初放假到明年春节后,除夕不放假看住企业不跑路,不倒闭,明年大家日子会越来越甜

不幸的消息一个接着一个&#xff0c;请大家注意下面的消息 我已经收到已经有6家机器视觉公司今年11月份初放假到明年春节后&#xff0c;他们真的没有订单了&#xff0c;其中4家宣布员工可以自行寻找工作&#xff0c;今年除夕不放假是经济下行经济考量吗&#xff1f;看住企业不…

如何避免阿里云对象储存OSS被盗刷

网站app图片的云端存储离不开对象存储oss,而最难为的问题就是app做的出名了&#xff0c;少不了同行的攻击&#xff0c;包含ddos&#xff0c;cc攻击以及oss外链被盗刷&#xff01; 防盗链功能通过设置Referer白名单以及是否允许空Referer&#xff0c;限制仅白名单中的域名可以访…

OpenFeign实现分析、源码解析

什么是openfeign? 是springcloud全家桶的组件之一&#xff0c;其核心作用是为Rest API提供高效简洁的rpc调用方式。 为什么只定义接口而没有实现类&#xff1f; 源码解读&#xff08;省略&#xff09; 总结&#xff1a; 源码分析&#xff1a;如何发送http请求&#xff1f; …

竞赛 深度学习卫星遥感图像检测与识别 -opencv python 目标检测

文章目录 0 前言1 课题背景2 实现效果3 Yolov5算法4 数据处理和训练5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **深度学习卫星遥感图像检测与识别 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐…

vscode json文件添加注释报错

在vscode中创建json文件&#xff0c;想要注释一波时&#xff0c;发现报了个错&#xff1a;Comments are not permitted in JSON. (521)&#xff0c;意思是JSON中不允许注释 以下为解决方法&#xff1a; 在vscode的右下角中找到这个&#xff0c;点击 在出现的弹窗中输入json wit…

selenium4 元素定位

selenium4 9种元素定位 ID driver.find_element(By.ID,"kw")NAME driver.find_element(By.NAME,"tj_settingicon")CLASS_NAME driver.find_element(By.CLASS_NAME,"ipt_rec")TAG_NAME driver.find_element(By.TAG_NAME,"area")LINK_T…

python time 模块

时间的三种格式time模块中的其他函数时间三种格式之间的转化 一&#xff0c;时间的三种模块 在python中对于时间的描述存在三种格式&#xff1a;1&#xff0c;时间戳&#xff0c;2&#xff0c;时间结构体 3&#xff0c;按某种格式形式展示的字符串 1,时间戳 import time pr…