【2023】kafka原生以及配合springboot的使用(Kafka-3)

💻目录

    • 前言
  • 一、依赖
  • 二、原生使用kafka
    • 1、发送消息
      • 1.1、生产者同步发送消息
      • 1.2、生产者异步发送消息
      • 1.3、常用配置:
    • 2、接收消息
      • 2.1、关于消费者的自动提交和手动提交
      • 2.2、长轮训poll消息
      • 2.3、消费者的健康状态检查
      • 2.4、指定分区和偏移量,时间消费
      • 2.5、新消费组的消费offset规则
  • 三、Spring boot配置连接kafka
    • 1、配置yml配置文件
    • 2、配置生产者
    • 3、配置消费者

前言

本文主要是介绍通过使用原生代码方式和结合springboot分别如何更好的去使用理解kafka

如果需要看理论或者安装kafka可以看我前面两篇内容
🍅kafka使用和安装

一、依赖

主要分为springboot和原生代码的依赖,还有hutool工具包

<dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.20</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
<!--	springbootkafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!--	springmvc原生kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>

二、原生使用kafka

构建项目就不做过多说了,普通maven项目就行,重点在于方法API的使用。

1、发送消息

发送消息流程

  1. 首先是先创建一个Properties对象用于传递配置参数
  2. 然后通过props.put()方法添加需要添加的配置
    • 添加连接kafka地址和设置序列化是必须的,后面的有默认的,可以根据情况设置
  3. 创建一个KafkaProducer连接客户端
  4. 创建ProducerRecord发送记录类,发送消息就是通过这个类进行发送
  5. 发送时可以选择同步或者异步进行发送

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** @projectName: kafka-mode* @package: com.zheng.kafkamode.kafka* @className: MyProducer* 消息的发送者---简单发送* @version: 1.0*/
@Slf4j
public class MyProducer {private final static String TOPIC_NAME = "my-replicated-topic";public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();
//        设置参数props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.211.55.6:9092");
//        设置序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());        配置ACK
//        props.put(ProducerConfig.ACKS_CONFIG,"1");
        失败重试次数,3次
//        props.put(ProducerConfig.RETRIES_CONFIG,3);
//
        失败重试时间间隔,300毫秒后重试
//        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG,300);//          kafka消息缓冲区大小,用来存放要发送到消息,缓冲区是32m,props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
//          本地线程,一次性从缓冲区拉取的数据大小,16kprops.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//          如果线程拉取不到16k,间隔10ms也会将缓冲区的数据发送到kafkaprops.put(ProducerConfig.LINGER_MS_CONFIG,10);//        连接客户端KafkaProducer<String, String> producer = new KafkaProducer<>(props);//        发送的消息记录器(topic,partition(指定发到哪个),key(用于计算发到哪个partition),value)
//          默认partition数量和Broker创建的数量一致ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0,"my-keyValue3", "hello");//        同步send(producer,producerRecord);
//        异步asyncSend(producer,producerRecord);}/*** @param producer: 客户端对象* @return void* 同步发送* @date 2024/3/22 17:09*/private static void send(KafkaProducer<String, String> producer,ProducerRecord<String, String> producerRecord) throws InterruptedException, ExecutionException {//          等待发送成功的阻塞方法RecordMetadata metadata = producer.send(producerRecord).get();log.info("同步发送消息"+ "topic-"+metadata.topic()+"====partition:"+metadata.partition()+"=====offset:"+metadata.offset());}/*** @param producer: 客户端对象* @return void* 异步发送* @date 2024/3/22 17:09*/private static void asyncSend(KafkaProducer<String, String> producer,ProducerRecord<String, String> producerRecord) throws InterruptedException {int sum = 5;CountDownLatch countDownLatch = new CountDownLatch(sum);for (int i = 0; i < sum; i++) {ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>(TOPIC_NAME, "my-keyValue"+i, "zhangsan"+i);//        异步发送消息producer.send(producerRecord1,(metadata, exception) -> {log.info("异步发送消息"+ "topic-"+metadata.topic()+"====partition:"+metadata.partition()+"=====offset:"+metadata.offset());countDownLatch.countDown();});}countDownLatch.await(5, TimeUnit.SECONDS);}
}

