Spark Streaming Source Code Analysis – DStream

A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data.
A DStream is essentially a discretized stream: the stream is discretized into a list of RDDs, so the basic operations are still built on top of RDDs.
The basic definition of DStream is shown below. Compared with an ordinary RDD, time is a far more important factor for a DStream:
the interval at which the stream is cut into RDDs, the time at which the stream starts, how long the DStream should keep its generated RDDs, the time key that each RDD corresponds to, and so on.
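Before diving into the source, here is a minimal sketch of how such a stream of RDDs is produced in application code. The host, port, and batch interval are illustrative values, not anything mandated by the source below:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamQuickStart {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamQuickStart")
    // each batch interval (10 seconds here, an arbitrary choice) becomes one RDD in the DStream
    val ssc = new StreamingContext(conf, Seconds(10))

    // lines is an input DStream; words and pairs are derived DStreams built on top of it
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(w => (w, 1))
    pairs.print()           // an output operation; this is what eventually triggers jobs

    ssc.start()             // start generating one RDD per batch interval
    ssc.awaitTermination()
  }
}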

The DStream abstraction

/**
 * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
 * sequence of RDDs (of the same type) representing a continuous stream of data (see
 * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
 * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
 * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
 * transforming existing DStreams using operations such as `map`,
 * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
 * periodically generates a RDD, either from live data or by transforming the RDD generated by a
 * parent DStream.
 *
 * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
 * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
 * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
 * `join`. These operations are automatically available on any DStream of pairs
 * (e.g., DStream[(Int, Int)] through implicit conversions when
 * `org.apache.spark.streaming.StreamingContext._` is imported.
 *
 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
 */
abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

  // =======================================================================
  // Methods that should be implemented by subclasses of DStream
  // =======================================================================

  /** Time interval after which the DStream generates a RDD */
  def slideDuration: Duration   // the interval at which the stream is cut into RDDs

  /** List of parent DStreams on which this DStream depends on */
  def dependencies: List[DStream[_]] // like RDDs, DStreams also have dependency relationships

  /** Method that generates a RDD for the given time */
  def compute (validTime: Time): Option[RDD[T]] // the logic that generates each RDD

  // =======================================================================
  // Methods and fields available on all DStreams
  // =======================================================================

  // RDDs generated, marked as private[streaming] so that testsuites can access it
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () // the core structure: a DStream is essentially a HashMap of RDDs keyed by time

  // Time zero for the DStream
  private[streaming] var zeroTime: Time = null // the time at which the stream starts

  // Duration for which the DStream will remember each RDD created
  private[streaming] var rememberDuration: Duration = null // the stream is unbounded and the DStream cannot keep every RDD, so it only remembers RDDs for this duration

  // Storage level of the RDDs in the stream
  private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE

  // Checkpoint details
  private[streaming] val mustCheckpoint = false
  private[streaming] var checkpointDuration: Duration = null
  private[streaming] val checkpointData = new DStreamCheckpointData(this)

  // Reference to whole DStream graph
  private[streaming] var graph: DStreamGraph = null // DStreamGraph

  // Duration for which the DStream requires its parent DStream to remember each RDD created
  private[streaming] def parentRememberDuration = rememberDuration

  /** Return the StreamingContext associated with this DStream */
  def context = ssc

  /** Persist the RDDs of this DStream with the given storage level */
  def persist(level: StorageLevel): DStream[T] = {
    this.storageLevel = level
    this
  }

  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)

  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
  def cache(): DStream[T] = persist()

  /**
   * Enable periodic checkpointing of RDDs of this DStream
   * @param interval Time interval after which generated RDD will be checkpointed
   */
  def checkpoint(interval: Duration): DStream[T] = {
    persist()
    checkpointDuration = interval
    this
  }
}
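As a usage note, these methods are what application code calls on a derived DStream before any output operation. A small sketch, assuming pairs is a DStream[(String, Int)] and ssc its StreamingContext (both hypothetical names), with an illustrative checkpoint directory:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds

ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")   // required before checkpointing any DStream
val cached = pairs
  .persist(StorageLevel.MEMORY_AND_DISK_SER)          // overrides the MEMORY_ONLY_SER default
  .checkpoint(Seconds(50))                            // checkpointDuration; must be a multiple of the batch interval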


getOrCompute
Note that this only creates the RDD objects; it does not actually run the computation. The real computation happens only when runJob is invoked.
A Spark RDD does not hold the actual data itself; it only defines the workflow (the dependency graph) and the processing logic.

  /**
   * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
   * method that should not be called directly.
   */
  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
    // If this DStream was not initialized (i.e., zeroTime not set), then do it
    // If RDD was already generated, then retrieve it from HashMap
    generatedRDDs.get(time) match {
      // If an RDD was already generated and is being reused, then
      // probably all RDDs in this DStream will be reused and hence should be cached
      case Some(oldRDD) => Some(oldRDD)
      // if RDD was not generated, and if the time is valid
      // (based on sliding time of this DStream), then generate the RDD
      case None => { // needs to be computed
        if (isTimeValid(time)) { // a time is invalid when (time <= zeroTime || !(time - zeroTime).isMultipleOf(slideDuration))
          compute(time) match { // generate the RDD object via compute
            case Some(newRDD) =>
              if (storageLevel != StorageLevel.NONE) {
                newRDD.persist(storageLevel) // apply the configured persist level
              }
              if (checkpointDuration != null &&
                  (time - zeroTime).isMultipleOf(checkpointDuration)) {
                newRDD.checkpoint() // mark for checkpointing
              }
              generatedRDDs.put(time, newRDD) // store the generated RDD object in generatedRDDs
              Some(newRDD)
            case None =>
              None
          }
        } else {
          None
        }
      }
    }
  }
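To make the isTimeValid condition concrete, here is a tiny sketch of the Time/Duration arithmetic it relies on; the literal values are arbitrary:

import org.apache.spark.streaming.{Seconds, Time}

val zeroTime = Time(0L)        // when the stream started
val slide    = Seconds(10)     // slideDuration of this DStream
val t        = Time(30000L)    // candidate batch time: 30s after zeroTime

// mirrors isTimeValid: the time must be after zeroTime and aligned to the slide interval
val valid = t > zeroTime && (t - zeroTime).isMultipleOf(slide)   // true for these values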


generateJob
The RDD object produced by getOrCompute has to be wrapped into a Job.
The key piece of a Job is jobFunc, which essentially submits a job to the Spark cluster.
The default implementation only uses an emptyFunc; the concrete output logic has to be provided by the concrete output DStream that overrides this method (see the sketch after the listing below).

  /**
   * Generate a SparkStreaming job for the given time. This is an internal method that
   * should not be called directly. This default implementation creates a job
   * that materializes the corresponding RDD. Subclasses of DStream may override this
   * to generate their own jobs.
   */
  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      }
      case None => None
    }
  }
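For comparison, an output DStream replaces the empty job body with real output logic. The sketch below is modeled on ForEachDStream and simplified from memory, so treat it as an approximation rather than the exact source: it overrides generateJob so that jobFunc applies the user's foreachFunc to the RDD of that batch.

private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // an output DStream produces no RDD of its own
  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        // the job body is the user's output action (print, saveAsTextFiles, foreachRDD, ...)
        val jobFunc = () => foreachFunc(rdd, time)
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}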


clearMetadata
Clears outdated RDD objects; it also unpersists them (when enabled) and calls clearMetadata on the dependencies.

  /**
   * Clear metadata that are older than `rememberDuration` of this DStream.
   * This is an internal method that should not be called directly. This default
   * implementation clears the old generated RDDs. Subclasses of DStream may override
   * this to clear their own metadata along with the generated RDDs.
   */
  private[streaming] def clearMetadata(time: Time) {
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
    generatedRDDs --= oldRDDs.keys
    if (ssc.conf.getBoolean("spark.streaming.unpersist", false)) {
      oldRDDs.values.foreach(_.unpersist(false))
    }
    dependencies.foreach(_.clearMetadata(time))
  }
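Note that the eager unpersist is gated by spark.streaming.unpersist, which defaults to false in this version, so it has to be turned on explicitly if old cached RDDs should be released as soon as they fall out of rememberDuration. A small configuration sketch (the app name is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("StreamingUnpersistExample")
  .set("spark.streaming.unpersist", "true")   // let clearMetadata unpersist RDDs older than rememberDuration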

Concrete DStream implementations

FilteredDStream

package org.apache.spark.streaming.dstream

private[streaming]
class FilteredDStream[T: ClassTag](
    parent: DStream[T],
    filterFunc: T => Boolean
  ) extends DStream[T](parent.ssc) {

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[T]] = {
    parent.getOrCompute(validTime).map(_.filter(filterFunc))
  }
}
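FilteredDStream is what DStream.filter constructs under the hood. A rough sketch of the wrapper plus a call site (the exact library method may also clean the closure first; lines is a hypothetical DStream[String]):

// inside DStream[T] (sketch): the public operator simply wraps this DStream
def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)

// call site: each batch RDD is filtered lazily when its time slot is computed
val longLines = lines.filter(_.length > 80)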

 

WindowedDStream


private[streaming]
class WindowedDStream[T: ClassTag](
    parent: DStream[T],
    _windowDuration: Duration,
    _slideDuration: Duration)
  extends DStream[T](parent.ssc) {

  // Persist parent level by default, as those RDDs are going to be obviously reused.
  parent.persist(StorageLevel.MEMORY_ONLY_SER) // the parent's RDDs are persisted by default, because they are read repeatedly as the window slides

  def windowDuration: Duration = _windowDuration // window size

  override def dependencies = List(parent)

  override def slideDuration: Duration = _slideDuration // window slide interval

  override def parentRememberDuration: Duration = rememberDuration + windowDuration // guarantees the remember duration is longer than windowDuration

  override def persist(level: StorageLevel): DStream[T] = {
    // Do not let this windowed DStream be persisted as windowed (union-ed) RDDs share underlying
    // RDDs and persisting the windowed RDDs would store numerous copies of the underlying data.
    // Instead control the persistence of the parent DStream.
    // i.e. do not persist the windowed RDDs directly but the parent RDDs, because neighbouring
    // window RDDs overlap heavily and persisting them would waste space
    parent.persist(level)
    this
  }

  override def compute(validTime: Time): Option[RDD[T]] = {
    val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) // compute the window interval
    val rddsInWindow = parent.slice(currentWindow)
    val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
      new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
    } else {
      new UnionRDD(ssc.sc, rddsInWindow) // essentially a union of the parent DStream's RDDs inside the window
    }
    Some(windowRDD)
  }
}
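A quick usage sketch: the durations are illustrative and must be multiples of the batch interval, and lines is assumed to be a DStream[String] on a 10-second batch interval:

import org.apache.spark.streaming.Seconds

val windowed = lines.window(Seconds(30), Seconds(10))  // each output RDD unions the last 3 batch RDDs
windowed.count().print()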

 

ShuffledDStream

private[streaming]
class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
    parent: DStream[(K, V)],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiner: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true
  ) extends DStream[(K, C)] (parent.ssc) {

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[(K, C)]] = {
    parent.getOrCompute(validTime) match {
      case Some(rdd) => Some(rdd.combineByKey[C](
        createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
      case None => None
    }
  }
}
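Higher-level pair operators are thin wrappers over this class. As an illustration (not the library's exact code), a reduceByKey-style word count expressed directly through combineByKey, assuming pairs is a DStream[(String, Int)]:

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.StreamingContext._   // implicit conversion to PairDStreamFunctions

val counts = pairs.combineByKey[Int](
  (v: Int) => v,                     // createCombiner: the first value of a key becomes the combiner
  (c: Int, v: Int) => c + v,         // mergeValue: fold further values into the combiner
  (c1: Int, c2: Int) => c1 + c2,     // mergeCombiner: merge partial sums across partitions
  new HashPartitioner(ssc.sparkContext.defaultParallelism))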

 

PairDStreamFunctions
Take groupByKey as an example: it works just as in core Spark and is implemented on top of combineByKey.
More distinctive is groupByKeyAndWindow, which first uses a WindowedDStream to union the RDDs inside the window and then applies combineByKey.

/**
 * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
 * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
 * these functions.
 */
class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K, V)])
  extends Serializable {

  private[streaming] def ssc = self.ssc

  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
    new HashPartitioner(numPartitions)
  }

  /**
   * Return a new DStream by applying `groupByKey` on each RDD. The supplied
   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
   */
  def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
    val createCombiner = (v: V) => ArrayBuffer[V](v)
    val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
    val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
    combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
      .asInstanceOf[DStream[(K, Seq[V])]]
  }

  /**
   * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
   * combineByKey for RDDs. Please refer to combineByKey in
   * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information.
   */
  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true): DStream[(K, C)] = {
    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
      mapSideCombine)
  }
}

groupByKeyAndWindow

  /**
   * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
   * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new
   *                       DStream.
   */
  def groupByKeyAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner
    ): DStream[(K, Seq[V])] = {
    val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
    val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
    val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
    self.groupByKey(partitioner)
      .window(windowDuration, slideDuration) // DStream.window wraps the current DStream in a WindowedDStream, see the code below
      .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
      .asInstanceOf[DStream[(K, Seq[V])]]
  }

  /**
   * Return a new DStream in which each RDD contains all the elements in seen in a
   * sliding window of time over this DStream.
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
    new WindowedDStream(this, windowDuration, slideDuration)
  }
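At the call site this yields, on every slide, all values seen for each key across the whole window. A sketch with illustrative durations (they must be multiples of the batch interval), assuming pairs is a DStream[(String, Int)]:

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext._   // pair operations via implicit conversion

val grouped = pairs.groupByKeyAndWindow(Seconds(60), Seconds(20))   // 60s window, sliding every 20s
grouped.print()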

 

updateStateByKey

  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated. Note, that
   *                   this function may generate a different a tuple with a different key
   *                   than the input key. It is up to the developer to decide whether to
   *                   remember the partitioner despite the key being changed.
   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
   *                    DStream
   * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner,
      rememberPartitioner: Boolean
    ): DStream[(K, S)] = {
    new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
  }
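Before diving into StateDStream below, here is the typical call site: a running word count. This sketch uses the simpler per-key overload updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]), which the library adapts internally to the iterator-based form shown above; pairs and the checkpoint directory are illustrative:

ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")   // mandatory: StateDStream must checkpoint

val updateFunc = (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))            // returning None would drop this key's state

val runningCounts = pairs.updateStateByKey[Int](updateFunc)
runningCounts.print()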

StateDStream
An ordinary DStream derives the current RDD directly from its parent RDD via compute.
What makes StateDStream special is that, besides the parent RDD, it also needs the previous state RDD. This situation only arises in streaming, where RDDs have a temporal relationship to one another.
The previous RDD is getOrCompute(validTime - slideDuration), i.e. the RDD for the preceding time interval in this DStream's generatedRDDs.
The processing function, val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { ... }, works with three pieces of information per record: the key, the values from the parent RDD, and the state from the previous RDD.
It also has to handle the cases where the parent RDD or the previous RDD does not exist.

Note that a StateDStream persists and checkpoints its RDDs by default.

private[streaming]
class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
    parent: DStream[(K, V)],
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    preservePartitioning: Boolean
  ) extends DStream[(K, S)](parent.ssc) {

  super.persist(StorageLevel.MEMORY_ONLY_SER) // persist to memory by default, because later RDDs build on the current one

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override val mustCheckpoint = true // checkpointing is mandatory, since the state must be preserved

  override def compute(validTime: Time): Option[RDD[(K, S)]] = {

    // Try to get the previous state RDD
    getOrCompute(validTime - slideDuration) match {

      case Some(prevStateRDD) => {    // If previous state RDD exists

        // Try to get the parent RDD
        parent.getOrCompute(validTime) match {  // the case where both the previous RDD and the parent RDD exist
          case Some(parentRDD) => {   // If parent RDD exists, then compute as usual

            // Define the function for the mapPartition operation on cogrouped RDD;
            // first map the cogrouped tuple to tuples of required type,
            // and then apply the update function
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
              val i = iterator.map(t => {
                (t._1, t._2._1, t._2._2.headOption)
              })
              updateFuncLocal(i)
            }
            val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) // `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs)
            val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
            Some(stateRDD)
          }
          case None => {    // If parent RDD does not exist

            // Re-apply the update function to the old state RDD
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, S)]) => {
              val i = iterator.map(t => (t._1, Seq[V](), Option(t._2))) // pass an empty Seq[V]() as the parent values
              updateFuncLocal(i)
            }
            val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
            Some(stateRDD)
          }
        }
      }

      case None => {    // If previous session RDD does not exist (first input data)

        // Try to get the parent RDD
        parent.getOrCompute(validTime) match {
          case Some(parentRDD) => {   // If parent RDD exists, then compute as usual; the previous RDD is missing, so this is the first state RDD

            // Define the function for the mapPartition operation on grouped RDD;
            // first map the grouped tuple to tuples of required type,
            // and then apply the update function
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, Seq[V])]) => {
              updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None))) // pass None as the previous state
            }
            val groupedRDD = parentRDD.groupByKey(partitioner)
            val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
            //logDebug("Generating state RDD for time " + validTime + " (first)")
            Some(sessionRDD)
          }
          case None => { // If parent RDD does not exist, then nothing to do! Neither the previous nor the parent RDD exists, so nothing can be computed
            //logDebug("Not generating state RDD (no previous state, no parent)")
            None
          }
        }
      }
    }
  }
}

 

TransformedDStream
This is a fairly general operation: a user-supplied transformFunc computes the current RDD from a set of parent RDDs.
Note that these parent DStreams must belong to the same StreamingContext and share the same slideDuration.
On the DStream API this surfaces as both transform and transformWith; see the source below.

private[streaming]
class TransformedDStream[U: ClassTag] (
    parents: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[U]
  ) extends DStream[U](parents.head.ssc) {

  require(parents.length > 0, "List of DStreams to transform is empty")
  require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
  require(parents.map(_.slideDuration).distinct.size == 1,
    "Some of the DStreams have different slide durations")

  override def dependencies = parents.toList

  override def slideDuration: Duration = parents.head.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
    Some(transformFunc(parentRDDs, validTime))
  }
}
  /**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream.
   */
  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
    val cleanedF = context.sparkContext.clean(transformFunc)
    val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
      assert(rdds.length == 1)
      cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
    }
    new TransformedDStream[U](Seq(this), realTransformFunc) // only `this`: a single parent
  }

  /**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream and 'other' DStream.
   */
  def transformWith[U: ClassTag, V: ClassTag](
      other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
    ): DStream[V] = {
    val cleanedF = ssc.sparkContext.clean(transformFunc)
    val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
      assert(rdds.length == 2)
      val rdd1 = rdds(0).asInstanceOf[RDD[T]]
      val rdd2 = rdds(1).asInstanceOf[RDD[U]]
      cleanedF(rdd1, rdd2, time)
    }
    new TransformedDStream[V](Seq(this, other), realTransformFunc) // `this` and `other`: multiple parents
  }
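As a usage sketch, transform is the escape hatch for arbitrary RDD-to-RDD code, e.g. joining each batch against a static blacklist RDD. The names and data below are illustrative, assuming pairs is a DStream[(String, Int)] and ssc its StreamingContext:

import org.apache.spark.SparkContext._   // pair RDD operations via implicit conversion
import org.apache.spark.rdd.RDD

val blacklist = ssc.sparkContext.parallelize(Seq(("spam", true)))

val cleaned = pairs.transform { (rdd: RDD[(String, Int)]) =>
  rdd.leftOuterJoin(blacklist)                               // (word, (count, Option[flag]))
     .filter { case (_, (_, flagged)) => flagged.isEmpty }   // keep words not on the blacklist
     .map { case (word, (count, _)) => (word, count) }
}
cleaned.print()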
