【Kafka】开发实战和Springboot集成kafka

目录

  • 消息的发送与接收
    • 生产者
    • 消费者
  • SpringBoot 集成kafka
  • 服务端参数配置

消息的发送与接收

生产者

在这里插入图片描述

生产者主要的对象有: KafkaProducer , ProducerRecord 。

其中 KafkaProducer 是用于发送消息的类, ProducerRecord 类用于封装Kafka的消息。

KafkaProducer 的创建需要指定的参数和含义:

  1. bootstrap.servers:配置生产者如何与broker建立连接。该参数设置的是初始化参数。如果生产者需要连接的是Kafka集群,则这里配置集群中几个部分broker的地址,而不是全部,当生产者连接上此处指定的broker之后,在通过该连接发现集群中的其他节点。
  2. key.serializer:要发送信息的key数据的序列化类。设置的时候可以写类名,也可以使用该类的Class对象。
  3. value.serializer:要发送消息的value数据的序列化类。设置的时候可以写类名,也可以使用该类的Class对象。
  4. acks:默认值:all
    • acks=0:生产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。该情形不能保证broker是否真的收到了消息,retries配置也不会生效。发送的消息的返回的消息偏移量永远是-1。
    • acks=1:表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的确认。在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得及同步该消息,则该消息丢失。
    • acks=all:leader分区会等待所有的ISR副本分区确认记录。该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。这是Kafka最强的可靠性保证,等效于 acks=-1。
  5. retries:retries重试次数。当消息发送出现错误的时候,系统会重发消息。跟客户端收到错误时重发一样。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1,否则在重试此失败消息的时候,其他的消息可能发送成功了。

其他参数可以从 org.apache.kafka.clients.producer.ProducerConfig 中找到。后面的内容会介绍到。

消费者生产消息后,需要broker端的确认,可以同步确认,也可以异步确认。同步确认效率低,异步确认效率高,但是需要设置回调对象。

示例如下:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> configs = new HashMap<>();// 指定初始连接用到的broker地址configs.put("bootstrap.servers", "192.168.100.101:9092");// 指定key的序列化类configs.put("key.serializer", IntegerSerializer.class);// 指定value的序列化类configs.put("value.serializer", StringSerializer.class);//        configs.put("acks", "all");
//        configs.put("reties", "3");KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);// 用于设置用户自定义的消息头字段List<Header> headers = new ArrayList<>();headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic_1",  // topic0,  // 分区0,  // key"hello lagou 0", // valueheaders  // headers);// 消息的同步确认final Future<RecordMetadata> future = producer.send(record);final RecordMetadata metadata = future.get();System.out.println("消息的主题:" + metadata.topic());System.out.println("消息的分区号:" + metadata.partition());System.out.println("消息的偏移量:" + metadata.offset());// 关闭生产者producer.close();}
}

如果需要异步发送,如下:

package com.lagou.kafka.demo.producer;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> configs = new HashMap<>();// 指定初始连接用到的broker地址configs.put("bootstrap.servers", "192.168.100.101:9092");// 指定key的序列化类configs.put("key.serializer", IntegerSerializer.class);// 指定value的序列化类configs.put("value.serializer", StringSerializer.class);//        configs.put("acks", "all");
//        configs.put("reties", "3");KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);// 用于设置用户自定义的消息头字段List<Header> headers = new ArrayList<>();headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic_1",  // topic0,  // 分区0,  // key"hello lagou 0", // valueheaders  // headers);// 消息的异步确认producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("消息的主题:" + metadata.topic());System.out.println("消息的分区号:" + metadata.partition());System.out.println("消息的偏移量:" + metadata.offset());} else {System.out.println("异常消息:" + exception.getMessage());}}});// 关闭生产者producer.close();}
}

消费者

kafka不支持消息的推送(当然可以自己已实现),采用的消息的拉取(poll方法)。

消费者主要的对象是kafkaConsumer,用于消费消息的类。

