sparkstreaming 读取mysql_第十篇|SparkStreaming手动维护Kafka Offset的几种方式

e60f86e87d190e66faa562b195f71753.png

Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器,而是创建输入流直接从Kafka 集群节点拉取消息。输入流保证每个消息从Kafka 集群拉取以后只完全转换一次,保证语义一致性。但是当作业发生故障或重启时,要保障从当前的消费位点去处理数据(即Exactly Once语义),单纯的依靠SparkStreaming本身的机制是不太理想的,生产环境中通常借助手动管理offset的方式来维护kafka的消费位点。本文分享将介绍如何手动管理Kafka的Offset,希望对你有所帮助。本文主要包括以下内容:

  • 如何使用MySQL管理Kafka的Offset
  • 如何使用Redis管理Kafka的OffSet

如何使用MySQL管理Kafka的Offset

我们可以从Spark Streaming 应用程序中编写代码来手动管理Kafka偏移量,偏移量可以从每一批流处理中生成的RDDS偏移量来获取,获取方式为:

KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
// 获取偏移量
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges...}

当获取到偏移量之后,可以将将其保存到外部存储设备中(MySQL、Redis、Zookeeper、HBase等)。

使用案例代码

  • MySQL中用于保存偏移量的表
CREATE TABLE `topic_par_group_offset` (`topic` varchar(255) NOT NULL,`partition` int(11) NOT NULL,`groupid` varchar(255) NOT NULL,`offset` bigint(20) DEFAULT NULL,PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ;
  • 常量配置类:ConfigConstants
object ConfigConstants {// Kafka配置val kafkaBrokers = "kms-2:9092,kms-3:9092,kms-4:9092"val groupId = "group_test"val kafkaTopics = "test"val batchInterval = Seconds(5)val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2val kafkaKeySer = "org.apache.kafka.common.serialization.StringSerializer"val kafkaValueSer = "org.apache.kafka.common.serialization.StringSerializer"val sparkSerializer = "org.apache.spark.serializer.KryoSerializer"val batchSize = 16384val lingerMs = 1val bufferMemory = 33554432// MySQL配置val user = "root"val password = "123qwe"val url = "jdbc:mysql://localhost:3306/kafka_offset"val driver = "com.mysql.jdbc.Driver"// 检查点配置val checkpointDir = "file:///e:/checkpoint"val checkpointInterval = Seconds(10)// Redis配置val redisAddress = "192.168.10.203"val redisPort = 6379val redisAuth = "123qwe"val redisTimeout = 3000
}
  • JDBC连接工具类:JDBCConnPool
object JDBCConnPool {val log: Logger = Logger.getLogger(JDBCConnPool.getClass)var dataSource: BasicDataSource = null/*** 创建数据源** @return*/def getDataSource(): BasicDataSource = {if (dataSource == null) {dataSource = new BasicDataSource()dataSource.setDriverClassName(ConfigConstants.driver)dataSource.setUrl(ConfigConstants.url)dataSource.setUsername(ConfigConstants.user)dataSource.setPassword(ConfigConstants.password)dataSource.setMaxTotal(50)dataSource.setInitialSize(3)dataSource.setMinIdle(3)dataSource.setMaxIdle(10)dataSource.setMaxWaitMillis(2 * 10000)dataSource.setRemoveAbandonedTimeout(180)dataSource.setRemoveAbandonedOnBorrow(true)dataSource.setRemoveAbandonedOnMaintenance(true)dataSource.setTestOnReturn(true)dataSource.setTestOnBorrow(true)}return dataSource}/*** 释放数据源*/def closeDataSource() = {if (dataSource != null) {dataSource.close()}}/*** 获取数据库连接** @return*/def getConnection(): Connection = {var conn: Connection = nulltry {if (dataSource != null) {conn = dataSource.getConnection()} else {conn = getDataSource().getConnection()}} catch {case e: Exception =>log.error(e.getMessage(), e)}conn}/*** 关闭连接*/def closeConnection (ps:PreparedStatement , conn:Connection ) {if (ps != null) {try {ps.close();} catch  {case e:Exception =>log.error("预编译SQL语句对象PreparedStatement关闭异常!" + e.getMessage(), e);}}if (conn != null) {try {conn.close();} catch  {case e:Exception =>log.error("关闭连接对象Connection异常!" + e.getMessage(), e);}}}
}
  • Kafka生产者:KafkaProducerTest
object KafkaProducerTest {def main(args: Array[String]): Unit = {val  props : Properties = new Properties()props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)props.put("batch.size", ConfigConstants.batchSize.asInstanceOf[Integer])props.put("linger.ms", ConfigConstants.lingerMs.asInstanceOf[Integer])props.put("buffer.memory", ConfigConstants.bufferMemory.asInstanceOf[Integer])props.put("key.serializer",ConfigConstants.kafkaKeySer)props.put("value.serializer", ConfigConstants.kafkaValueSer)val  producer :  Producer[String, String] = new KafkaProducer[String, String](props)val startTime : Long  = System.currentTimeMillis()for ( i <- 1 to 100) {producer.send(new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))}println("消耗时间:" + (System.currentTimeMillis() - startTime))producer.close()}
}
  • 读取和保存Offset:

该对象的作用是从外部设备中读取和写入Offset,包括MySQL和Redis

object OffsetReadAndSave {/*** 从MySQL中获取偏移量** @param groupid* @param topic* @return*/def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {val conn = JDBCConnPool.getConnection()val selectSql = "select * from topic_par_group_offset where groupid = ? and topic = ?"val ppst = conn.prepareStatement(selectSql)ppst.setString(1, groupid)ppst.setString(2, topic)val result: ResultSet = ppst.executeQuery()// 主题分区偏移量val topicPartitionOffset = mutable.Map[TopicPartition, Long]()while (result.next()) {val topicPartition: TopicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))topicPartitionOffset += (topicPartition -> result.getLong("offset"))}JDBCConnPool.closeConnection(ppst, conn)topicPartitionOffset}/*** 从Redis中获取偏移量** @param groupid* @param topic* @return*/def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {val jedis: Jedis = JedisConnPool.getConnection()var offsets = mutable.Map[TopicPartition, Long]()val key = s"${topic}_${groupid}"val fields : java.util.Map[String, String] = jedis.hgetAll(key)for (partition <- JavaConversions.mapAsScalaMap(fields)) {offsets.put(new TopicPartition(topic, partition._1.toInt), partition._2.toLong)}offsets.toMap}/*** 将偏移量写入MySQL** @param groupid     消费者组ID* @param offsetRange 消息偏移量范围*/def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {val conn = JDBCConnPool.getConnection()val insertSql = "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)"val ppst = conn.prepareStatement(insertSql)for (offset <- offsetRange) {ppst.setString(1, offset.topic)ppst.setInt(2, offset.partition)ppst.setString(3, groupid)ppst.setLong(4, offset.untilOffset)ppst.executeUpdate()}JDBCConnPool.closeConnection(ppst, conn)}/*** 将偏移量保存到Redis中* @param groupid* @param offsetRange*/def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]) = {val jedis :Jedis = JedisConnPool.getConnection()for(offsetRange<-offsetRange){val topic=offsetRange.topicval partition=offsetRange.partitionval offset=offsetRange.untilOffset// key为topic_groupid,field为partition,value为offsetjedis.hset(s"${topic}_${groupid}",partition.toString,offset.toString)}}
}
  • 业务处理类

该对象是业务处理逻辑,主要是消费Kafka数据,再处理之后进行手动将偏移量保存到MySQL中。在启动程序时,会判断外部存储设备中是否存在偏移量,如果是首次启动则从最初的消费位点消费,如果存在Offset,则从当前的Offset去消费。

观察现象:当首次启动时会从头消费数据,手动停止程序,然后再次启动,会发现会从当前提交的偏移量消费数据。
object ManualCommitOffset {def main(args: Array[String]): Unit = {val brokers = ConfigConstants.kafkaBrokersval groupId = ConfigConstants.groupIdval topics = ConfigConstants.kafkaTopicsval batchInterval = ConfigConstants.batchIntervalval conf = new SparkConf().setAppName(ManualCommitOffset.getClass.getSimpleName).setMaster("local[1]").set("spark.serializer",ConfigConstants.sparkSerializer)val ssc = new StreamingContext(conf, batchInterval)// 必须开启checkpoint,否则会报错ssc.checkpoint(ConfigConstants.checkpointDir)ssc.sparkContext.setLogLevel("OFF")//使用broker和topic创建direct kafka streamval topicSet = topics.split(" ").toSet// kafka连接参数val kafkaParams = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.GROUP_ID_CONFIG -> groupId,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")// 从MySQL中读取该主题对应的消费者组的分区偏移量val offsetMap = OffsetReadAndSave.getOffsetMap(groupId, topics)var inputDStream: InputDStream[ConsumerRecord[String, String]] = null//如果MySQL中已经存在了偏移量,则应该从该偏移量处开始消费if (offsetMap.size > 0) {println("存在偏移量,从该偏移量处进行消费!!")inputDStream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))} else {//如果MySQL中没有存在了偏移量,从最早开始消费inputDStream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))}// checkpoint时间间隔,必须是batchInterval的整数倍inputDStream.checkpoint(ConfigConstants.checkpointInterval)// 保存batch的offsetvar offsetRanges = Array[OffsetRange]()// 获取当前DS的消息偏移量val transformDS = inputDStream.transform { rdd =>// 获取offsetoffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd}/*** 状态更新函数* @param newValues:新的value值* @param stateValue:状态值* @return*/def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {var oldvalue = stateValue.getOrElse(0) // 获取状态值// 遍历当前数据,并更新状态for (newValue <- newValues) {oldvalue += newValue}// 返回最新的状态Option(oldvalue)}// 业务逻辑处理// 该示例统计消息key的个数,用于查看是否是从已经提交的偏移量消费数据transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()// 打印偏移量和数据信息,观察输出的结果transformDS.foreachRDD { (rdd, time) =>// 遍历打印该RDD数据rdd.foreach { record =>println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")}// 打印消费偏移量信息for (o <- offsetRanges) {println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")}//将偏移量保存到到MySQL中OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)}ssc.start()ssc.awaitTermination()}
}

如何使用Redis管理Kafka的OffSet