1.1、生产者同步发送消息

同步发送收到消息后会回复一个ack

ack有三个参数配置:(默认是1)

  • cak = 0:kafka收到消息后,不需要关心消息是否成功写入到分区中,马上就返回ack,
    • 会容易丢失消息,但效率最高
  • ack = 1 :多副本之间的leader分区已经收到消息,并把消息写入到本地log中,才会返回ack给生产者
    • 性能和安全是最均衡的
  • ack = -1/all :里面有默认配置min.insync.replicas=2(默认为1,推荐配置大于等于2),
    • min.insync.replicas:代表同步副本的个数(如果是1,则是只需要leader收到就可以)
    • 最安全但性能最差

生产者如果3秒没有收到回复(ack),则会重试,如果重试3次还没成功,则抛出异常。

  • 消息丢失概率较小

在这里插入图片描述

1.2、生产者异步发送消息

异步发送不需要等待客户端回复ack

生产者发送消息后就可以做之后的业务,不需要等待broker在收到消息后异步调用生产者提供的callback。

  • 会出现消息丢失问题

    在这里插入图片描述

1.3、常用配置:

  • 基础配置
//        设置参数props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.8.62:3392");
//        设置序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  • 发送消息配置
//        配置ACKprops.put(ProducerConfig.ACKS_CONFIG,"1");
//        失败重试次数,3次props.put(ProducerConfig.RETRIES_CONFIG,3);//        失败重试时间间隔,300毫秒后重试props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG,300);
  • 缓冲区配置

    kafka发送消息的流程是先创建一个缓冲区,把消息先发送到缓冲区,然后再有一个本地线程,来这个缓冲区拉取数据,通过本地线程把数据从缓冲区拉取发送到kafka客户端

在这里插入图片描述

  • kafka默认会创建一个消息缓冲区,用来存放要发送到消息,缓冲区是32m,
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
  • 本地线程,一次性从缓冲区拉取的数据大小,16k
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
  • 如果线程拉取不到16k,间隔10ms也会将缓冲区的数据发送到kafka
props.put(ProducerConfig.LINGER_MS_CONFIG,10);

2、接收消息

消费消息流程

  1. 也是首先是先创建一个Properties对象用于传递配置参数
  2. 然后在props.put()方法传递参数
  3. 在创建一个连接客户端携带上Properties。
  4. 然后通过消费者客户端去poll()消息,默认可以一次可以poll到五百条消息下来,
    • Duration.ofMillis(1000):表示如果没poll到五百条,1000ms后也结束这次poll。然后处理poll下来的消息,在继续循环poll下一次
@Slf4j
public class MyConsumer {private final static String TOPIC_NAME = "my-replicated-topic";private final static String CONSUMER_GROUP_NAME = "testGroup";public static void main(String[] args) {Properties props = new Properties();
//        设置参数props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.8.62:3392");//        设置消费组名props.put(ConsumerConfig.GROUP_ID_CONFIG,"CONSUMER_GROUP_NAME");
//        设置序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//          创建一个消费者的客户端KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//        消费者订阅主题列表consumer.subscribe(Arrays.asList(TOPIC_NAME));while (true){
//            poll() API 是拉取消息的长轮训ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//            取出单个offsetfor (ConsumerRecord record : records){log.info("收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());}}}
}

2.1、关于消费者的自动提交和手动提交

在这里插入图片描述

消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息交给集群的_consumer_offsets主题里面。

  • 自动提交

    消费者poll消息下来以后就会自动提交offset

    //        设置自动提交offset:true=自动提交、false=手动提交。props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
    //        自动提交offset的时间间隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    