其主要参数:

  1. bootstrap.servers:与kafka建立初始连接的broker地址列表
  2. key.deserializer:key的反序列化器
  3. value.deserializer:value的反序列化器
  4. group.id:指定消费者组id,用于标识该消费者属于哪个消费者组
  5. auto.offset.reset:当kafka中没有初始化偏移量或当前偏移量在服务器中不存在(如数据被删除了),处理办法
    • earliest:自动重置偏移量到最早的偏移量
    • latest:自动重置偏移量到最新的偏移量
    • none:如果消费者组原来的偏移量(previous)不存在,向消费者抛出异常
    • anything:向消费者抛异常

ConsumerConfig类中包含了所有的可以给kafkaConsumer的参数。

示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;public class MyConsumer1 {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();// node1对应于192.168.100.101,windows的hosts文件中手动配置域名解析configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");// 使用常量代替手写的字符串,配置key的反序列化器configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);// 配置value的反序列化器configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 配置消费组IDconfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_demo1");// 如果找不到当前消费者的有效偏移量,则自动重置到最开始// latest表示直接重置到消息偏移量的最后一个configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);// 先订阅,再消费consumer.subscribe(Arrays.asList("topic_1"));// 如果主题中没有可以消费的消息,则该方法可以放到while循环中,每过3秒重新拉取一次// 如果还没有拉取到,过3秒再次拉取,防止while循环太密集的poll调用。// 批量从主题的分区拉取消息final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);// 遍历本次从主题的分区拉取的批量消息consumerRecords.forEach(new Consumer<ConsumerRecord<Integer, String>>() {@Overridepublic void accept(ConsumerRecord<Integer, String> record) {System.out.println(record.topic() + "\t"+ record.partition() + "\t"+ record.offset() + "\t"+ record.key() + "\t"+ record.value());}});consumer.close();}
}

SpringBoot 集成kafka

这里把生产者和消费者放在一个项目中,实际可能是在两个里的。

1、引入依赖

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId>
</dependency>

2、 配置

spring.application.name=springboot-kafka-02 
server.port=8080 # 用于建立初始连接的broker地址 
spring.kafka.bootstrap-servers=node1:9092 
# producer用到的key和value的序列化类 
spring.kafka.producer.key- serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value- serializer=org.apache.kafka.common.serialization.StringSerializer 
# 默认的批处理记录数 
spring.kafka.producer.batch-size=16384 
# 32MB的总发送缓存 
spring.kafka.producer.buffer-memory=33554432 
# consumer用到的key和value的反序列化类 
spring.kafka.consumer.key- deserializer=org.apache.kafka.common.serialization.IntegerDeserializer 
spring.kafka.consumer.value- deserializer=org.apache.kafka.common.serialization.StringDeserializer
# consumer的消费组id 
spring.kafka.consumer.group-id=spring-kafka-02-consumer 
# 是否自动提交消费者偏移量
spring.kafka.consumer.enable-auto-commit=true
# 每隔100ms向broker提交一次偏移量 
spring.kafka.consumer.auto-commit-interval=100 
# 如果该消费者的偏移量不存在,则自动设置为最早的偏移量 
spring.kafka.consumer.auto-offset-reset=earliest

3、启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Demo02SpringbootKafkaApplication {public static void main(String[] args) {SpringApplication.run(Demo02SpringbootKafkaApplication.class, args);}}

4、生产者

