kafka集成篇

kafka的Java客户端

生产者

1.引入依赖

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.3</version></dependency>

2.生产者发送消息的基本实现

/*** 消息的发送⽅*/
public class MyProducer {private final static String TOPIC_NAME = "my-replicated-topic";public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094");// 把发送的key从字符串序列化为字节数组props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 把发送消息value从字符串序列化为字节数组props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());RecordMetadata metadata = null;try (Producer<String, String> producer = new KafkaProducer<>(props)) {Order order = new Order(1L, 99.9D);// 未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNumProducerRecord<String, String> producerRecord = newProducerRecord<>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));// 等待消息发送成功的同步阻塞⽅法metadata = producer.send(producerRecord).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);} finally {if (metadata != null) {// =====阻塞=======System.out.println("同步⽅式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());}}}
}

3.发送消息到指定分区

image-20230814170847906

4.发送消息未指定分区

发送消息未指定分区,会通过业务key的hash运算,算出消息往哪个分区上发

// 未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = newProducerRecord<>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));

5.同步发送消息

image-20230814172356139

如果生产者发送消息没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数3次。

    RecordMetadata metadata = producer.send(producerRecord).get();System.out.println("同步⽅式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());

6.异步发送消息

image-20230814173250894

异步发送,生产者发送完消息后就可以执行之后的业务,broker在收到消息后异步调用生产者提供的callback回调方法。

            // 异步发送消息 Callback回调接口producer.send(producerRecord, new Callback() {// 异步回调方法@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if (e != null) {System.err.println("发送消息失败:" +e.getMessage());}if (metadata != null) {System.out.println("异步⽅式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());}}});System.out.println("处理之后的逻辑~");

输出结果:

image-20230814173709486

7.生产者中的ack的配置

在同步发消息的场景下:生产者发送消息到broker上后,ack会有3种不同的选择

  • ack = 0 :kafka-cluster不需要任何的broker收到消息,就立即返回ack给生产者就可以继续发送下一条消息,效率是最高的但最容易丢消息
  • ack=1(默认):多副本之间的leader已经收到消息,并把消息写⼊到本地的log中,才会返回ack给生产者,性能和安全性是最均衡的(这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失)
  • ack=-1/all:需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志才会返回ack给生产者,这种策略会保证只要有⼀个备份存活就不会丢失数据。这种方式最安全但性能最差。(⼀般除非是金融级别,或跟钱打交道的场景才会使用这种配置)

image-20230814175916962

code:

props.put(ProducerConfig.ACKS_CONFIG, "1");

关于ack和重试(如果没有收到ack,就开启重试)的配置

  • 发送会默认会重试3次,每次间隔100ms
props.put(ProducerConfig.ACKS_CONFIG, "1");/*发送失败会重试,默认重试间隔100ms,【重试能保证消息发送的可靠性,但是也可能造成消息重复发送】,⽐如⽹络抖动,所以【需要在接收者那边做好消息接收的幂等性处理】*/props.put(ProducerConfig.RETRIES_CONFIG, 3);// 重试间隔设置props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

8.关于消息发送的缓冲区

发送的消息会先进入到本地缓冲区(32mb),kakfa会跑⼀个线程,该线程去缓冲区中取16k的数据,发送到kafka,如果到10毫秒数据没取满16k,也会发送⼀次。

image-20230814180715793

  • kafka默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  • kafka本地线程会去缓冲区中⼀次拉16k的数据,发送到broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  • 如果线程拉不到16k的数据,间隔10ms也会将已拉到的数据发到broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

消费者

1.消费者消费消息的基本实现

public class MyConsumer {private final static String TOPIC_NAME = "my-replicated-topic";private final static String CONSUMER_GROUP_NAME = "testGroup";public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094");// 消费分组名props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 1.创建⼀个消费者的客户端try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {// 2.消费者订阅主题列表consumer.subscribe(Collections.singletonList(TOPIC_NAME));while (true) {/** 3.poll()API 是拉取消息的⻓轮询*/ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {// 4.操作消息System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value());}}} catch (Exception e) {throw new RuntimeException(e);}}
}

2.消费者自动提交和手动提交offset

1)提交的内容

消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题里面。

2)自动提交

消费者poll消息下来以后就会自动提交offset

// 是否自动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