  • Redis连接类
object JedisConnPool {val config = new JedisPoolConfig//最大连接数config.setMaxTotal(60)//最大空闲连接数config.setMaxIdle(10)config.setTestOnBorrow(true)//服务器ipval redisAddress :String = ConfigConstants.redisAddress.toString// 端口号val redisPort:Int = ConfigConstants.redisPort.toInt//访问密码val redisAuth :String = ConfigConstants.redisAuth.toString//等待可用连接的最大时间val redisTimeout:Int = ConfigConstants.redisTimeout.toIntval pool = new JedisPool(config,redisAddress,redisPort,redisTimeout,redisAuth)def getConnection():Jedis = {pool.getResource}}
  • 业务逻辑处理

该对象与上面的基本类似,只不过使用的是Redis来进行存储Offset,存储到Redis的数据类型是Hash,基本格式为:[key field value] -> [ topic_groupid partition offset],即 key为topic_groupid,field为partition,value为offset。

object ManualCommitOffsetToRedis {def main(args: Array[String]): Unit = {val brokers = ConfigConstants.kafkaBrokersval groupId = ConfigConstants.groupIdval topics = ConfigConstants.kafkaTopicsval batchInterval = ConfigConstants.batchIntervalval conf = new SparkConf().setAppName(ManualCommitOffset.getClass.getSimpleName).setMaster("local[1]").set("spark.serializer", ConfigConstants.sparkSerializer)val ssc = new StreamingContext(conf, batchInterval)// 必须开启checkpoint,否则会报错ssc.checkpoint(ConfigConstants.checkpointDir)ssc.sparkContext.setLogLevel("OFF")//使用broker和topic创建direct kafka streamval topicSet = topics.split(" ").toSet// kafka连接参数val kafkaParams = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.GROUP_ID_CONFIG -> groupId,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")// 从Redis中读取该主题对应的消费者组的分区偏移量val offsetMap = OffsetReadAndSave.getOffsetFromRedis(groupId, topics)var inputDStream: InputDStream[ConsumerRecord[String, String]] = null//如果Redis中已经存在了偏移量,则应该从该偏移量处开始消费if (offsetMap.size > 0) {println("存在偏移量,从该偏移量处进行消费!!")inputDStream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))} else {//如果Redis中没有存在了偏移量,从最早开始消费inputDStream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))}// checkpoint时间间隔,必须是batchInterval的整数倍inputDStream.checkpoint(ConfigConstants.checkpointInterval)// 保存batch的offsetvar offsetRanges = Array[OffsetRange]()// 获取当前DS的消息偏移量val transformDS = inputDStream.transform { rdd =>// 获取offsetoffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd}/*** 状态更新函数** @param newValues  :新的value值* @param stateValue :状态值* @return*/def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {var oldvalue = stateValue.getOrElse(0) // 获取状态值// 遍历当前数据,并更新状态for (newValue <- newValues) {oldvalue += newValue}// 返回最新的状态Option(oldvalue)}// 业务逻辑处理// 该示例统计消息key的个数,用于查看是否是从已经提交的偏移量消费数据transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()// 打印偏移量和数据信息,观察输出的结果transformDS.foreachRDD { (rdd, time) =>// 遍历打印该RDD数据rdd.foreach { record =>println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")}// 打印消费偏移量信息for (o <- offsetRanges) {println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")}//将偏移量保存到到Redis中OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)}ssc.start()ssc.awaitTermination()}}

总结

本文介绍了如何使用外部存储设备来保存Kafka的消费位点,通过详细的代码示例说明了使用MySQL和Redis管理消费位点的方式。当然,外部存储设备很多,用户也可以使用其他的存储设备进行管理Offset,比如Zookeeper和HBase等,其基本处理思路都十分相似。

大数据技术与数仓

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/527538.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

anaconda 安装pytorch_conda上安装PyTorch

conda上安装PyTorch这里的安装系统&#xff1a;Windows系统第一步&#xff0c;打开Anaconda Prompt第二步&#xff0c;为pytorch创建一个环境&#xff1a;conda create -n pytorch python3.8&#xff08;这里的pytorch是环境的名称&#xff0c;python3.8表示安装的是3.8版本的p…

curd什么意思中文_查英英字典:What a shame是什么意思?

查英英字典&#xff1a;What a shame是什么意思&#xff1f;“互联网”时代学习英语有两种“学习方法”&#xff1a;简单粗暴学法和自己动手丰衣足食法。一、简单粗暴法&#xff1a;直接问老师要“中文答案”在过去非互联网时代&#xff0c;“老师”往往是提供“答案”的唯一渠…

c++读出像素矩阵_Python传numpy矩阵调c++(求3D图像连通区域)

Python有很多种调c的方法&#xff0c;有的复杂有的简单&#xff0c;有时使用的时候反而不知道到底该用哪一种比较好&#xff0c;其实没有最好的方法&#xff0c;只有适合不适合自己。本文从我所遇到的问题说起&#xff0c;然后讲述另一种比较简单的python调c并且传参numpy矩阵的…

android四大组件的作用简书,Android四大组件是什么

Android四大组件是&#xff1a;活动、服务、广播接收器、内容提供商。它们的英文名称是ACTIVITY、SERVICE、BroadcastReceiver、Content Provider。四个组件分别起到不同的作用&#xff0c;相互配合才能确保安卓系统的正常运行&#xff0c;因此是缺一不可的。Android四大组件及…

python 获取文件大小_第41p,超级重要,Python中的os库

大家好&#xff0c;我是杨数Tos&#xff0c;这是《从零基础到大神》系列课程的第41篇文章&#xff0c;第二阶段的课程&#xff1a;Python基础知识&#xff1a;Python内置库之os库的使用。学习本课程&#xff0c;建议先看一遍&#xff1a;【计算机基础知识】课程。os模块是与操作…

惠普打印机节能环保认证证书_低成本高效办公 苏宁惠普超品日这几款打印机了解下!...

【PConline 导购】说到打印机&#xff0c;很多朋友会想到公司那台不停运转的打印机。其实&#xff0c;伴随着近几年来打印机技术的成熟&#xff0c;其打印成本也一降再降&#xff0c;这就让有打印的需求的中下型企业&#xff0c;甚至个人&#xff0c;都会去选购一款合适的打印产…

vm客户机隔离不能选_开汽车美容店,这些位置绝对不能选,会让你门可罗雀,生意惨淡...

之前的文章里&#xff0c;讲了一些开汽车美容店选址的要领&#xff0c;今天&#xff0c;来聊聊一些更加细致的选址要素&#xff0c;让你避免错误选址而导致生意不佳。门面宽度小于2.5米的不要选一辆普通汽车的宽度在1.8米-2米之间&#xff0c;加上两侧后视镜各20公分左右&#…

markdown格式_第1篇:如何将Markdown笔记转入ANKI复习? | 学习骇客

用技术和心理学改善学习 第128次摘要&#xff1a;将日常使用的Markdown笔记软件与复习工具ANKI结合起来&#xff0c;于ANKI用户而言可以简化学习过程&#xff0c;于一般的学习者而言可以解决笔记“记而不学”的问题。本文摘选自视频课程《复习的技术&#xff0c;跟LEO学ANKI》(…

android手机无分区无法刷机,手机刷死了别说没提醒!安卓设备刷机前必看

大家好&#xff0c;清明节已经过去了&#xff0c;上班的感觉是不是很不爽&#xff1f;但是告诉大家一个好消息是&#xff1a;本周只需要煎熬三天&#xff0c;大家就又可以休息了&#xff01;听了这个消息&#xff0c;不爽的心情是不是稍微好一些了&#xff1f;本期的微信和大家…

mysqlbinlog工具_带你解析MySQL binlog

前言&#xff1a;我们都知道&#xff0c;binlog可以说是MySQL中比较重要的日志了&#xff0c;在日常学习及运维过程中&#xff0c;也经常会遇到。不清楚你对binlog了解多少呢&#xff1f;本篇文章将从binlog作用、binlog相关参数、解析binlog内容三个方面带你了解binlog。1.bin…

inputstream 初始化_如何完美回答面试官问的Mybatis初始化原理!

前言对于任何框架而言&#xff0c;在使用前都要进行一系列的初始化&#xff0c;MyBatis也不例外。本章将通过以下几点详细介绍MyBatis的初始化过程。MyBatis的初始化做了什么MyBatis基于XML配置文件创建Configuration对象的过程手动加载XML配置文件创建Configuration对象完成初…

html中加减号怎么输入,jQuery 实现点击加减号改变input标签中的value值,该怎么解决...

jQuery 实现点击加减号改变input标签中的value值我想点击左右两边的加减号&#xff0c;让中间input标签中的value属性值做出相应的改变&#xff0c;jQuery怎么实现&#xff1f;------解决思路----------------------$("button1").click(function(){var num $("…

java mybatis狂神说sql_狂神说MyBatis01:第一个程序

狂神说MyBatis系列连载课程&#xff0c;通俗易懂&#xff0c;基于MyBatis3.5.2版本&#xff0c;欢迎各位狂粉转发关注学习&#xff0c;视频同步文档。未经作者授权&#xff0c;禁止转载MyBatis简介环境说明&#xff1a;jdk 8 MySQL 5.7.19maven-3.6.1IDEA学习前需要掌握&#x…

鸿蒙系统暗黑2,暗黑破坏神2为什么被称为神作!看看装备强化系统就知道有多完美...

暗黑破坏神2之所以被玩家们称为神作是因为真的好玩&#xff0c;那么游戏的精髓到底在哪呢&#xff1f;个人觉得还要算其出色的装备强化系统&#xff0c;如果应用在现在的部分作品中&#xff0c;暗黑破坏神2的特色可以总结为肝&#xff0c;彻底肝。但它又区别于传统的必须肝&…

c语言api_用C语言来拓展python的功能

python是一门功能强大的高级脚本语言&#xff0c;它的强大不仅表现在其自身的功能上&#xff0c;而且还表现在其良好的可扩展性上&#xff0c;正因如此&#xff0c;python已经开始受到越来越多人的青睐&#xff0c;并且被屡屡成功地应用于各类大型软件系统的开发过程中。与其它…

html模拟在线股票走势,基于Html5的股票行情k线图源码

K线图 滑块控制这个K线图和flash实现的K线图非常接近&#xff0c;滑块控制是实现的难点&#xff0c;这里是根据滑块滑动的位置计算k线数据的范围&#xff0c;并实时重画&#xff0c;事实证明html5 canvas标签的性能还是相当的好的&#xff0c;在PC机上每秒可以重画20次以上&…

html5 css 三角形,css怎么画三角形?

css怎么画三角形&#xff1f;下面本篇文章就来给大家介绍一下使用CSS画三角形的方法。有一定的参考价值&#xff0c;有需要的朋友可以参考一下&#xff0c;希望对大家有所帮助。css怎么画三角形&#xff1f;三角形实现原理&#xff1a;宽度width为0&#xff1b;height为0&#…

springboot整合JPA 多表关联 :一对多 多对多

补充一下自定义SQL 这是连表查询&#xff0c;可以任意查出字符&#xff0c;用Map接收 Testvoid test3() {JPAQueryFactory jpaQueryFactory new JPAQueryFactory(em);QStudent student QStudent.student;QMessage message QMessage.message;//constructor(StuMesDto.class, …

python网络库_python的网络库

最近新功能上线&#xff0c;帮忙加了几个监控脚本。上次用的perl&#xff0c;语法太随意了&#xff0c;看起来很是不整洁&#xff0c;自己写的都觉得不好&#xff0c;更不要说给别人看。好久没用python了&#xff0c;反正这次准备使用新的监控设计方案&#xff0c;刚好换一下。…

求一批整数中出现最多的个位数字_(43)C++面试之从1到n整数中1出现的次数

// 面试题43&#xff1a;从1到n整数中1出现的次数// 题目&#xff1a;输入一个整数n&#xff0c;求从1到n这n个整数的十进制表示中1出现的次数。例如// 输入12&#xff0c;从1到12这些整数中包含1 的数字有1&#xff0c;10&#xff0c;11和12&#xff0c;1一共出现了5次。#incl…