这里我们就写在Controller里就好,如下:

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.ExecutionException;@RestController
public class KafkaSyncProducerController {@Autowiredprivate KafkaTemplate<Integer, String> template;@RequestMapping("send/sync/{message}")public String send(@PathVariable String message) {final ListenableFuture<SendResult<Integer, String>> future = template.send("topic-spring-01", 0, 0, message);// 同步发送消息try {final SendResult<Integer, String> sendResult = future.get();final RecordMetadata metadata = sendResult.getRecordMetadata();System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return "success";}}

上面是同步发送消息,如果异步发送消息,可改为如下:

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaAsyncProducerController {@Autowiredprivate KafkaTemplate<Integer, String> template;@RequestMapping("send/async/{message}")public String send(@PathVariable String message) {final ListenableFuture<SendResult<Integer, String>> future = this.template.send("topic-spring-01", 0, 1, message);// 设置回调函数,异步等待broker端的返回结果future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<Integer, String> result) {final RecordMetadata metadata = result.getRecordMetadata();System.out.println("发送消息成功:" + metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());}});return "success";}}

5、消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class MyConsumer {@KafkaListener(topics = "topic-spring-01")public void onMessage(ConsumerRecord<Integer, String> record) {System.out.println("消费者收到的消息:"+ record.topic() + "\t"+ record.partition() + "\t"+ record.offset() + "\t"+ record.key() + "\t"+ record.value());}}

6、kafka配置类

上面当我们启动生产者和消费者时,kafka会自动为我们创建好topic和分区等。那是因为kafka的KafkaAutoConfigration里有个KafkaAdmin,他负责自动检测需要创建的topic和分区等。如果我们想自己创建,或者自定义KafkaTemplate(一般不会这么做),可以使用配置类,如下:

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConfig {@Beanpublic NewTopic topic1() {return new NewTopic("nptc-01", 3, (short) 1);}@Beanpublic NewTopic topic2() {return new NewTopic("nptc-02", 5, (short) 1);}@Beanpublic KafkaAdmin kafkaAdmin() {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "node1:9092");KafkaAdmin admin = new KafkaAdmin(configs);return admin;}@Bean@Autowiredpublic KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {// 覆盖ProducerFactory原有设置Map<String, Object> configsOverride = new HashMap<>();configsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);KafkaTemplate<Integer, String> template = new KafkaTemplate<Integer, String>(producerFactory, configsOverride);return template;}}

服务端参数配置

$KAFKA_HOME/config/server.properties文件中的一些配置。

1、zookeeper.connect

该参数用于配置Kafka要连接的Zookeeper/集群的地址。

它的值是一个字符串,使用逗号分隔Zookeeper的多个地址。Zookeeper的单个地址是 host:port形式的,可以在最后添加Kafka在Zookeeper中的根节点路径。

如:

zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka 1

2、listeners

用于指定当前Broker向外发布服务的地址和端口。

配置项为

listeners=PLAINTEXT://:9092

如下:

在这里插入图片描述

PLAINTEXT是一种协议名称;上面ip地址没写,可以配置成listeners=PLAINTEXT://0.0.0.0:9092,则只有本机可以访问。也可以是其他配置。

可以配置多个,逗号分割。但是多个listener的协议名称不能相同,且端口号不能相同。如果想用一个协议,则需要在listener.security.protocol.map维护听器名称和协议的map。

可以与 advertised.listeners 配合,用于做内外网隔离,比如创建topic和分区的等管理方面的使用一个地址,发送和消费消息则使用另一个地址,即管理和使用分开。

内外网隔离配置:

  1. listener.security.protocol.map

监听器名称和安全协议的映射配置。比如,可以将内外网隔离,即使它们都使用SSL。

listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL

冒号前面代表监听器名称,后面代表真正的协议。每个监听器的名称只能在map中出现一次。

  1. listeners

用于配置broker监听的URI以及监听器名称列表,使用逗号隔开多个URI及监听器名称。如果监听器名称代表的不是安全协议,必须配置listener.security.protocol.map。每个监听器必须使用不同的网络端口。

  1. advertised.listeners

需要将该地址发布到zookeeper供客户端使用。

可以在zookeeper的 get /myKafka/brokers/ids/<broker.id> 中找到。

在IaaS环境,该条目的网络接口得与broker绑定的网络接口不同。

如果不设置此条目,就使用listeners的配置。跟listeners不同,该条目不能使用0.0.0.0网络端口。

advertised.listeners的地址必须是listeners中配置的或配置的一部分。

  1. inter.broker.listener.name

用于配置broker之间通信使用的监听器名称,该名称必须在advertised.listeners列表中。

inter.broker.listener.name=EXTERNAL

典型配置如下:

在这里插入图片描述

3、 broker.id

该属性用于唯一标记一个Kafka的Broker,它的值是一个任意integer值。当Kafka以分布式集群运行的时候,尤为重要。

最好该值跟该Broker所在的物理主机有关的,如主机名为 host1.lagou.com ,则 broker.id=1 ;如果主机名为 192.168.100.101 ,则 broker.id=101 等等。

4、 log.dir

通过该属性的值,指定Kafka在磁盘上保存消息的日志片段的目录。它是一组用逗号分隔的本地文件系统路径。

如果指定了多个路径,那么broker 会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。

broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/641106.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

仅使用K-M法+Cox比例风险模型就能发二区文章 | SEER公共数据库周报(1.17)

欢迎各位参加本周中山大学著名卫生统计学家方积乾教授公益直播讲座&#xff01; 就在本周三晚&#xff0c;主题为“真实世界研究与RCT研究”&#xff0c;欢迎各位预约参加&#xff01; SEER&#xff08;The Surveillance, Epidemiology, and End Results&#xff09;数据库是由…

回溯算法篇-01:全排列

力扣46&#xff1a;全排列 题目分析 这道题属于上一篇——“回溯算法解题框架与思路”中的 “元素不重复不可复用” 那一类中的 排列类问题。 我们来回顾一下当时是怎么说的&#xff1a; 排列和组合的区别在于&#xff0c;排列对“顺序”有要求。比如 [1,2] 和 [2,1] 是两个不…

柔性数组和C语言内存划分

柔性数组和C语言内存划分 1. 柔性数组1.1 柔性数组的特点&#xff1a;1.2 柔性数组的使用1.3 柔性数组的优势 2. 总结C/C中程序内存区域划分 1. 柔性数组 也许你从来没有听说过柔性数组&#xff08;flexible array)这个概念&#xff0c;但是它确实是存在的。 C99 中&#xff…

力扣740. 删除并获得点数

动态规划 思路&#xff1a; 选择元素 x&#xff0c;获得其点数&#xff0c;删除 x 1 和 x - 1&#xff0c;则其他的 x 的点数也会被获得&#xff1b;可以将数组转换成一个有序 map&#xff0c;key 为 x&#xff0c; value 为对应所有 x 的和&#xff1b;则问题转换成了不能同…

Postman基本使用、测试环境(Environment)配置

文章目录 准备测试项目DemoController测试代码Interceptor模拟拦截配置 Postman模块简单介绍Postman通用环境配置新建环境(Environment)配置环境(Environment)设置域名变量引用域名变量查看请求结果打印 Postman脚本设置变量登录成功后设置全局Auth-Token脚本编写脚本查看conso…

即插即用篇 | UniRepLKNet:用于音频、视频、点云、时间序列和图像识别的通用感知大卷积神经网络 | DRepConv

大卷积神经网络(ConvNets)近来受到了广泛研究关注,但存在两个未解决且需要进一步研究的关键问题。1)现有大卷积神经网络的架构主要遵循传统ConvNets或变压器的设计原则,而针对大卷积神经网络的架构设计仍未得到解决。2)随着变压器在多个领域的主导地位,有待研究ConvNets…

软件设计师——项目管理(一)

&#x1f4d1;前言 本文主要是【项目管理】——软件设计师——项目管理的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是听风与他&#x1f947; ☁️博客首页&#xff1a;CSDN主页听风与他 &#x1f304…

Databend 开源周报第 129 期

Databend 是一款现代云数仓。专为弹性和高效设计&#xff0c;为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务&#xff1a;https://app.databend.cn 。 Whats On In Databend 探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。 支持标准流 标…

如何在 Ubuntu 22.04 上安装 Apache Web 服务器

前些天发现了一个人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;最重要的屌图甚多&#xff0c;忍不住分享一下给大家。点击跳转到网站。 如何在 Ubuntu 22.04 上安装 Apache Web 服务器 介绍 Apache HTTP 服务器是世界上使用最广泛的 Web 服务器。它…

模拟器单窗口ip有问题?试试关闭IPV6来解决

目前应该不止雷电9有这个问题了&#xff0c;最早是看到无忧群里在说有这个问题&#xff0c;后面发现很多其他的ip软件也有同样的问题&#xff0c;很多人都遇到&#xff0c;所以做个图文教程在这里&#xff0c;没出问题的也可以设置一下&#xff0c;目前ipv6也还没普及&#xff…

x-cmd pkg | hurl - HTTP 请求处理工具

目录 简介首次用户功能特点竞品和相关作品进一步探索 简介 Hurl 是 HTTP 请求处理工具&#xff0c;支持使用简单的纯文本格式定义的 HTTP 请求。它的用途非常广泛&#xff0c;既可以用于获取数据&#xff0c;也可以用于测试HTTP会话。 它可以链式处理请求&#xff0c;捕获数值…

ORA-01033: ORACLE initialization or shutdown in progress---惜分飞

客户反馈数据库使用plsql dev登录报ORA-01033: ORACLE initialization or shutdown in progress的错误 出现该错误一般是由于数据库没有正常open成功,查看oracle 告警日志发现 Mon Jan 22 16:55:50 2024 Database mounted in Exclusive Mode Lost write protection disabled …

Unity SRP 管线【第五讲:URP烘培光照】

本节&#xff0c;我们将跟随数据流向讲解UEP管线中的烘培光照。 文章目录 一、URP烘培光照1. 搭建场景2. 烘培光照参数设置MixedLight光照设置&#xff1a;直观感受 Lightmapping Settings参数设置&#xff1a; 3. 我们如何记录次表面光源颜色首先我们提取出相关URP代码&#…

企业数字档案馆的构成要素

企业数字档案馆的构成要素包括以下几个方面&#xff1a; 1. 系统平台&#xff1a;企业数字档案馆需要有一个稳定的系统平台&#xff0c;用于存储、管理和检索档案信息。这个平台可以是基于云计算、数据库或其他技术的&#xff0c;能够支持大容量的数据存储和快速的检索功能。 2…

设计模式二(工厂模式)

本质&#xff1a;实例化对象不用new&#xff0c;用工厂代替&#xff0c;实现了创建者和调用者分离 满足&#xff1a; 开闭原则&#xff1a;对拓展开放&#xff0c;对修改关闭 依赖倒置原则&#xff1a;要针对接口编程 迪米特原则&#xff1a;最少了解原则&#xff0c;只与自己直…

Unity—配置lua环境变量+VSCode 搭建 Lua 开发环境

每日一句&#xff1a;保持须臾的浪漫&#xff0c;理想的喧嚣&#xff0c;平等的热情 Windows 11下配置lua环境变量 一、lua-5.4.4版本安装到本地电脑 链接&#xff1a;https://pan.baidu.com/s/14pAlOjhzz2_jmvpRZf9u6Q?pwdhd4s 提取码&#xff1a;hd4s 二、高级系统设置 此电…

P9232 [蓝桥杯 2023 省 A] 更小的数

[蓝桥杯 2023 省 A] 更小的数 终于本弱一次通关了一道研究生组别的题了[普及/提高−] 一道较为简单的双指针题,但一定有更好的解法. 题目描述 小蓝有一个长度均为 n n n 且仅由数字字符 0 ∼ 9 0 \sim 9 0∼9 组成的字符串&#xff0c;下标从 0 0 0 到 n − 1 n-1 n−1&a…

C++ //练习 2.35 判断下列定义推断出的类型是什么,然后编写程序进行验证。

C Primer&#xff08;第5版&#xff09; 练习 2.35 练习 2.35 判断下列定义推断出的类型是什么&#xff0c;然后编写程序进行验证。 const int i 42; auto j i; const auto &k i; auto *p &i; const auto j2 i, &k2 i;环境&#xff1a;Linux Ubuntu&#x…

HarmonyOS鸿蒙学习基础篇 - 运行第一个程序 Hello World

下载与安装DevEco Studio 古话说得好&#xff0c;“磨刀不误砍柴工”&#xff0c;对于HarmonyOS应用开发&#xff0c;我们首先得确保工具齐全。这就好比要进行HarmonyOS应用开发&#xff0c;我们需要确保已经安装了DevEco Studio&#xff0c;这是HarmonyOS的一站式集成开发环境…

企业网架构

企业网架构 局域网通信不同网段 局域网通信 MAC地址&#xff1a;硬件地址&#xff0c;固定在网卡上的地址(唯一标识一个网卡)&#xff0c;确定网络设备位置的,数据链路层。一个设备可以有多个网卡&#xff0c;每一个网卡都需要一个唯一MAC。ARP协议&#xff1a;通过目的IP&…