注意自动提交会丢消息。因为消费者在消费前提交offset,有可能提交完后还没消费时消费者挂了。于是下⼀个消费者会从已提交的offset的下一个位置开始消费消息。之前未被消费的消息就丢失掉了。

3)手动提交

需要把自动提交的配置改成false

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

手动提交又分成了两种

  • 手动同步提交

在消费完消息后调用同步提交的方法,当集群返回ack前⼀直阻塞,返回ack后表示提交成功,执行之后的逻辑

            while (true) {/** poll()API 是拉取消息的⻓轮询*/ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {// 操作消息System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value());}// 所有的消息已消费完if (records.count() > 0) {// 有消息// ⼿动同步提交offset,当前线程会阻塞直到offset提交成功// 【⼀般使⽤同步提交】,因为提交之后⼀般也没有什么逻辑代码了consumer.commitSync();// =======阻塞=== 提交成功}}
  • 手动异步提交

在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置⼀个回调方法,供集群调用

            while (true) {/** poll()API 是拉取消息的⻓轮询*/ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {// 操作消息System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value());}// 所有的消息已消费完if (records.count() > 0) {// 有消息// ⼿动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后⾯的程序逻辑consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition,OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit failed for " + offsets);System.err.println("Commit failed exception: " + exception.getMessage());}}});}}

3.长轮询poll消息(消费者拉取消息)

  • 消费者建立了与broker之间的长连接,开始poll消息

  • 默认情况下,消费者一次会poll500条消息

// ⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
  • 代码中设置了长轮询的时间是1000毫秒
            while (true) {/** poll()API 是拉取消息的⻓轮询*/ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n ", record.partition(), record.offset(), record.key(), record.value());}}
  • 意味着:
    • 如果⼀次poll到500条,就直接执行for循环
    • 如果这⼀次没有poll到500条。且时间在1秒内,那么长轮询继续poll,要么到500条,要么到1s,执行后续for循环
    • 如果多次poll都没达到500条,且1秒时间到了,那么直接执行for循环
    • 如果两次poll的间隔超过30s(poll时间短但是消费时间长,消费者消费可能会达到30s左右),集群会认为该消费者的消费能力过 弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销

可以通过设置参数, 让⼀次poll的消息条数少⼀点,避免触发rebalance损耗性能

 // ⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者。-rebalanceprops.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

4.消费者的健康状态检查

消费者每隔1s向kafka集群发送心跳,集群发现如果有超过10s没有续约的消费者,将被踢出消费组,触发该消费组的rebalance机制,将该分区交给消费组里的其他消费者进行消费。

// consumer给broker发送心跳的间隔时间  1s一次
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进⾏rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

5.指定分区和偏移量、时间消费

  • 指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
  • 从头消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
  • 指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
  • 指定时间消费

根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费。

// topic对应所有分区
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
// 从1小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {TopicPartition key = entry.getKey();OffsetAndTimestamp value = entry.getValue();if (key == null || value == null) continue;long offset = value.offset();System.out.println("partition-" + key.partition() +"|offset-" + offset);System.out.println();//根据消费⾥的timestamp确定offsetconsumer.assign(Arrays.asList(key));consumer.seek(key, offset);
}

6.新消费组的消费offset规则

新消费组中的消费者在启动以后,默认会从当前分区的最后⼀条消息的offset+1开始消费(消费新消息)。可以通过以下的设置,让新的消费者第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)

  • Latest:默认的,消费新消息

  • earliest:第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1),这个需要区别于consumer.seekToBeginning(每次都从头开始消费)

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

SpringBoot集成kafka

1.引入依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2.配置文件

server:port: 8080
spring:kafka:bootstrap-servers: 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094producer: # 生产者retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送batch-size: 16384 # 每次拉取多少数据发送broker buffer-memory: 33554432 # 本地缓冲区大小acks: 1# 指定消息key和消息体的编解码⽅式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500listener:# 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交# RECORD# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交# TIME# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交# COUNT# TIME | COUNT 有⼀个条件满足时提交# COUNT_TIME# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交# MANUAL# 【手动调用Acknowledgment.acknowledge()后立即提交,⼀般使用这种】# MANUAL_IMMEDIATEack-mode: MANUAL_IMMEDIATE

3.消息生产者

发送消息到指定topic

image-20230815110653415

4.消息消费者

设置消费组,消费指定topic

@Component
public class MyConsumer {@KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup1")public void listenGroup(ConsumerRecord<String, String> record,Acknowledgment ack) {String value = record.value();System.out.println(record);System.out.println(value);//⼿动提交offsetack.acknowledge();}
}

5.消费者中配置消费主题、分区和偏移量

设置消费组、多topic、指定分区、指定偏移量消费及设置消费者个数

    @KafkaListener(groupId = "testGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"}),@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))}, concurrency = "3")// concurrency:同消费组中消费者个数,就是并发消费数,建议小于等于分区总数public void listenGroupPro(ConsumerRecord<String, String> record,Acknowledgment ack) {String value = record.value();System.out.println(value);System.out.println(record);//⼿动提交offsetack.acknowledge();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/43041.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分类预测 | MATLAB实现DRN深度残差网络多输入分类预测