    注意:自动提交可能会丢失消息

  • 手动提交

    需要把自动提交的配置改为false

            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    

    手动提交分为两种:

    • 手动同步提交

      在消费完消息后调用同步方法,会阻塞等待提交成功返回ack

      
      //            取出单个offsetfor (ConsumerRecord record : records){log.info("收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());}
      //            所有的消息已消费完if (records.count()>0){ //有消息
      //                手动提交offset,当前线程会阻塞直到offset提交成功
      //                一般使用同步提交,因为提交之后页没什么业务逻辑consumer.commitSync();}
      
    • 手动异步提交

      在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用

      
      //            取出单个offsetfor (ConsumerRecord record : records){log.info("收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());}
      //            所有的消息已消费完if (records.count()>0){ //有消息//                手动异步提交,异步回调处理consumer.commitAsync((offsetAndMetadataMap,exception)->{if (exception != null){System.out.println("提交失败!失败原因:"+exception.getStackTrace());}System.out.println("提交成功!=="+offsetAndMetadataMap);});}
      

2.2、长轮训poll消息

  • 默认情况下,消费者一次性会poll500条消息。
//        一下poll消费的消息数,可以根据消费消息的快慢来决定,一次性消费多少消息props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
  • 代码中设置了长轮训的时间是1000毫秒
        while (true){
//            poll() API 是拉取消息的长轮训,1000代表本次拉取1秒钟就结束
//              拉取消息到records中,ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//            取出单个offset进行消费for (ConsumerRecord record : records){log.info("收到的消息"}}}

意味着:

  • 结束单次消费原因可能是:
    • 如果一次poll500条,就直接结束这次poll,去进入到for循环去消费这次poll到的消息。
    • 如果一次没有poll到500条,且时间在1秒内,那么长轮训继续poll;要么拉取到500条,要么达到1秒,才会结束拉取
    • 如果多次poll都没达到500条,且1秒时间到了,那么直接进入for循环。
  • 如果两次poll的时间间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少一点
//        如果两次poll的时间如果超过30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组,将分区分配给其他消费者。props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,30*1000);

2.3、消费者的健康状态检查

消费者每隔1s向kafka集群发送心跳,集群发现如果超过10s没有续约的消费者,将被踢出消费组,触发rebalance机制,将该分区交给消费组里的其他消费者进行消费。

//        consumer给broker发送心跳的间隔时间props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,1000);
//        如果超过10s没收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,10 * 1000);

2.4、指定分区和偏移量,时间消费

  • 指定分区消费

            consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
    
  • 从头消费

            consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
    
  • 指定offset

            consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));consumer.seek(new TopicPartition(TOPIC_NAME,0),10);
    
  • 指定时间节点开始消费

         List<PartitionInfo> topicPartition = consumer.partitionsFor(TOPIC_NAME);
    //          key=分区:value=偏移量HashMap<TopicPartition, Long> map = new HashMap<>();
    //        从1小时前开始消费long fetchDateTime = new Date().getTime() - 1000 * 60 * 60;for (PartitionInfo par : topicPartition){map.put(new TopicPartition(TOPIC_NAME, par.partition()),fetchDateTime);}
    //        根据时间查找指定分区的偏移量Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry:parMap.entrySet()){TopicPartition key = entry.getKey();OffsetAndTimestamp value = entry.getValue();if (key == null || value == null) continue;long offset = value.offset();System.out.println("partition-"+key.partition()+"===offset-"+offset);System.out.println();
    //            再通过指定offset消费if (value != null){consumer.assign(Arrays.asList(key));consumer.seek(key,offset);}}
    

2.5、新消费组的消费offset规则

新消费组的消费者在启动以后,默认会从连接后的分区的offset开始消费(消费新消息)。可以通过下面设置,让新消费者第一次从头开始消费,之后则会只消费新的消费者(最后消费的位置的偏移量+1)

  • Latest:默认的,消费新消息
  • earliest:第一次从头开始消费。之后开始消费新消息
