kafka(五)spring-kafka(2)详解与demo

一、简单的收发消息demo

父工程pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>kafka-demo</artifactId><version>1.0-SNAPSHOT</version><packaging>pom</packaging><modules><module>producer</module><module>consumer-1</module><module>consumer-2</module></modules><!-- springBoot --><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.4.RELEASE</version></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
<!--            <version>3.0.0</version>--></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies></project>
1、生产者

1.1、配置文件
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializeruser.topic = userTest
school.topic = schoolTest
1.2、dto
package com.example.dto;import lombok.Builder;
import lombok.Data;@Data
@Builder
public class SchoolDTO {private String schoolId;private String schoolName;
}
package com.example.dto;import lombok.Builder;
import lombok.Data;@Data
@Builder
public class UserDTO {private String userId;private String userName;private Integer age;
}
 1.3、service
package com.example.service.impl;import com.alibaba.fastjson.JSON;
import com.example.dto.SchoolDTO;
import com.example.dto.UserDTO;
import com.example.service.SchoolService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service("schoolService")
@Slf4j
public class SchoolServiceImpl implements SchoolService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Value("${school.topic}")private String schoolTopic;@Overridepublic void sendSchoolMsg(SchoolDTO schoolDTO) {String msg = JSON.toJSONString(schoolDTO);ProducerRecord producerRecord = new ProducerRecord(schoolTopic,msg);kafkaTemplate.send(producerRecord);log.info("school消息发送成功");}
}
package com.example.service.impl;import com.alibaba.fastjson.JSON;
import com.example.dto.UserDTO;
import com.example.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service("userService")
@Slf4j
public class UserServiceImpl implements UserService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Value("${user.topic}")private String userTopic;@Overridepublic void sendUserMsg(UserDTO userDTO) {String msg = JSON.toJSONString(userDTO);ProducerRecord producerRecord = new ProducerRecord(userTopic,msg);kafkaTemplate.send(producerRecord);log.info("user消息发送成功");}
}
 1.4、启动类
package com.example;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}
}
2、消费者

