spark消费kafka产生数据堆积怎么处理_SparkStreaming读取Kafka的两种方式

本文主要从以下几个方面介绍SparkStreaming读取Kafka的两种方式:

一、SparkStreaming简介

二、Kafka简介

三、Redis简介(可用于保存历史数据或偏移量数据)

四、SparkStreaming读取Kafka数据的两种方式

五、演示Demo

一、SparkStreaming简介

可以参考这篇文章:SparkStreaming 详解

二、Kafka简介

可以参考这篇文章:Kafka(分布式发布订阅消息系统) 简介

三、Redis简介

可以参考这篇文章:Redis简介

四、SparkStreaming读取Kafka数据的两种方式

spark streaming提供了两种获取方式,一种是利用接收器(receiver)和kafaka的高层API实现。
一种是不利用接收器,直接用kafka底层的API来实现(spark1.3以后引入)。

1、reciver链接方式(有些问题,开发中不采用这种方式)

  • 用KafkaUtils.createDstream创建链接。Receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
  • Receiver方式是通过zookeeper来连接kafka队列,调用Kafka高阶API,offset存储在zookeeper,由Receiver维护。
  • 在executor上会有receiver从kafka接收数据并存储在Spark executor中,在到了batch时间后触发job去处理接收到的数据,1个receiver占用1个core使用wal预写机制,因为需要使用hdfs等存储,因此会降低性能。
f3d62ddee37202f97441c22cca868fe2.png

receiver方式

基于Receiver方式存在的问题:

  • 启用WAL机制,每次处理之前需要将该batch内的数据备份到checkpoint目录中,这降低了数据处理效率,同时加重了Receiver的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
  • 采用MEMORY_AND_DISK_SER降低对内存的要求,但是在一定程度上影响了计算的速度。
  • 单Receiver内存。由于Receiver是属于Executor的一部分,为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用这么多内存,导致资源严重浪费。
  • 提高并行度,采用多个Receiver来保存kafka的数据。Receiver读取数据是异步的,不会参与计算。如果提高了并行度来平衡吞吐量很不划算。
  • Receiver和计算的Executor是异步的,在遇到网络等因素时,会导致计算出现延迟,计算队列一直在增加,而Receiver一直在接收数据,这非常容易导致程序崩溃。
  • 在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新offsets的情况,这会导致数据重复消费。

2、Direct直连方式(开发中使用的方式)

  • 使用KafkaUtils.createDirectStream创建链接。这种方式定期从kafka的topic下对应的partition中查询最新偏移量,并在每个批次中根据相应的定义的偏移范围进行处理。Spark通过调用kafka简单的消费者API读取一定范围的数据。
  • Direct方式是直接连接kafka分区来获取数据。从每个分区直接读取数据大大提高了并行能力Direct方式调用Kafka低阶API(底层API),offset自己存储和维护,默认由Spark维护在checkpoint中,消除了与zk不一致的情况当然也可以自己手动维护,把offset存在mysql、redis中所以基于Direct模式可以在开发中使用,且借助Direct模式的特点+手动操作可以保证数据的Exactly once 精准一次

基于Direct方式的优势:

  • 简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对他们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从kafka中读取数据。所以在kafka partition和RDD partition之间,有一一对应的关系。
  • 高性能:如果要保证数据零丢失,在基于Receiver的方式中,需要开启WAL机制。这种方式其实效率很低,因为数据实际被复制了两份,kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于Direct的方式,不依赖于Receiver,不需要开启WAL机制,只要kafka中做了数据的复制,那么就可以通过kafka的副本进行恢复
  • 强一致语义:基于Receiver的方式,使用kafka的高阶API来在Zookeeper中保存消费过的offset。这是消费kafka数据的传统方式。这种方式配合WAL机制,可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和Zookeeper之间可能是不同步的。基于Direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据时消费一次且仅消费一次。
  • 降低资源:Direct不需要Receiver,其申请的Executors全部参与到计算任务中;而Receiver则需要专门的Receivers来读取kafka数据且不参与计算。因此相同的资源申请,Direct能够支持更大的业务。Receiver与其他Executor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并不需要那么多的内存,而Direct因为没有Receiver,而是在计算的时候读取数据,然后直接计算,所以对内存的要求很低。
  • 鲁棒性更好:基于Receiver方式需要Receiver来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receiver却还在持续读取数据,此种情况容易导致计算崩溃。Direct则没有这种顾虑,其Driver在触发batch计算任务时,才会读取数据并计算,队列出现堆积并不不会引起程序的失败。