//        新连接的消费组是否需要从头开始消费props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

三、Spring boot配置连接kafka

1、配置yml配置文件

和原生的区别是把客户端交给spring来管理,通过yml进行配置。配置的参数名字也基本上都是一样的,参数也是一样的。

spring:kafka:bootstrap-servers: 10.211.55.6:9092
#    生产者producer:retries: 3 #设置大于0的值,则客户端会将发送失败的记录重新发送batch-size: 16384 #一次从缓冲区拉取的大小16kbuffer-memory: 33554432 #本地缓冲区大小32macks: 1#      编解码规则(默认)key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
#     消费者consumer:group-id: default-group #消费组enable-auto-commit: false #手动提交auto-offset-reset: earliest  #新消费组从头消费#编解码规则key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500  #一次拉取五百条listener:ack-mode: MANUAL_IMMEDIATE
#       手动调用acknowledge()后立即提交,一般使用这个
#      MANUAL_IMMEDIATE
#       当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
#      RECORD
#       当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
#      BATCH
#       当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于time时提交
#      TIME
#       当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于count时提交
#       COUNT
#       TIME |COUNT 有一个条件满足时提交
#      COUNT_TIME
#       poll()拉取一批消息,处理完业务后,手动调用acknowledge()后立即提交
#      MANUAL

2、配置生产者

简单创建一个Controller用来生产消息

生产消息主要是通过kafkaTemplate模版类,

@RestController
@RequestMapping("/kafka")
public class KafkaController {private final static String TOPIC_NAME = "my-replicated-topic";@Resourceprivate KafkaTemplate<String ,Object> kafkaTemplate;@PostMapping("/test")public String test(@RequestBody User user){JSONObject jsonObject = new JSONObject(user);ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(TOPIC_NAME,0,"key", jsonObject.toString());return "成功!";}}

3、配置消费者

可以通过ConsumerRecord一条一条的接收处理或者通过ConsumerRecords批量接收处理,但我们还是得for一条一条的处理,所以一般选择第一种就好

通过KafkaListener注解,配置接收的topics,以及消费者组id,以及一些其他的消费者信息

如: listenGroupPro方法的使用