2.1、配置文件
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit = true user.topic = userTest
user.group.id = user-group-1school.topic = schoolTest
school.group.id = school-group-1server.port = 2222
2.2、监听
package com.example.listen;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Properties;@Component
@Slf4j
public class SchoolConsumer {@KafkaListener(topics = "${school.topic}", groupId = "${school.group.id}")public void consumer(ConsumerRecord<?, ?> record) {try {Object message = record.value();if (message != null) {String msg = String.valueOf(message);log.info("接收到:msg={},topic:{},partition={},offset={}",msg,record.topic(),record.partition(),record.offset());}} catch (Exception e) {log.error("topic:{},is consumed error:{}", record.topic(), e.getMessage());} finally {//ack.acknowledge();}}
}
package com.example.listen;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class UserConsumer {@KafkaListener(topics = "${user.topic}", groupId = "${user.group.id}")public void consumer(ConsumerRecord<?, ?> record) {try {Object message = record.value();if (message != null) {String msg = String.valueOf(message);log.info("接收到:msg={},topic:{},partition={},offset={}",msg,record.topic(),record.partition(),record.offset());}} catch (Exception e) {log.error("topic:{},is consumed error:{}", record.topic(), e.getMessage());} finally {//ack.acknowledge();}}
}

不指定group.id会报错,这也验证了kafka consumer必须要有group id。如写:

@KafkaListener(topics = "${user.topic}")
public void consumer(ConsumerRecord<?, ?> record) 启动报错:

Caused by: java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.
    at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
 

2.3、启动类 
package com.example;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;@SpringBootApplication
//@EnableKafka
public class Consumer1Application {public static void main(String[] args) {SpringApplication.run(Consumer1Application.class, args);}
}
3、测试

启动消费者:

生产者这里通过单元测试来发送消息:

package com.demo.kafka;import com.example.ProducerApplication;
import com.example.dto.SchoolDTO;
import com.example.dto.UserDTO;
import com.example.service.SchoolService;
import com.example.service.UserService;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = {ProducerApplication.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@RunWith(SpringRunner.class)
public class Test {@Autowiredprivate SchoolService schoolService;@Autowiredprivate UserService userService;@org.junit.Testpublic void sendUserMsg(){UserDTO userDTO = UserDTO.builder().userId("id-1").age(18).userName("zs").build();userService.sendUserMsg(userDTO);}@org.junit.Testpublic void sendSchoolMsg(){SchoolDTO schoolDTO = SchoolDTO.builder().schoolId("schoolId-1").schoolName("mid school").build();schoolService.sendSchoolMsg(schoolDTO);}}

运行单测,观察消费者输出:

修改参数再次运行,观察到消费者都可以正常监听: 

2024-06-22 17:09:06.383  INFO 76104 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 2222 (http) with context path ''
2024-06-22 17:09:06.390  INFO 76104 --- [ntainer#1-0-C-1] org.apache.kafka.clients.Metadata        : Cluster ID: ZdpIAHTjS9GhJlvPP8n0Rw
2024-06-22 17:09:06.392  INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-06-22 17:09:06.393  INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] Revoking previously assigned partitions []
2024-06-22 17:09:06.394  INFO 76104 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: []
2024-06-22 17:09:06.394  INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] (Re-)joining group
2024-06-22 17:09:06.401  INFO 76104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=school-group-1] Successfully joined group with generation 11
2024-06-22 17:09:06.403  INFO 76104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=school-group-1] Setting newly assigned partitions [schoolTest-0]
2024-06-22 17:09:06.403  INFO 76104 --- [           main] com.example.Consumer1Application         : Started Consumer1Application in 8.606 seconds (JVM running for 9.621)
2024-06-22 17:09:06.413  INFO 76104 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [schoolTest-0]
2024-06-22 17:09:06.491  INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] Successfully joined group with generation 3
2024-06-22 17:09:06.493  INFO 76104 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] Setting newly assigned partitions [userTest-0]
2024-06-22 17:09:06.611  INFO 76104 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [userTest-0]
2024-06-22 17:16:29.775  INFO 76104 --- [ntainer#1-0-C-1] com.example.listen.UserConsumer          : 接收到:msg={"age":18,"userId":"id-1","userName":"zs"},topic:userTest,partition=0,offset=4
2024-06-22 17:16:48.157  INFO 76104 --- [ntainer#0-0-C-1] com.example.listen.SchoolConsumer        : 接收到:msg={"schoolId":"schoolId-1","schoolName":"mid school"},topic:schoolTest,partition=0,offset=1
2024-06-22 17:17:39.458  INFO 76104 --- [ntainer#1-0-C-1] com.example.listen.UserConsumer          : 接收到:msg={"age":20,"userId":"id-2","userName":"ls"},topic:userTest,partition=0,offset=5
2024-06-22 17:17:59.474  INFO 76104 --- [ntainer#0-0-C-1] com.example.listen.SchoolConsumer        : 接收到:msg={"schoolId":"schoolId-2","schoolName":"primary school"},topic:schoolTest,partition=0,offset=2
4、多个消费者
4.1、同一个groupId

将consumer-1的代码copy到consumer-2,注意端口号修改成不一样的3333,并启动,

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit = true user.topic = userTest
user.group.id = user-group-1school.topic = schoolTest
school.group.id = school-group-1server.port = 3333
2024-06-22 17:23:59.524  INFO 78096 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 3333 (http) with context path ''
2024-06-22 17:23:59.531  INFO 78096 --- [           main] example.Consumer2Application             : Started Consumer2Application in 7.534 seconds (JVM running for 8.506)
2024-06-22 17:24:00.021  INFO 78096 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=school-group-1] Successfully joined group with generation 12
2024-06-22 17:24:00.024  INFO 78096 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=school-group-1] Setting newly assigned partitions []
2024-06-22 17:24:00.025  INFO 78096 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: []
2024-06-22 17:24:00.028  INFO 78096 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] Successfully joined group with generation 4
2024-06-22 17:24:00.028  INFO 78096 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=user-group-1] Setting newly assigned partitions []
2024-06-22 17:24:00.029  INFO 78096 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: []

再执行次生产者的Test,观察两个消费者:

 可以看到consumer-1接收到了,而consumer-2没有接收到。

再次执行,结果相同。school也是同样的结果。

验证了:同一个topic下的某个分区只能被消费者组中的一个消费者消费。

4.2、不同group

现修改cosumer-2中groupId并重启