分类预测 | MATLAB实现DRN深度残差网络多输入分类预测 目录 分类预测 | MATLAB实现DRN深度残差网络多输入分类预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.分类预测 | MATLAB实现DRN深度残差网络多输入分类预测 2.代码说明&#xff1a;MATLAB实现DRN深度残差网络…

LVS集群和nginx负载均衡

目录 1、基于 CentOS 7 构建 LVS-DR 群集。 2、配置nginx负载均衡。 1、基于 CentOS 7 构建 LVS-DR 群集。 1.部署LVS负载调度器 1>安装配置工具 [rootnode6 ~]# yum install -y ipvsadm 2>配置LVS虚拟IP&#xff08;VIP地址&#xff09; [rootnode6 ~]# ifconfig ens…

32.Netty源码之服务端如何处理客户端新建连接

highlight: arduino-light 服务端如何处理客户端新建连接 Netty 服务端完全启动后&#xff0c;就可以对外工作了。接下来 Netty 服务端是如何处理客户端新建连接的呢&#xff1f; 主要分为四步&#xff1a; md Boss NioEventLoop 线程轮询客户端新连接 OP_ACCEPT 事件&#xff…

算法与数据结构(七)--堆

一.堆 1.堆的定义 堆是计算机科学中一类特殊的数据结构的通常&#xff0c;堆通常可以被看做是一颗完全二叉树的数组对象。 堆的特性 1.它是完全二叉树&#xff0c;除了树的最后一层结点不需要是满的&#xff0c;其他的每一层从左到右都是满的&#xff0c;如果最后一层结点不…

linux 文件权限识别及其修改

一、文件权限认识 在 Linux 系统中&#xff0c;一切皆文件&#xff0c;目录也是一种文件形式叫目录文件&#xff0c;它们的属性主要包含&#xff1a;索引节点(inode)&#xff0c;类型、权限属性、链接数、所归属的用户和用户组、最近修改时间等内容。 如下为根目录下目录&…

改进YOLO系列:3.添加SOCA注意力机制

添加SOCA注意力机制 1. SOCA注意力机制论文2. SOCA注意力机制原理3. SOCA注意力机制的配置3.1common.py配置3.2yolo.py配置3.3yaml文件配置1. SOCA注意力机制论文 暂未找到 2. SOCA注意力机制原理 3. SOCA注意力机制的配置 3.1common.py配置 ./models/common.p…

Linux 网络发包流程

哈喽大家好&#xff0c;我是咸鱼 之前咸鱼在《Linux 网络收包流程》一文中介绍了 Linux 是如何实现网络接收数据包的 简单回顾一下&#xff1a; 数据到达网卡之后&#xff0c;网卡通过 DMA 将数据放到内存分配好的一块 ring buffer 中&#xff0c;然后触发硬中断CPU 收到硬中…

Lnton羚通关于Optimization在【PyTorch】中的基础知识

OPTIMIZING MODEL PARAMETERS &#xff08;模型参数优化&#xff09; 现在我们有了模型和数据&#xff0c;是时候通过优化数据上的参数来训练了&#xff0c;验证和测试我们的模型。训练一个模型是一个迭代的过程&#xff0c;在每次迭代中&#xff0c;模型会对输出进行猜测&…

python3 0基础学习----数据结构(基础+练习)

python 0基础学习笔记之数据结构 &#x1f4da; 几种常见数据结构列表 &#xff08;List&#xff09;1. 定义2. 实例&#xff1a;3. 列表中常用方法.append(要添加内容) 向列表末尾添加数据.extend(列表) 将可迭代对象逐个添加到列表中.insert(索引&#xff0c;插入内容) 向指定…