@Slf4j
@Component
public class MySpringBootConsumer {/*** @param record:* @param ack:* @return void* 一次性读取一条,实际上也是一次性接收500条,然后一条消息回调一次,和下面的一样* @date 2024/3/26 21:12*/@KafkaListener(topics = "my-replicated-topic",groupId = "testGroup")public void listenGroup(ConsumerRecord<String,String> record, Acknowledgment ack){log.info("testGroup收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());//        手动提交ack,每处理完一条消息提交一次ack.acknowledge();}/*** @param records:* @param ack:* @return void* 一次性全部接收,指定分组和topic,和上面的区别是提交ack时机会不一样,* @date 2024/3/26 21:09*/@KafkaListener(topics = "my-replicated-topic",groupId = "testGroup1")public void listenGroupS(ConsumerRecords<String,Object> records, Acknowledgment ack){for (ConsumerRecord<String,Object> record: records){log.info("testGroup1收到的消息:partition = {};offset = {};key = {};value = {}",record.partition(),record.offset(),record.key(),record.value());}
//        手动提交ack,处理完records一批消息提交一次ack.acknowledge();}@KafkaListener(groupId = "testGroup2",topicPartitions = {@TopicPartition(topic = "my-replicated-topic2",partitions = {"0","1"}), //指定多个分区@TopicPartition(topic = "my-replicated-topic",partitions = "0",partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "5")) //从5号offset开始消费},concurrency = "3") //concurrency就是同区消费组下的消费者个数,建议小于分区总数public void listenGroupPro(ConsumerRecord<String,String> record, Acknowledgment ack){log.info("testGroup收到的消息:partition = {};offset = {};key = {};value = {};topic={}",record.partition(),record.offset(),record.key(),record.value(),record.topic());//        手动提交ack,每处理完一条消息提交一次ack.acknowledge();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/788433.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用docker-tc对host容器进行限流

docker-tc是一个github开源项目&#xff0c;项目地址是https://github.com/lukaszlach/docker-tc。 运行docker-tc docker run -d \ --name docker-tc \ --network host \ --cap-add NET_ADMIN \ --restart always \ -v /var/run/docker.sock:/var/run/docker.sock \ -v /var…

Transformer - model architecture

Transformer - model architecture flyfish Transformer总体架构可分为四个部分: 输⼊部分 输出部分 编码器部分 解码器部分 输入部分 输出部分 输⼊部分包含: 源嵌⼊层和位置编码 ⽬标嵌⼊层和位置编码 输出部分包含: 线性层 softmax处理器 左侧编码器部分和右侧解码器部…

微信小程序自定义弹窗组件

业务背景&#xff1a;弹窗有时字体较多&#xff0c;超过7个字&#xff0c;不适用wx.showToast. 组件代码 <view class"toast-box {{isShow? show:}}" animation"{{animationData}}"><view class"toast-content" ><view class&q…

Taro + vue3 小程序封装标题组件

分为没有跳转页面的title组件和 有跳转页面的title组件 我们可以把这个封装成一个组件 直接上代码 <template><div class"fixed-title-container"><div class"box"><div class"icon" v-if"isShow" click"…

【论文阅读】DETR 论文逐段精读

【论文阅读】DETR 论文逐段精读 文章目录 【论文阅读】DETR 论文逐段精读&#x1f4d6;DETR 论文精读【论文精读】&#x1f310;前言&#x1f4cb;摘要&#x1f4da;引言&#x1f9ec;相关工作&#x1f50d;方法&#x1f4a1;目标函数&#x1f4dc;模型结构⚙️代码 &#x1f4…

ubuntu-server部署hive-part4-部署hive

参照 https://blog.csdn.net/qq_41946216/article/details/134345137 操作系统版本&#xff1a;ubuntu-server-22.04.3 虚拟机&#xff1a;virtualbox7.0 部署hive 下载上传 下载地址 http://archive.apache.org/dist/hive/ apache-hive-3.1.3-bin.tar.gz 以root用户上传至…

多层PCB内部长啥样?

硬件工程师刚接触多层PCB的时候&#xff0c;很容易看晕。动辄十层八层的&#xff0c;线路像蜘蛛网一样。 画了几张多层PCB电路板内部结构图&#xff0c;用立体图形展示各种叠层结构的PCB图内部架构。 高密度互联板(HDI)的核心 在过孔 多层PCB的线路加工&#xff0c;和单层双…

Transformer - Positional Encoding 位置编码 代码实现

Transformer - Positional Encoding 位置编码 代码实现 flyfish import torch import torch.nn as nn import torch.nn.functional as F import os import mathclass PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len5000):super(PositionalEnco…

深度学习理论基础(六)注意力机制

目录 深度学习中的注意力机制&#xff08;Attention Mechanism&#xff09;是一种模仿人类视觉和认知系统的方法&#xff0c;它允许神经网络在处理输入数据时集中注意力于相关的部分。通过引入注意力机制&#xff0c;神经网络能够自动地学习并选择性地关注输入中的重要信息&…

http: server gave HTTP response to HTTPS client 分析一下这个问题如何解决中文告诉我详细的解决方案

这个错误信息表明 Docker 客户端在尝试通过 HTTPS 协议连接到 Docker 仓库时&#xff0c;但是服务器却返回了一个 HTTP 响应。这通常意味着 Docker 仓库没有正确配置为使用 HTTPS&#xff0c;或者客户端没有正确配置以信任仓库的 SSL 证书。以下是几种可能的解决方案&#xff1…

半导体制程离子注入注入的是哪些离子

离子注入是一种低温过程 通过该过程将一种元素的离子加速进入固体靶材&#xff0c;从而改变靶材的物理、化学或电学性质。离子注入用于半导体器件制造和金属精加工以及材料科学研究。如果离子停止并保留在目标中&#xff0c;则它们可以改变目标的元素成分&#xff08;如果离子…

6 个典型的Java 设计模式应用场景题

单例模式(Singleton) 场景: 在一个Web服务中,数据库连接池应当在整个应用生命周期中只创建一次,以减少资源消耗和提升性能。使用单例模式确保数据库连接池的唯一实例。 代码实现: import java.sql.Connection; import java.sql.SQLException;public class DatabaseConne…

上位机图像处理和嵌入式模块部署(qmacviusal边缘宽度测量)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面有一篇文章&#xff0c;我们了解了测量标定是怎么做的。即&#xff0c;我们需要提前知道测量的方向&#xff0c;灰度的方向&#xff0c;实际的…

“省钱有道”的太平鸟,如何真正“高飞”?

衣食住行产业中&#xff0c;服装品类消费弹性较大、可选属性较强&#xff0c;其发展可以显著反映当前的经济温度。 根据国家统计局数据&#xff0c;2023年1-12月&#xff0c;我国限额以上单位服装类商品零售额累计10352.9亿元&#xff0c;同比增长15.4%&#xff0c;增速比2022…

Python框架下的qt设计之JSON格式化转换小程序

JSON转换小程序 代码展示&#xff1a; 主程序代码&#xff1a; from PyQt6.QtWidgets import (QApplication, QDialog, QMessageBox )import sys import jsonclass MyJsonFormatter(jsonui.Ui_jsonFormatter,QDialog): # jsonui是我qt界面py文件名def __init__(self):super()…

【HTML】注册页面制作 案例二

&#xff08;大家好&#xff0c;今天我们将通过案例实战对之前学习过的HTML标签知识进行复习巩固&#xff0c;大家和我一起来吧&#xff0c;加油&#xff01;&#x1f495;&#xff09; 案例复习 通过综合案例&#xff0c;主要复习&#xff1a; 表格标签&#xff0c;可以让内容…

【Go】十七、进程、线程、协程

文章目录 1、进程、线程2、协程3、主死从随4、启动多个协程5、使用WaitGroup控制协程退出6、多协程操作同一个数据7、互斥锁8、读写锁9、deferrecover优化多协程 1、进程、线程 进程作为资源分配的单位&#xff0c;在内存中会为每个进程分配不同的内存区域 一个进程下面有多个…

集合的学习

为什么要有集合&#xff1a;集合会自动扩容 集合不能存基本数据类型&#xff08;基本数据类型是存放真实的值&#xff0c;而引用数据类型是存放一个地址&#xff0c;这个地址存放在栈区&#xff0c;地址所指向的内容存放在堆区&#xff09; 数组和集合的对比&#xff1a; 集…

Flutter 开发学习笔记(3):第三方UI库的引入

文章目录 前言初始化程序Icon导入如何导入 Toast消息提示框引入简单封装简单使用 Charts图表导入新建pages文件夹存放page简单代码实现效果 总结 前言 Flutter已经发布了有10年了&#xff0c;生态也算比较完善了。用于安卓程序开发应该是非常的方便。我们这里就接入一些简单的…

golang语言系列:Web框架+路由 之 Gin

云原生学习路线导航页&#xff08;持续更新中&#xff09; 本文是golang语言学习系列&#xff0c;本篇对Gin框架的基本使用方法进行学习 1.Gin框架是什么 Gin 是一个 Go (Golang) 编写的轻量级 http web 框架&#xff0c;运行速度非常快&#xff0c;如果你是性能和高效的追求者…