user.group.id = user-group-2
school.group.id = school-group-2

启动后自动接收了之前发送的所有消息(因为这是一个新的消费者组):

再次发送新的消息:

可以看到consumer-1和2同时都接收到了:

 

验证了:同一个topic可以被不同的消费者组消费。

二、生产者分区partition

先观察上面步骤产生的数据文件:

上面只有一个patition, 所有两个topic各自只有一个数据目录。现将userTest这个topic分成多个partition,结合四种分区策略看下:

1、指定分区
2、轮询
3、key哈希分区策略
4、自定义分区策略(即自定义Partitioner)

三、消费者分配策略

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/32306.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

谷歌手机刷机教学

注意&#xff1a;手机已经解开了oem锁和bl 1、adb基础命令 连接设备adb devices&#xff1a;列出当前连接的所有设备。 adb connect <设备IP>&#xff1a;通过IP地址连接设备&#xff08;用于无线连接&#xff09;。 设备信息adb shell getprop&#xff1a;获取设备的所…

Docker部署MySQL8.3.0(保姆级图文教程)

系列文章目录 Docker部署Nginx1.21.5&#xff08;保姆级图文教程&#xff09; Docker部署MySQL8.3.0&#xff08;保姆级图文教程&#xff09; 文章目录 一、环境二、拉取镜像2.1 查找 Docker Hub 上的 MySQL 镜像2.2 拉取MySQL镜像2.3 查看MySQL镜像 三、在宿主机创建目录3.1 创…

无痛接入图像生成风格迁移能力:GAN生成对抗网络

AI应用开发相关目录 本专栏包括AI应用开发相关内容分享&#xff0c;包括不限于AI算法部署实施细节、AI应用后端分析服务相关概念及开发技巧、AI应用后端应用服务相关概念及开发技巧、AI应用前端实现路径及开发技巧 适用于具备一定算法及Python使用基础的人群 AI应用开发流程概…

【一】【算法】经典树状数组和并查集,详细解析,初步认识,【模板】树状数组 1,树状数组并查集

【模板】树状数组 1 题目描述 如题&#xff0c;已知一个数列&#xff0c;你需要进行下面两种操作&#xff1a; 将某一个数加上 x x x 求出某区间每一个数的和 输入格式 第一行包含两个正整数 n , m n,m n,m&#xff0c;分别表示该数列数字的个数和操作的总个数。 第二…

基于SSM的在线预约导游系统

文章目录 项目介绍主要功能截图&#xff1a;部分代码展示设计总结项目获取方式 &#x1f345; 作者主页&#xff1a;超级无敌暴龙战士塔塔开 &#x1f345; 简介&#xff1a;Java领域优质创作者&#x1f3c6;、 简历模板、学习资料、面试题库【关注我&#xff0c;都给你】 &…

java: Annotation processing is not supported for module cycles.

java: Annotation processing is not supported for module cycles. 查了半天是造成了循环依赖 解决步骤 1.打开project structure 2.找到循环依赖的两个或多个模块&#xff0c;在dependencies中找到对应的模块并删除 仅记录我遇到这个问题的解决方法&#xff0c;并不适合所…

51-OLED显示代码

文章目录 初始化代码SSD1305初始化代码SSD1315初始化代码(来源于商家赠送) 起动&#xff0c;停止&#xff0c;应答启动停止应答 发送数据和命令发送一个字节发送1字节命令发送1字节数据 设置行列位置设置行起始位置设置列起始位置 屏幕显示相关清屏指数显示字符显示数字显示英文…

七层和四层的区别

OSI七层模型的结构如下&#xff1a; 物理层&#xff08;Physical Layer&#xff09;&#xff1a;负责传输原始比特流&#xff0c;实现数据在物理媒介上的传输&#xff1b; 数据链路层&#xff08;Data Link Layer&#xff09;&#xff1a;负责在相邻节点之间传输数据帧&#…

Springboot与RestTemplate

RestTemplate是Spring提供的用于访问Rest服务的客户端&#xff0c;RestTemplate提供了多种便捷访问远程Http服务的方法,能够大大提高客户端的编写效率。 一、使用Get进行访问 1、获取json格式 使用 getForEntity() API 发起 GET 请求&#xff1a; RestTemplate restTemplate…

Git简单使用和理解

