Spark Streaming Source Code Analysis – DStream

A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data.
A DStream is essentially a discretized stream: the stream is broken up into a list of RDDs, so the basic operations are still built on top of RDDs.
The basic definition of DStream is shown below. Compared with an ordinary RDD, time is a much more important factor for a DStream:
the interval at which the stream is sliced into RDDs, the time at which the stream starts, how long the DStream has to remember each generated RDD, the time key that each RDD corresponds to, and so on.
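As a quick orientation before the source, here is a minimal sketch (hypothetical app name, host and port) of how a DStream comes into being: the batch interval handed to StreamingContext is exactly the interval at which the stream is sliced into RDDs. Later sketches in this post reuse ssc and lines from here.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical setup: slice the incoming stream into one RDD every 10 seconds.
val conf = new SparkConf().setAppName("dstream-demo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999) // a DStream[String] built from live data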

The DStream abstract definition

/**
 * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
 * sequence of RDDs (of the same type) representing a continuous stream of data (see
 * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
 * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
 * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
 * transforming existing DStreams using operations such as `map`,
 * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
 * periodically generates a RDD, either from live data or by transforming the RDD generated by a
 * parent DStream.
 *
 * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
 * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
 * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
 * `join`. These operations are automatically available on any DStream of pairs
 * (e.g., DStream[(Int, Int)] through implicit conversions when
 * `org.apache.spark.streaming.StreamingContext._` is imported.
 *
 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
 */
abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

  // =======================================================================
  // Methods that should be implemented by subclasses of DStream
  // =======================================================================

  /** Time interval after which the DStream generates a RDD */
  def slideDuration: Duration // the interval at which the stream is sliced into RDDs

  /** List of parent DStreams on which this DStream depends on */
  def dependencies: List[DStream[_]] // like RDDs, DStreams also have dependencies on each other

  /** Method that generates a RDD for the given time */
  def compute (validTime: Time): Option[RDD[T]] // the logic that generates each RDD

  // =======================================================================
  // Methods and fields available on all DStreams
  // =======================================================================

  // RDDs generated, marked as private[streaming] so that testsuites can access it
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () // the core structure: a DStream is essentially a HashMap of RDDs keyed by time

  // Time zero for the DStream
  private[streaming] var zeroTime: Time = null // the time at which the stream starts

  // Duration for which the DStream will remember each RDD created
  private[streaming] var rememberDuration: Duration = null // the stream is unbounded and a DStream cannot keep every RDD, so it only remembers RDDs for this duration

  // Storage level of the RDDs in the stream
  private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE

  // Checkpoint details
  private[streaming] val mustCheckpoint = false
  private[streaming] var checkpointDuration: Duration = null
  private[streaming] val checkpointData = new DStreamCheckpointData(this)

  // Reference to whole DStream graph
  private[streaming] var graph: DStreamGraph = null // the DStreamGraph

  // Duration for which the DStream requires its parent DStream to remember each RDD created
  private[streaming] def parentRememberDuration = rememberDuration

  /** Return the StreamingContext associated with this DStream */
  def context = ssc

  /** Persist the RDDs of this DStream with the given storage level */
  def persist(level: StorageLevel): DStream[T] = {
    this.storageLevel = level
    this
  }

  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)

  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
  def cache(): DStream[T] = persist()

  /**
   * Enable periodic checkpointing of RDDs of this DStream
   * @param interval Time interval after which generated RDD will be checkpointed
   */
  def checkpoint(interval: Duration): DStream[T] = {
    persist()
    checkpointDuration = interval
    this
  }
}
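To make the three abstract members concrete, here is a minimal hypothetical subclass (a sketch, not code from Spark; assumed to live in the org.apache.spark.streaming.dstream package like the concrete DStreams below) that emits one constant RDD per batch:

// Hypothetical sketch: a DStream that produces a constant RDD every batch interval.
// It only has to supply the three abstract members listed above.
private[streaming]
class ConstantDStream(ssc_ : StreamingContext, values: Seq[Int], batchInterval: Duration)
  extends DStream[Int](ssc_) {

  override def dependencies: List[DStream[_]] = List()      // no parent DStreams
  override def slideDuration: Duration = batchInterval      // generate one RDD per batchInterval
  override def compute(validTime: Time): Option[RDD[Int]] =
    Some(ssc_.sparkContext.parallelize(values))             // the RDD object for this batch
}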


getOrCompute
Note that getOrCompute only produces the RDD object; it does not actually run the computation. The real computation happens only when runJob is invoked.
A Spark RDD itself holds no concrete data; it only defines the workflow (the dependency graph) and the processing logic.

/**
 * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
 * method that should not be called directly.
 */
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
  // If this DStream was not initialized (i.e., zeroTime not set), then do it
  // If RDD was already generated, then retrieve it from HashMap
  generatedRDDs.get(time) match {
    // If an RDD was already generated and is being reused, then
    // probably all RDDs in this DStream will be reused and hence should be cached
    case Some(oldRDD) => Some(oldRDD)
    // if RDD was not generated, and if the time is valid
    // (based on sliding time of this DStream), then generate the RDD
    case None => { // the RDD needs to be computed
      if (isTimeValid(time)) { // a time is invalid when (time <= zeroTime || !(time - zeroTime).isMultipleOf(slideDuration))
        compute(time) match { // generate the RDD object via compute
          case Some(newRDD) =>
            if (storageLevel != StorageLevel.NONE) {
              newRDD.persist(storageLevel) // set the persist level
            }
            if (checkpointDuration != null &&
                (time - zeroTime).isMultipleOf(checkpointDuration)) {
              newRDD.checkpoint() // mark the RDD for checkpointing
            }
            generatedRDDs.put(time, newRDD) // put the newly generated RDD into generatedRDDs
            Some(newRDD)
          case None =>
            None
        }
      } else {
        None
      }
    }
  }
}
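For illustration, the time-validity rule mentioned in the comment above can be restated as a standalone predicate (a simplified re-implementation, not the private method itself):

// With zeroTime = 0s and slideDuration = 10s, times 10s, 20s, 30s are valid,
// while 0s (not after zeroTime) and 15s (not a multiple of the slide) are not.
def isTimeValidSketch(time: Time, zeroTime: Time, slideDuration: Duration): Boolean =
  !(time <= zeroTime || !(time - zeroTime).isMultipleOf(slideDuration))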


generateJob
The RDD object produced by getOrCompute needs to be wrapped into a Job.
The key part of a Job is jobFunc, which in essence submits a job to the Spark cluster.
Here only an emptyFunc is used; the concrete output logic is meant to be overridden by the concrete output DStreams.

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
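For context, the non-empty output logic normally comes from user-side output operations; a hypothetical usage sketch (reusing lines from the first sketch) whose closure ends up being the real work of each batch's job:

// foreachRDD registers an output DStream; its closure, not emptyFunc, is what the
// generated Job runs for every batch.
lines.foreachRDD { rdd =>
  println(s"batch record count = ${rdd.count()}")
}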


clearMetadata
Clears RDD objects that have become stale; this also unpersists them and calls clearMetadata on the dependencies.

/**
 * Clear metadata that are older than `rememberDuration` of this DStream.
 * This is an internal method that should not be called directly. This default
 * implementation clears the old generated RDDs. Subclasses of DStream may override
 * this to clear their own metadata along with the generated RDDs.
 */
private[streaming] def clearMetadata(time: Time) {
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
  generatedRDDs --= oldRDDs.keys
  if (ssc.conf.getBoolean("spark.streaming.unpersist", false)) {
    oldRDDs.values.foreach(_.unpersist(false))
  }
  dependencies.foreach(_.clearMetadata(time))
}
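Note that old RDDs are only unpersisted when spark.streaming.unpersist is enabled (it defaults to false in the code above); a small configuration sketch, reusing the hypothetical conf from the first sketch:

// Allow Spark Streaming to unpersist RDDs that fall out of rememberDuration.
conf.set("spark.streaming.unpersist", "true")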

Definitions of concrete DStreams

FilteredDStream

package org.apache.spark.streaming.dstream

private[streaming]
class FilteredDStream[T: ClassTag](
    parent: DStream[T],
    filterFunc: T => Boolean
  ) extends DStream[T](parent.ssc) {

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[T]] = {
    parent.getOrCompute(validTime).map(_.filter(filterFunc))
  }
}
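DStream.filter is just a thin wrapper around this class; a hypothetical usage sketch (reusing lines):

// Internally equivalent to new FilteredDStream(lines, (s: String) => s.nonEmpty)
val nonEmptyLines = lines.filter(_.nonEmpty)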

 

WindowedDStream

private[streaming]
class WindowedDStream[T: ClassTag](
    parent: DStream[T],
    _windowDuration: Duration,
    _slideDuration: Duration)
  extends DStream[T](parent.ssc) {

  // Persist parent level by default, as those RDDs are going to be obviously reused.
  parent.persist(StorageLevel.MEMORY_ONLY_SER) // persist the parent's RDDs by default, since they are read repeatedly as the window slides

  def windowDuration: Duration = _windowDuration // window size

  override def dependencies = List(parent)

  override def slideDuration: Duration = _slideDuration // window slide interval

  override def parentRememberDuration: Duration = rememberDuration + windowDuration // ensures the remember duration is always larger than windowDuration

  override def persist(level: StorageLevel): DStream[T] = {
    // Do not let this windowed DStream be persisted as windowed (union-ed) RDDs share underlying
    // RDDs and persisting the windowed RDDs would store numerous copies of the underlying data.
    // Instead control the persistence of the parent DStream.
    // In other words, do not persist the windowed RDDs directly but persist the parent RDDs:
    // the window RDDs share a lot of data, so persisting them directly would waste space.
    parent.persist(level)
    this
  }

  override def compute(validTime: Time): Option[RDD[T]] = {
    val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) // compute the window interval
    val rddsInWindow = parent.slice(currentWindow)
    val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
      new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
    } else {
      new UnionRDD(ssc.sc, rddsInWindow) // essentially a union of the parent DStream's RDDs inside the window
    }
    Some(windowRDD)
  }
}
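A hypothetical usage sketch of the corresponding DStream.window operation (shown later in this post), assuming the batch interval divides both durations:

// A 30-second window of data, recomputed every 10 seconds; under the hood this is
// a WindowedDStream whose compute() unions the parent RDDs inside the window.
val windowedLines = lines.window(Seconds(30), Seconds(10))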

 

ShuffledDStream

private[streaming]
class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
    parent: DStream[(K, V)],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiner: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true
  ) extends DStream[(K, C)] (parent.ssc) {

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[(K, C)]] = {
    parent.getOrCompute(validTime) match {
      case Some(rdd) => Some(rdd.combineByKey[C](
        createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
      case None => None
    }
  }
}
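Key-value operations on DStreams funnel through combineByKey and hence through this class; a hypothetical word-count sketch (later sketches reuse pairs):

// reduceByKey is built on combineByKey, which constructs a ShuffledDStream per batch.
val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)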

 

PairDStreamFunctions
Take groupByKey as an example: it is not much different from its counterpart in plain Spark and is still implemented on top of combineByKey.
What is more distinctive is groupByKeyAndWindow, which essentially first uses a WindowedDStream to union the RDDs inside the window and then applies combineByKey.

/**
 * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
 * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
 * these functions.
 */
class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K, V)])
  extends Serializable {

  private[streaming] def ssc = self.ssc

  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
    new HashPartitioner(numPartitions)
  }

  /**
   * Return a new DStream by applying `groupByKey` on each RDD. The supplied
   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
   */
  def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
    val createCombiner = (v: V) => ArrayBuffer[V](v)
    val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
    val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
    combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
      .asInstanceOf[DStream[(K, Seq[V])]]
  }

  /**
   * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
   * combineByKey for RDDs. Please refer to combineByKey in
   * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information.
   */
  def combineByKey[C: ClassTag](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true): DStream[(K, C)] = {
    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
      mapSideCombine)
  }
}

groupByKeyAndWindow

/**
 * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
 * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
 * @param slideDuration  sliding interval of the window (i.e., the interval after which
 *                       the new DStream will generate RDDs); must be a multiple of this
 *                       DStream's batching interval
 * @param partitioner    partitioner for controlling the partitioning of each RDD in the new
 *                       DStream.
 */
def groupByKeyAndWindow(
    windowDuration: Duration,
    slideDuration: Duration,
    partitioner: Partitioner
  ): DStream[(K, Seq[V])] = {
  val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
  val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
  val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
  self.groupByKey(partitioner)
      .window(windowDuration, slideDuration) // DStream.window wraps the current DStream in a WindowedDStream, see the code below
      .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
      .asInstanceOf[DStream[(K, Seq[V])]]
}

/**
 * Return a new DStream in which each RDD contains all the elements in seen in a
 * sliding window of time over this DStream.
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
 * @param slideDuration  sliding interval of the window (i.e., the interval after which
 *                       the new DStream will generate RDDs); must be a multiple of this
 *                       DStream's batching interval
 */
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
  new WindowedDStream(this, windowDuration, slideDuration)
}
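A hypothetical usage sketch of groupByKeyAndWindow, reusing pairs from above:

// Group the values of each key seen in the last 30 seconds, sliding every 10 seconds:
// groupByKey first, then window (union of the windowed RDDs), then combineByKey.
val groupedInWindow = pairs.groupByKeyAndWindow(Seconds(30), Seconds(10))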

 

updateStateByKey

/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
 * @param updateFunc State update function. If `this` function returns None, then
 *                   corresponding state key-value pair will be eliminated. Note, that
 *                   this function may generate a different a tuple with a different key
 *                   than the input key. It is up to the developer to decide whether to
 *                   remember the partitioner despite the key being changed.
 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
 *                    DStream
 * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean
  ): DStream[(K, S)] = {
  new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
}

StateDStream
An ordinary DStream derives its current RDD directly from the parent RDD via compute.
What makes StateDStream special is that, besides the parent RDD, it also has to consult the previous RDD. This only exists in the streaming scenario; only there do RDDs have a temporal relationship with one another.
PreviousRDD = getOrCompute(validTime - slideDuration), i.e., the RDD in this DStream's generatedRDDs for the previous time interval.
The processing function, val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { ... }, works with three pieces of data: the key, the values from the parent RDD, and the values from the previous RDD.
The processing function also has to handle the cases where either the parent RDD or the previous RDD is missing.

Note that a StateDStream persists and checkpoints its RDDs by default.

private[streaming]
class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
    parent: DStream[(K, V)],
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    preservePartitioning: Boolean
  ) extends DStream[(K, S)](parent.ssc) {

  super.persist(StorageLevel.MEMORY_ONLY_SER) // persist to memory by default, because each state RDD is needed by the next one

  override def dependencies = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override val mustCheckpoint = true // checkpoint by default, since the state has to be kept

  override def compute(validTime: Time): Option[RDD[(K, S)]] = {

    // Try to get the previous state RDD
    getOrCompute(validTime - slideDuration) match {

      case Some(prevStateRDD) => {    // If previous state RDD exists

        // Try to get the parent RDD
        parent.getOrCompute(validTime) match {  // case where both the previous RDD and the parent RDD exist
          case Some(parentRDD) => {   // If parent RDD exists, then compute as usual

            // Define the function for the mapPartition operation on cogrouped RDD;
            // first map the cogrouped tuple to tuples of required type,
            // and then apply the update function
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
              val i = iterator.map(t => {
                (t._1, t._2._1, t._2._2.headOption)
              })
              updateFuncLocal(i)
            }
            val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) // `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs)
            val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
            Some(stateRDD)
          }
          case None => {    // If parent RDD does not exist

            // Re-apply the update function to the old state RDD
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, S)]) => {
              val i = iterator.map(t => (t._1, Seq[V](), Option(t._2))) // simply pass an empty Seq[V]() in place of the parent RDD's values
              updateFuncLocal(i)
            }
            val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
            Some(stateRDD)
          }
        }
      }

      case None => {    // If previous session RDD does not exist (first input data)

        // Try to get the parent RDD
        parent.getOrCompute(validTime) match {
          case Some(parentRDD) => {   // If parent RDD exists, then compute as usual; the previous RDD being absent means this is the first state RDD

            // Define the function for the mapPartition operation on grouped RDD;
            // first map the grouped tuple to tuples of required type,
            // and then apply the update function
            val updateFuncLocal = updateFunc
            val finalFunc = (iterator: Iterator[(K, Seq[V])]) => {
              updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None))) // pass None in place of the previous state
            }
            val groupedRDD = parentRDD.groupByKey(partitioner)
            val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
            // logDebug("Generating state RDD for time " + validTime + " (first)")
            Some(sessionRDD)
          }
          case None => { // If parent RDD does not exist, then nothing to do! With neither the previous nor the parent RDD, there is nothing to compute
            // logDebug("Not generating state RDD (no previous state, no parent)")
            None
          }
        }
      }
    }
  }
}
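A hypothetical usage sketch of the public updateStateByKey API (here the simpler overload that takes the new values and the previous state per key; the checkpoint path is made up), reusing ssc and pairs:

// StateDStream must checkpoint, so a checkpoint directory is required.
ssc.checkpoint("/tmp/streaming-checkpoint")

// Keep a running count per word across batches; returning None would drop the key's state.
val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(state.getOrElse(0) + newValues.sum)
}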

 

TransformedDStream
This is a fairly generic operation: a user-defined transformFunc computes the current RDD from a set of parent RDDs.
Note that these parent DStreams must belong to the same StreamingContext and must have the same slideDuration.
The DStream interface offers two variants, transform and transformWith; see the source below.

private[streaming]
class TransformedDStream[U: ClassTag] (
    parents: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[U]
  ) extends DStream[U](parents.head.ssc) {

  require(parents.length > 0, "List of DStreams to transform is empty")
  require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts")
  require(parents.map(_.slideDuration).distinct.size == 1,
    "Some of the DStreams have different slide durations")

  override def dependencies = parents.toList

  override def slideDuration: Duration = parents.head.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
    Some(transformFunc(parentRDDs, validTime))
  }
}

/**
 * Return a new DStream in which each RDD is generated by applying a function
 * on each RDD of 'this' DStream.
 */
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
  val cleanedF = context.sparkContext.clean(transformFunc)
  val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
    assert(rdds.length == 1)
    cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
  }
  new TransformedDStream[U](Seq(this), realTransformFunc) // only this DStream: a single parent RDD per batch
}

/**
 * Return a new DStream in which each RDD is generated by applying a function
 * on each RDD of 'this' DStream and 'other' DStream.
 */
def transformWith[U: ClassTag, V: ClassTag](
    other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
  ): DStream[V] = {
  val cleanedF = ssc.sparkContext.clean(transformFunc)
  val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
    assert(rdds.length == 2)
    val rdd1 = rdds(0).asInstanceOf[RDD[T]]
    val rdd2 = rdds(1).asInstanceOf[RDD[U]]
    cleanedF(rdd1, rdd2, time)
  }
  new TransformedDStream[V](Seq(this, other), realTransformFunc) // this and other: multiple parent RDDs per batch
}
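A hypothetical usage sketch of transform, joining every batch of pairs against a static RDD (the blacklist data is made up):

// transform exposes the underlying RDD of each batch, so ordinary RDD operations
// such as joins against static data become possible.
val blacklist = ssc.sparkContext.parallelize(Seq(("spam", true)))
val cleaned = pairs.transform { rdd =>
  rdd.leftOuterJoin(blacklist)                       // (word, (count, Option[flag]))
     .filter { case (_, (_, flag)) => flag.isEmpty } // keep words not on the blacklist
     .mapValues { case (count, _) => count }
}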