国家一带一路和万众创业创新的方针政策指引下,Live Market探索跨境产业的创新发展

现代社会&#xff0c;全球经济互联互通&#xff0c;跨境产业也因此而崛起。为了推动跨境产业的创新发展&#xff0c;中国政府提出了“一带一路”和“万众创业、万众创新”的方针政策&#xff0c;旨在促进全球经济的互联互通和创新发展。在这个大环境下&#xff0c;Live Market积…

Mariadb高可用MHA

本节主要学习了Mariadb高可用MHA的概述&#xff0c;案例如何构建MHA 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、概述 1、概念 MHA&#xff08;MasterHigh Availability&#xff09;是一套优秀的MySQL高可用环境下故障切换和主从复制的软件。…

合宙Air724UG LuatOS-Air LVGL API--简介

为何是 LVGL LVGL 是一个开源的图形库&#xff0c;它提供了创建嵌入式 GUI 所需的一切&#xff0c;具有易于使用的图形元素、漂亮的视觉效果和低内存占用的特点。 LVGL特点&#xff1a; 强大的 控件 &#xff1a;按钮、图表、列表、滑动条、图像等 高级图形引擎&#xff1a;动…

BIO、NIO和AIO

一.引言 何为IO 涉及计算机核心(CPU和内存)与其他设备间数据迁移的过程&#xff0c;就是I/O。数据输入到计算机内存的过程即输入&#xff0c;反之输出到外部存储&#xff08;比如数据库&#xff0c;文件&#xff0c;远程主机&#xff09;的过程即输出。 I/O 描述了计算机系统…

插入排序优化——超越归并排序的超级算法

插入排序及优化 插入排序算法算法讲解数据模拟代码 优化思路一、二分查找二、copy函数 优化后代码算法的用途题目&#xff1a;数星星&#xff08;POJ2352 star&#xff09;输入输出格式输入格式&#xff1a;输出格式 输入输出样例输入样例输出样例 题目讲解步骤如下AC 代码 插入…

Linux系统中基于NGINX的代理缓存配置指南

作为一名专业的爬虫程序员&#xff0c;你一定知道代理缓存在加速网站响应速度方面的重要性。而使用NGINX作为代理缓存服务器&#xff0c;能够极大地提高性能和效率。本文将为你分享Linux系统中基于NGINX的代理缓存配置指南&#xff0c;提供实用的解决方案&#xff0c;助你解决在…

C语言刷题训练DAY.8

1.计算单位阶跃函数 解题思路&#xff1a; 这个非常简单&#xff0c;只需要if else语句即可完成 解题代码&#xff1a; #include <stdio.h>int main() {int t 0;while(scanf("%d",&t)!EOF){if (t > 0)printf("1\n");else if (t < 0)pr…

大模型基础02:GPT家族与提示学习

大模型基础&#xff1a;GPT 家族与提示学习 从 GPT-1 到 GPT-3.5 GPT(Generative Pre-trained Transformer)是 Google 于2018年提出的一种基于 Transformer 的预训练语言模型。它标志着自然语言处理领域从 RNN 时代进入 Transformer 时代。GPT 的发展历史和技术特点如下: GP…

【校招VIP】java语言类和对象之map、set集合

考点介绍&#xff1a; map、set集合相关内容是校招面试的高频考点之一。 map和set是一种专门用来进行搜索的容器或者数据结构&#xff0c;其搜索效率与其具体的实例化子类有关系。 『java语言类和对象之map、set集合』相关题目及解析内容可点击文章末尾链接查看&#xff01; …

深入了解Maven(一)

目录 一.Maven介绍与功能 二.依赖管理 1.依赖的配置 2.依赖的传递性 3.排除依赖 4.依赖的作用范围 5.依赖的生命周期 一.Maven介绍与功能 maven是一个项目管理和构建工具&#xff0c;是基于对象模型POM实现。 Maven的作用&#xff1a; 便捷的依赖管理&#xff1a;使用…

【java安全】Log4j反序列化漏洞

文章目录 【java安全】Log4j反序列化漏洞关于Apache Log4j漏洞成因CVE-2017-5645漏洞版本复现环境漏洞复现漏洞分析 CVE-2019-17571漏洞版本漏洞复现漏洞分析 参考 【java安全】Log4j反序列化漏洞 关于Apache Log4j Log4j是Apache的开源项目&#xff0c;可以实现对System.out…