workspace: 本地的工作目录。 index/stage&#xff1a;暂存区域&#xff0c;临时保存本地改动。 local repository: 本地仓库&#xff0c;只想最后一次提交HEAD。 remote repository&#xff1a;远程仓库。 对于Git,首先应该明白第一git是一种分布式版本控制系统&#xff0c;最…

后仿真中 module path polarity 问题

目录 一 未知极性 二 正极性 三 负极性 不知道大家有没有遇到这个问题:什么?我们知道的module path delay 指的是定义在specify...endspecify block 中的语句,指示输入-输出的延迟信息。 这里的module path 竟然还有极性问题,今天,来学习一下。 模块路径的极性是一…

FreeBSD服务器监控:核心指标解读与应用建议(Telnet)

随着企业IT环境的日益复杂&#xff0c;对服务器性能和稳定性的监控变得至关重要。特别是针对FreeBSD这类广泛应用的服务器操作系统&#xff0c;进行高效的监控和管理更是运维团队的核心任务。本文将针对监控易中FreeBSD服务器的核心监控指标进行解读&#xff0c;并提供相应的应…

用RNN构建人名分类器

目录 项目综述1.导入必备的工具包2.处理数据&#xff0c;满足训练要求2.1 统计常用的字符2.2 进行规范化处理,去除重音符号2.3 将文件读取到内存中2.4 构建人名国家和具体人名的对应关系2.5 one-hot编码 3.构建RNN模型3.1 构建传统RNN模型3.2 构建传统LSTM模型3.3 构建传统GRU模…

永久免费设备日志采集工具

免费试用下载: Gitee下载 最新版本 优势: A. 开箱即用. 解压直接运行.不需额外安装. B. 批管理设备. 设备配置均在后台管理. C. 无人值守 客户端自启动,自更新. D. 稳定安全. 架构简单,内存占用小,通过授权访问.

openeuler一个服务异常占用cpu的排查过程

1 环境 硬件环境&#xff1a;LS1046A arm64 系统环境&#xff1a;openEuler release 22.03 (LTS-SP1) Linux kernel 4.19.26 2 问题说明 我的硬件平台需要适配一下 openEuler release 22.03 (LTS-SP1) 但是目前只能使用原来硬件平台的内核&#xff0c;在适配的过程中…

LLM大语言模型应用方案之RAG检索增强生成的实现步骤。

0.我理解的RAG 什么是RAG&#xff1f; RAG的全称是“检索增强生成模型”&#xff08;Retrieval-Augmented Generation&#xff09;。这是一种特别聪明的大语言模型。 RAG是怎么工作的呢&#xff1f; 1.检索&#xff1a;当你问RAG一个问题时&#xff0c;它会先去“图书…

aardio - 【库】lock 跨进程读写锁

import win.ui; /*DSG{{*/ var winform win.form(text"aardio form";right272;bottom203;topmost1) winform.add( button{cls"button";text"无锁演示";left27;top132;right120;bottom184;z2}; button2{cls"button";text"有锁演示…

正确使用 QSqlDatabase 与 QSqlQuery 的方法

前言 在使用 Qt 的数据库模块时&#xff0c;许多人可能会对 QSqlDatabase 和 QSqlQuery 的使用方法感到困惑。本文将对这一话题进行详细讨论&#xff0c;并提供一些最佳实践。 基本使用示例 首先&#xff0c;让我们看看以下代码片段&#xff1a; {QSqlDatabase db QSqlDat…

Redis的实战常用一、验证码登录(解决session共享问题)(思路、意识)

一、基于session实现登录功能 第一步&#xff1a;发送验证码&#xff1a; 用户在提交手机号后&#xff0c;会校验手机号是否合法&#xff1a; 如果不合法&#xff0c;则要求用户重新输入手机号如果手机号合法&#xff0c;后台此时生成对应的验证码&#xff0c;同时将验证码进行…

前端路线指导(4):前端春招秋招经验分享

春招/秋招经验分享(前端) 哈喽大家好&#xff0c;我是小粉&#xff0c;双一流本科&#xff0c;自学前端一年&#xff0c;收获腾讯&#xff0c;字节等多家大厂offer&#xff0c;一半以上ssp~ 今天给大家分享一下我的春招&#xff08;暑期实习&#xff09;、秋招经历&#xff0c;…