基于Direct方式的不足:

  • Direct方式需要采用checkpoint或者第三方存储来维护offset,而不是像Receiver那样,通过Zookeeper来维护offsets,提高了用户的开发成本。
  • 基于Receiver方式指定topic指定consumer的消费情况均能够通过Zookeeper来监控,而Direct则没有这么便利,如果想做监控并可视化,则需要投入人力开发。

五、演示Demo

1、reciver链接方式

package xxximport org.apache.spark.SparkConfimport org.apache.spark.storage.StorageLevelimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}/** * Receiver链接方式 */object KafkaWordCount {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")    val ssc = new StreamingContext(conf, Seconds(5))    val zkQuorum = "slave2:2181,slave3:2181,slave4:2181"    val groupId = "g1"    val topic = Map[String, Int]("test1" -> 1)    //创建DStream,需要KafkaDStream    val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic, StorageLevel.MEMORY_AND_DISK_SER)    //对数据进行处理    //Kafak的ReceiverInputDStream[(String, String)]里面装的是一个元组(key是写入的key,value是实际写入的内容)    val lines: DStream[String] = data.map(_._2)    //对DSteam进行操作,操作这个抽象(代理,描述),就像操作一个本地的集合一样,类似于RDD    val words: DStream[String] = lines.flatMap(_.split(" "))    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))    val reduced: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)    //打印结果(Action)    reduced.print()    //启动sparksteaming程序    ssc.start()    //等待优雅的退出    ssc.awaitTermination()  }}

2、直连方式(用zookeeper存储偏移量)

步骤:

准备zookeeper集群存储读取到额kafka数据的每个分区的偏移量

调用KafkaUtils.createDirectStream建立直连链接

读取zookeeper集群中的已经存储的每个数据分区地偏移量,根据该偏移量继续读取数据。或者从头(当前)位置读取数据

调用kafkaStream.transform遍历每个RDD,获取该RDD对应数据的偏移量

对RDD进行操作,并将zookeeper中保存的数据偏移量进行更新

package sparkStreamingAndKafkaimport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}import org.I0Itec.zkclient.ZkClientimport org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}/** * 直连方式,用zookeeper存偏移量 */object KafkaDirection1 {  def main(args: Array[String]): Unit = {    val conf: SparkConf = new SparkConf().setAppName("kafkaDirection").setMaster("local[*]")    val ssc = new StreamingContext(conf, Seconds(3))    val group = "group1"  // 分组    val topic = "wordCount"    // topic    val brokerList = "slave1:9092,slave2:9092,slave3:9092"  // broker集群,sparkStream的Task直接连到kafka分区上    val zkQuorum = "slave2:2181,slave3:2181,slave4:2181"     //  zookeeper集群,用于记录偏移量(也可以选择MySQL、Redis等记录偏移量)    val topics = Set(topic)   // 创建stream时使用的topic名字集合,sparkStreaming可同时消费多个topic    val topicDirs = new ZKGroupTopicDirs(group, topic) // 创建一个ZKGroupTopicDirs对象,其实就是指定往zookeeper中写入数据的目录,该目录用于保存偏移量    val zkTopicPath: String = topicDirs.consumerOffsetDir  // 获取zookeeper中的路径"/group1/offsets/wordCount/"    // 准备kafka参数    val kafkaParams = Map(      "metadata.broker.list" -> brokerList,      "group.id" -> group,      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString  // 偏移量最开始的时候从哪读,SmallestTimeString表示从头开始读,                                                                         // LargestTimeString表示从启动时刻产生的数据读    )    val zkClient = new ZkClient(zkQuorum) // zookeeper的客户端,可以从zk中读取偏移量数据,并更新偏移量    val numOfzkChildren: Int = zkClient.countChildren(zkTopicPath) // 检查该路径下是否保存有数据(偏移量),                                                                   // 例如:/group1/offsets/wordCount/2/1003 表示2号分区有偏移量1003    var kafkaStream: InputDStream[(String, String)] = null    // 如果zookeeper中保存有偏移量offfset,则利用这个偏移量作为kafkaStream的起始位置    var fromOffsets: Map[TopicAndPartition, Long] = Map()    if (numOfzkChildren > 0){  // 如果保存过offset      for (i  1003        fromOffsets += (tp -> fromOffset.toLong)  // 将topic不同分区所对应的偏移量放入集合中      }      //Key: kafka的key   values: "hello tom hello jerry"      //这个会将 kafka 的消息进行 transform,最终 kafka 的数据都会变成 (kafka的key, message) 这样的 tuple      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())  // 读数据的规则      //通过KafkaUtils创建直连的DStream(fromOffsets参数的作用是:按照前面计算好了的偏移量继续消费数据)      // 泛型参数说明:      //[String, String, StringDecoder, StringDecoder,     (String, String)]      //  key    value    key的解码方式   value的解码方式    处理完成后Dstream中的数据类型      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)    }else{  // 没有保存过offset,相当于从头读      //如果未保存,根据 kafkaParam 的配置使用最新(largest)或者最旧的(smallest) offset      //[String, String, StringDecoder, StringDecoder]      //  key    value    key的解码方式   value的解码方式      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)    }    //偏移量的范围    var offsetRanges = Array[OffsetRange]()    //从kafka读取的消息,DStream的Transform方法可以将当前批次的RDD获取出来    //该transform方法计算获取到当前批次RDD,然后将RDD的偏移量取出来,然后在将RDD返回到DStream    val transform: DStream[(String, String)] = kafkaStream.transform { rdd =>      //得到该 RDD对应 kafka 的消息的 offset      //该RDD是一个KafkaRDD,可以获得它的偏移量的范围      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // 偏移量范围      rdd   // 不对RDD进行操作,再放回DStream    }    // DStream 是RDD的工厂,每隔一段时间产生一个RDD    val messages: DStream[String] = transform.map(_._2)    //依次迭代DStream中的RDD    messages.foreachRDD { rdd =>    // foreachRDD,每隔一段时间产生一个RDD      rdd.foreachPartition(partition =>   // foreachPartition 每个分区一个连接链接        partition.foreach(x => {    // foreach 分区中的每条数据          println(x)        })      )      // 更新偏移量offset      for (o 

但是,在这个方案中,为了获取偏移量需要遍历RDD,后续又要遍历RDD操作RDD,代码冗余

3、直连方式(获取数据偏移量的同时处理数据)

package xxximport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}import org.I0Itec.zkclient.ZkClientimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}/** * 直连方式,用zookeeper存偏移量(获取偏移量的同时,对数据进行操作) */object kafkaDirection2 {  def main(args: Array[String]): Unit = {    val conf: SparkConf = new SparkConf().setAppName("kafkaDirection").setMaster("local[*]")    val ssc = new StreamingContext(conf, Seconds(3))    val group = "group3"  // 分组    val topic = "wordCount"    // topic    val brokerList = "slave1:9092,slave2:9092,slave3:9092"  // broker集群,sparkStream的Task直接连到kafka分区上    val zkQuorum = "slave2:2181,slave3:2181,slave4:2181"     //  zookeeper集群,用于记录偏移量(也可以选择MySQL、Redis等记录偏移量)    val topics = Set(topic)   // 创建stream时使用的topic名字集合,sparkStreaming可同时消费多个topic    val topicDirs = new ZKGroupTopicDirs(group, topic) // 创建一个ZKGroupTopicDirs对象,其实就是指定往zookeeper中写入数据的目录,该目录用于保存偏移量    val zkTopicPath: String = topicDirs.consumerOffsetDir  // 获取zookeeper中的路径"/group1/offsets/wordCount/"    // 准备kafka参数    val kafkaParams = Map(      "metadata.broker.list" -> brokerList,      "group.id" -> group,      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString  // 偏移量最开始的时候从哪读,SmallestTimeString表示从头开始读,      // LargestTimeString表示从启动时刻产生的数据读    )    val zkClient = new ZkClient(zkQuorum) // zookeeper的客户端,可以从zk中读取偏移量数据,并更新偏移量    val numOfzkChildren: Int = zkClient.countChildren(zkTopicPath) // 检查该路径下是否保存有数据(偏移量),    // 例如:/group1/offsets/wordCount/2/1003 表示2号分区有偏移量1003    var kafkaStream: InputDStream[(String, String)] = null    // 如果zookeeper中保存有偏移量offfset,则利用这个偏移量作为kafkaStream的起始位置    var fromOffsets: Map[TopicAndPartition, Long] = Map()    if (numOfzkChildren > 0){  // 如果保存过offset      for (i  1003        fromOffsets += (tp -> fromOffset.toLong)  // 将topic不同分区所对应的偏移量放入集合中      }      //Key: kafka的key   values: "hello tom hello jerry"      //这个会将 kafka 的消息进行 transform,最终 kafka 的数据都会变成 (kafka的key, message) 这样的 tuple      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())  // 读数据的规则      //通过KafkaUtils创建直连的DStream(fromOffsets参数的作用是:按照前面计算好了的偏移量继续消费数据)      // 泛型参数说明:      //[String, String, StringDecoder, StringDecoder,     (String, String)]      //  key    value    key的解码方式   value的解码方式    处理完成后Dstream中的数据类型      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)    }else{  // 没有保存过offset,相当于从头读      //如果未保存,根据 kafkaParam 的配置使用最新(largest)或者最旧的(smallest) offset      //[String, String, StringDecoder, StringDecoder]      //  key    value    key的解码方式   value的解码方式      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)    }    //偏移量的范围    var offsetRanges = Array[OffsetRange]()    // 获取偏移量的同时处理数据        // 直连方式只有在kakaDstream中的RDD才能获取偏移量,那么就不能调用DStream的Transformation    // 所以只能在KafkaStream中调用foreachRDD,获取RDD的偏移量,然后就是对RDD进行操作了    //依次迭代DStream中的RDD    // 如果使用直连方式进行累加数据,就需要在外部的数据库中进行累加(用kay-value的内存数据库,NoSQL型数据库  Redis)    kafkaStream.foreachRDD { kafkaRDD =>{      // 只有kafkaRDD可以强转成HashOffSetRanges,并获取偏移量      val offsetRanges: Array[OffsetRange] = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges      val lines: RDD[String] = kafkaRDD.map(_._2)      val words: RDD[String] = lines.flatMap(u => {        u.split(" ")      })      val wordsAndOne: RDD[(String, Int)] = words.map(word => {        (word, 1)      })      val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey((a, b) => {        a + b      })      reduced.foreach(println)      // 更新偏移量offset      for (o 

但是该方案,无法获取历史数据。这里统计到的wordcount只是某一时间片内对应数据的统计结果,并不包含历史数据。

4、直连方式,zookeeper存储偏移量数据,redis存储历史数据。

redis的连接池:

package xxximport redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}object JedisConnectePool {  val config = new JedisPoolConfig()  //最大连接数,  config.setMaxTotal(20)  //最大空闲连接数,  config.setMaxIdle(10)  //当调用borrow Object方法时,是否进行有效性检查 -->  config.setTestOnBorrow(true)  //10000代表超时时间(10秒)  val pool = new JedisPool(config, "192.168.247.8", 6379, 10000, "123")  def getConnection():Jedis={    pool.getResource  }}
package xxximport jedis.JedisConnectionPoolimport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport kafka.serializer.StringDecoderimport kafka.utils.{ZKGroupTopicDirs, ZkUtils}import org.I0Itec.zkclient.ZkClientimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}import redis.clients.jedis.Jedis/** * 直连方式,在获取RDD偏移量的同时操作偏移量,并且能够wordcount统计时包含历史统计数据 */object kafkaDirection3 {  def main(args: Array[String]): Unit = {    val conf: SparkConf = new SparkConf().setAppName("kafkaDirection").setMaster("local[*]")    val ssc = new StreamingContext(conf, Seconds(3))    val group = "group2"  // 分组    val topic = "wordCount"    // topic    val brokerList = "slave1:9092,slave2:9092,slave3:9092"  // broker集群,sparkStream的Task直接连到kafka分区上    val zkQuorum = "slave2:2181,slave3:2181,slave4:2181"     //  zookeeper集群,用于记录偏移量(也可以选择MySQL、Redis等记录偏移量)    val topics = Set(topic)   // 创建stream时使用的topic名字集合,sparkStreaming可同时消费多个topic    val topicDirs = new ZKGroupTopicDirs(group, topic) // 创建一个ZKGroupTopicDirs对象,其实就是指定往zookeeper中写入数据的目录,该目录用于保存偏移量    val zkTopicPath: String = topicDirs.consumerOffsetDir  // 获取zookeeper中的路径"/group1/offsets/wordCount/"    // 准备kafka参数    val kafkaParams = Map(      "metadata.broker.list" -> brokerList,      "group.id" -> group,      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString  // 偏移量最开始的时候从哪读,SmallestTimeString表示从头开始读,      // LargestTimeString表示从启动时刻产生的数据读    )    val zkClient = new ZkClient(zkQuorum) // zookeeper的客户端,可以从zk中读取偏移量数据,并更新偏移量    val numOfzkChildren: Int = zkClient.countChildren(zkTopicPath) // 检查该路径下是否保存有数据(偏移量),    // 例如:/group1/offsets/wordCount/2/1003 表示2号分区有偏移量1003    var kafkaStream: InputDStream[(String, String)] = null    // 如果zookeeper中保存有偏移量offfset,则利用这个偏移量作为kafkaStream的起始位置    var fromOffsets: Map[TopicAndPartition, Long] = Map()    if (numOfzkChildren > 0){  // 如果保存过offset      for (i  1003        fromOffsets += (tp -> fromOffset.toLong)  // 将topic不同分区所对应的偏移量放入集合中      }      //Key: kafka的key   values: "hello tom hello jerry"      //这个会将 kafka 的消息进行 transform,最终 kafka 的数据都会变成 (kafka的key, message) 这样的 tuple      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())  // 读数据的规则      //通过KafkaUtils创建直连的DStream(fromOffsets参数的作用是:按照前面计算好了的偏移量继续消费数据)      // 泛型参数说明:      //[String, String, StringDecoder, StringDecoder,     (String, String)]      //  key    value    key的解码方式   value的解码方式    处理完成后Dstream中的数据类型      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)    }else{  // 没有保存过offset,相当于从头读      //如果未保存,根据 kafkaParam 的配置使用最新(largest)或者最旧的(smallest) offset      //[String, String, StringDecoder, StringDecoder]      //  key    value    key的解码方式   value的解码方式      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)    }    //偏移量的范围    var offsetRanges = Array[OffsetRange]()    // 直连方式只有在kakaDstream中的RDD才能获取偏移量,那么就不能调用DStream的Transformation    // 所以只能在KafkaStream中调用foreachRDD,获取RDD的偏移量,然后就是对RDD进行操作了    //依次迭代DStream中的RDD    // 如果使用直连方式进行累加数据,就需要在外部的数据库中进行累加(用kay-value的内存数据库,NoSQL型数据库  Redis)    kafkaStream.foreachRDD { kafkaRDD =>{      // 只有kafkaRDD可以强转成HashOffSetRanges,并获取偏移量      val offsetRanges: Array[OffsetRange] = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges      val lines: RDD[String] = kafkaRDD.map(_._2)      val words: RDD[String] = lines.flatMap(u => {        u.split(" ")      })      val wordsAndOne: RDD[(String, Int)] = words.map(word => {        (word, 1)      })      val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey((a, b) => {        a + b      })      val stated: RDD[(String, Int)] = reduced.map(u => {       // 获取redis存放的历史统计数据        val conn: Jedis = JedisConnectionPool.getConnection()        val str: String = conn.get(u._1)        var num = 0        if(str != null){          num = str.toInt        }        val value: Int = u._2        val value1: Int = num+value        // 更新redis中的统计数据        conn.set(u._1, value1.toString)        conn.close()        (u._1, value1)      })      stated.foreach(println)      // 更新偏移量offset      for (o 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/551187.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python字符串操作作业_python 第二天作业

#python 中的循环#先介绍for循环#格式#for 临时变量 in 可迭代对象# 循环体#name neusoft#for a in name :# print(a)# if a s# print(嘿嘿)#循环次数哪里去了?# #这个a是什么鬼? a是临时变量 提前声明 python自动为你创建#range (起始位置&#xff0c…

python 白化_Python新疆某气候要素IDW(反距离权重)插值

1、Rbf插值import numpy as npimport cartopy.crs as ccrsimport cartopy.feature as cfeatfrom cartopy.mpl.gridliner import LONGITUDE_FORMATTER, LATITUDE_FORMATTERfrom cartopy.io.shapereader import Readerimport matplotlib.pyplot as pltimport matplotlib.ticker a…

帆软报表参数传给网络报表_报表开发工具FineReport的使用: 程序网络报表

1定义程序网络报表程序网络报表所在类需要继承com.fr.web.reportlet这个抽象类,并且需要实现createReport(ReportletRequest arg0)这个方法,并返回报表对象。具体代码如下:2//程序网络报表package com.fr.demo; import java.util.Map;impo…

bootstrap 取消_学习写个网站(5)Bootstrap学习2

吃了2天烧烤夜宵&#xff0c;还是得讲点自律。【正文】继续bootstrap&#xff0c;还是菜鸟教程。11. 分页就是还有种就是翻页&#xff0c;12. 标签class"label label-default">默认标签</span>label-primarylabel-success13. 警告就是那种可以取消的消息&am…

jdbc mysql user_tab_comments_MySQL学习(五)——使用JDBC完成用户表CRUD的操作

通过案例我们发现“获得连接”和“释放资源”两次代码将在之后的增删改查所有功能中都存在&#xff0c;开发中遇到此种情况&#xff0c;将采用工具类的方法进行抽取&#xff0c;从而达到代码的重复利用。1、使用properties配置文件开发中获得连接的4个参数(驱动、URL、用户名、…

mysql中try的意思_java中try是什么意思

try是Java中的关键字&#xff0c;主要用于异常处理机制&#xff0c;那么它有什么作用呢&#xff1f;try – 用于监听。将要被监听的代码(可能抛出异常的代码)放在try语句块之内&#xff0c;当try语句块内发生异常时&#xff0c;异常就被抛出。它一般与catch..finally组合使用块…

linux用java连接mysql_Java使用JDBC方式连接数据库

开发一个JDBC应用程序&#xff0c;基本需要以下几个步骤&#xff1a;1.把JDBC驱动类装载入Java虚拟机中。使用java.lang.Class类的静态方法forName(String className)实现。例&#xff1a; Class.forName("JDBC驱动类名称")2.加载驱动&#xff0c;并与数据库建立连接…

java web gradle_gradle学习之旅(四) 使用gradle构建简单的java web项目

本节通过一个简单的javaweb项目来体会gradle的使用需求构建一个javaweb项目&#xff0c;搭建jspservlet开发环境可以将需求分解为两步&#xff1a;使用gradle构建一个java项目为该项目构建web视图层工具gradle4.3ideajdk 1.8实验过程首先在idea中创建一个空的gradle项目创建如下…

java 整数 引用传递_关于Java引用传递的一个困惑?

Java的引用(包括基本类型&#xff0c;对象引用类型)在声明、方法调用等时候都会产生新的引用&#xff0c;复制等号右侧的引用。分为下面3种情况&#xff1a;基本类型代表的值存储在引用里面&#xff0c;引用中专门有个区域存储这个值&#xff0c;所以在复制的时候&#xff0c;值…

java fastjson 泛型_解决fastjson泛型转换报错的解决方法

错误信息Exception in thread "main" java.lang.ClassCastException: com.alibaba.fastjson.JSONObject cannot be cast to com.xh.demo.UserDO泛性类Datapublic class ResultSetDTO {private Integer totalSize;private Integer count;private List records;}实体类…

通达OA header身份认证绕过漏洞复现

通达OA是中国通达公司的一套协同办公自动化软件&#xff0c;通达OA2013&#xff0c;通达OA2016&#xff0c;通达OA2017 存在身份认证绕过漏洞&#xff0c;攻击者可以利用漏洞生成cookie&#xff0c;实现未授权访问。 1.漏洞级别 高危 2.漏洞搜索 fofa title"office An…

leetcode mysql 排名_Leetcode178.分数排名(中等)

题目编写一个 SQL 查询来实现分数排名。如果两个分数相同&#xff0c;则两个分数排名(Rank)相同。请注意&#xff0c;平分后的下一个名次应该是下一个连续的整数值。换句话说&#xff0c;名次之间不应该有“间隔”。-----------| Id | Score |-----------| 1 | 3.50 || 2 | 3.6…

java post 中文乱码问题_java post中文乱码问题

java post中文乱码问题function addcategory() {if (document.myform.category.value "") {alert("商品分类不能为空");return;}//var categorydocument.getElementById("category").value;var c document.myform.category.value;alert(c);wit…

java struct工作原理_Struts2的工作原理(图解)详解

Struts2的工作原理上图来源于Struts2官方站点&#xff0c;是Struts 2 的整体结构。一个请求在Struts2框架中的处理大概分为以下几个步骤(可查看源码&#xff1a;https://github.com/apache/struts):1 客户端初始化一个指向Servlet容器(例如Tomcat)的请求2 这个请求经过一系列的…

java 旅行家的预算_洛谷 P1016 旅行家的预算 Java解法

洛谷 P1016 旅行家的预算 Java解法洛谷 P1016 旅行家的预算 Java解法package com.two;import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner sc new Scanner(System.in);double D1 sc.nextDouble();// 两个城市之间的距离double C …

java语言执行过程_Java程序的运行过程(执行流程)分析

万事知其然&#xff0c;要知其所以然&#xff0c;所以本节带大家来详细了解一下 Java 程序的执行过程。从《使用记事本编写运行Java程序》一节的案例可以看出&#xff0c;Java 程序的运行必须经过编写、编译和运行 3 个步骤。编写&#xff1a;是指在 Java 开发环境中进行程序代…

php搭建云服务器,云服务器上如何搭建php环境

1) 安装libtool和libtool-ltdl[rootlocalhost mysql]# yum -y install "libtool*"[rootlocalhost mysql]# yum -y install "libtool-ltdl*"2) 手工修改gd库文件PHP 5.4 也有一些 Bug&#xff0c;在检测 gd 库时会报错&#xff0c;需要我们手工修改。命令如…

php实现文本替换,php 如何实现文字替换

php实现文字替换的方法&#xff1a;首先创建一个PHP示例文件&#xff1b;然后输入代码“str_replace("iwind", "kiki", "i love iwind, iwind said");”&#xff1b;最后输出执行结果即可。在php替换字符效率最高也是最简单字符替换函数str_repl…

php 删除指定html标签,php删除html标签的三种解决办法

分享下PHP删除HTMl标签的三种方法。方法1&#xff1a;直接取出想要取出的标记function strip($str){$strstr_replace("","",$str);//$strhtmlspecialchars($str);return strip_tags($str);}//edit by www.jbxue.com?>方法2.PHP 中有个 strip_tags 函数…

达内php第三次月考,达内第三次月考

1. 下列不属于浏览器内置的对象的是&#xff1a;()A. navigatorB. documentC. windowD. request正确答案&#xff1a;D2. Servlet 可以存储数据的三个不同的作用域是()。A. 请求、会话和上下文B. 响应、会话和上下文C. 请求、响应和会话D. 请求、响应和上下文正确答案&#xff…