SpringBoot 实现整合kafka的简单使用

1、引入kafka的依赖

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency>

2、配置kafka

spring:kafka:bootstrap-servers: 156.65.20.76:9092,156.65.20.77:9092,156.65.20.78:9092 #指定Kafka集群的地址,这里有三个地址,用逗号分隔。listener:ack-mode: manual_immediate #设置消费者的确认模式为manual_immediate,表示消费者在接收到消息后立即手动确认。concurrency: 3  #设置消费者的并发数为5missing-topics-fatal: false  #设置为false,表示如果消费者订阅的主题不存在,不会抛出异常。producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 设置消息键的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer #设置消息值的序列化器acks: 1  #一般就是选择1,兼顾可靠性和吞吐量 ,如果想要更高的吞吐量设置为0,如果要求更高的可靠性就设置为-1consumer:auto-offset-reset: earliest #设置为"earliest"表示将从最早的可用消息开始消费,即从分区的起始位置开始读取消息。enable-auto-commit: false #禁用了自动提交偏移量的功能,为了避免出现重复数据和数据丢失,一般都是手动提交key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置消息键的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #设置消息值的反序列化器

3、创建主题

  • 自动创建(不推荐)

    在kafka的安装目录conf目录下找到该配置文件server.properties,添加如下配置:
    num.partitions=3 #默认3个分区
    auto.create.topics.enable=true #开启自动创建主题
    default.replication.factor=3 #默认3个副本
    
  • 手动创建

在kafka的安装目录bin目录下,执行如下命令:
//创建一个有三个分区和三个副本,名为zhuoye的主题
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic xxxx

4、生产者代码

@Slf4j
@Component
public class ALiYunServiceImpl implents IALiYunService {@Autowiredprivate KafkaTemplate kafkaTemplate;@Autowiredprivate ExecutorService executorService;String topicName = "xxxx";@Overridepublic void queryInfo() {List<Message> messages = Collections.synchronizedList(new ArrayList<>());boolean flag = true;//获取上次查询时间Long startTime = Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;Long endTime = System.currentTimeMillis();try {List<CloudInstanceAssetDto> cloudInstances = cloudInstanceAssetMapper.queryAllRunningInstance(1, "Running");if (CollectionUtils.isEmpty(cloudInstances)) {return;}//定义计数器CountDownLatch latch = new CountDownLatch(cloudInstances.size());//遍历查询for (CloudInstanceAssetDto instance : cloudInstances) {executorService.submit(() -> {try {dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,startTime, endTime, instance, messages);} catch (Exception e) {} finally {latch.countDown();}});}//等待任务执行完毕latch.await();//将最终的消息集合发送到kafkaif (CollectionUtils.isNotEmpty(messages)) {for (int i = 0; i < messages.size(); i++) {if (StringUtils.isNotBlank(messages.get(i).getValue())&& "noSuchInstance".equals(messages.get(i).getValue())) {continue;}kafkaTemplate.send(topicName,  messages.get(i));}}} catch (Exception e) {flag = false;}}

这个时候,如果你想看有没有把消息发送到kafka的指定主题可以使用如下命令:

kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic xxxx

5、消费者代码

@Slf4j
@Component
public class KafkaConsumer {// 消费监听@KafkaListener(topics = "xxxx",groupId ="aliyunmetric")public void consumeExtractorChangeMessage(ConsumerRecord<String, String> record, Acknowledgment ack){try {String value = record.value();//处理数据,存入openTsDb.................................ack.acknowledge();//手动提交}catch (Exception e){log.error("kafa-topic【zhuoye】消费阿里云指标源消息【失败】");log.error(e.getMessage());}}
}

6、常用Kafka的命令

//创建主题
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3  --topic zhuoye
//查看kafka是否接收对应的消息kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic xxxx
// 修改kafka-topic分区数
./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 6 --topic xxxx
// 查看topic分区数
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic xxxx
// 查看用户组消费情况
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group zhuoye-aliyunmetric --describe

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/45945.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux系统下weblogic10.3.6版本打补丁步骤

linux系统 weblogic补丁压缩包&#xff1a;p35586779_1036_Generic.zip 链接&#xff1a;https://pan.baidu.com/s/1EEz_zPX-VHp5EU5LLxfxjQ 提取码&#xff1a;XXXX &#xff08;补丁压缩包中包含以下东西&#xff09; 打补丁步骤&#xff1a; 1.备份原weblogic(需要先确保服…

Langchain[3]:Langchain架构演进与功能扩展:流式事件处理、事件过滤机制、回调传播策略及装饰器应用

Langchain[3]:Langchain架构演进与功能扩展&#xff1a;流式事件处理、事件过滤机制、回调传播策略及装饰器应用 1. Langchain的演变 v0.1: 初始版本&#xff0c;包含基本功能。 从0.1~0.2完成的特性&#xff1a; 通过事件流 API 提供更好的流式支持。标准化工具调用支持Tool…

【linux 100条命令】

以下介绍一些常见的 Linux 命令&#xff1a; 1. ls &#xff1a;用于列出目录中的内容。 - 常用选项&#xff1a; - -l &#xff1a;以长格式显示详细信息&#xff0c;包括文件权限、所有者、所属组、文件大小、修改时间等。 - -a &#xff1a;显示所有文件&#xff0c;包…

哪些基于 LLMs 的产品值得开发?从用户体验和市场接受度的角度探讨

编者按&#xff1a;在大语言模型&#xff08;LLMs&#xff09;相关技术高速发展的今天&#xff0c;哪些基于 LLMs 的产品真正值得我们投入精力开发&#xff1f;如何从用户体验和市场接受度的角度评估这些产品的潜力&#xff1f; 今天为大家分享的这篇文章&#xff0c;作者的核心…

从代理模式到注解开发

代理模式 package org.example.proxy;public class ProxyClient {public static void main(String[] args) {ProxyBuilder proxyBuilder new ProxyBuilder();proxyBuilder.build();} }interface BuildDream {void build(); }class CustomBuilder implements BuildDream {Over…

visual studio开发C++项目遇到的坑

文章目录 1.安装的时候&#xff0c;顺手安装了C模板&#xff0c;导致新建项目执行出问题2.生成的exe&#xff0c;打开闪退问题3.项目里宏的路径不对&#xff0c;导致后面编译没有输出4. vs编译ui&#xff0c;warning跳过&#xff0c;未成功5.vs编译.h&#xff0c;warning跳过&a…

K8S 中的 CRI、OCI、CRI shim、containerd

K8S 如何创建容器&#xff1f; 下面这张图&#xff0c;就是经典的 K8S 创建容器的步骤&#xff0c;可以说是冗长复杂&#xff0c;至于为什么设计成这样的架构&#xff0c;继续往下读。 前半部分 CRI&#xff08;Container Runtime Interface&#xff0c;容器运行时接口&#xf…

避免海外业务中断,TikTok养号注意事项

TikTok已成为企业和个人拓展海外业务的重要平台。然而&#xff0c;由于平台规则严格&#xff0c;账号被封禁或限制访问的风险始终存在。为了确保用户在TikTok上的业务顺利进行&#xff0c;着重说一些养号的注意事项。 文章分为三个部分&#xff0c;分别是遵守平台规则、养号策略…

Qt判定鼠标是否在该多边形的线条上

要判断鼠标是否在由QPainterPath或一系列QPointF点定义的多边形的线条上&#xff0c;你可以使用以下步骤&#xff1a; 获取鼠标当前位置&#xff1a;在鼠标事件中&#xff0c;使用QMouseEvent的pos()方法获取鼠标的当前位置。 检查点与线段的距离&#xff1a;遍历多边形的每条…

面试高级 Java 工程师:2024 年的见闻与思考

面试高级 Java 工程师&#xff1a;2024 年的见闻与思考 由于公司业务拓展需要&#xff0c;公司招聘一名高级java工程研发工程师&#xff0c;主要负责新项目的研发及老项目的维护升级。我作为一名技术面试官&#xff0c;参与招聘高级 Java 工程师&#xff0c;我见证了技术领域的…

LATEX格式的高等数学题库(导数和概率论与数理统计)

\documentclass{ctexart} \usepackage{amsmath,amssymb,amsfonts,hyperref} \usepackage{CJKutf8} \usepackage{enumitem} % 引入宏包 \usepackage [colorlinkstrue] {} \begin{document}\begin{CJK}{UTF8}{gkai}%正文放在此行下与\end{CJK}之间就行\tableofcontents\newpage\s…

F1-score(标准度量)

什么是F1-score&#xff1f; F1分数&#xff08;F1-score&#xff09;是分类问题的一个衡量指标。一些多分类问题的机器学习竞赛&#xff0c;常常将F1-score作为最终测评的方法。它是精确率和召回率的调和平均数&#xff0c;最大为1&#xff0c;最小为0&#xff0c;如公式1所示…

高效转换:CSV 转 JSON 数组 API

在日常数据处理和分析中&#xff0c;CSV 和 JSON 是两种常见的数据格式。无论是开发者还是数据科学家&#xff0c;经常需要在这两种格式之间转换。我们提供的 CSV 转 JSON 数组 API 可以帮助您轻松完成这一任务。 功能特点&#xff1a; 多种输入方式&#xff1a;支持直接粘贴…

使用GPT3.5,LangChain,FAISS和python构建一个本地知识库

引言 介绍本地知识库的概念和用途 在现代信息时代&#xff0c;我们面临着海量的数据和信息&#xff0c;如何有效地管理和利用这些信息成为一项重要的任务。本地知识库是一种基于本地存储的知识管理系统&#xff0c;旨在帮助用户收集、组织和检索大量的知识和信息。它允许用户…

C语言-->指针详解

提示&#xff1a;本系列文章是C语言的重难点–>指针 C语言-->指针详解 前言一、什么是指针&#xff1f;二、指针的声明与初始化三、指针的解引用四、指针与数组五、指针与函数六、动态内存分配七、常见错误与注意事项总结我是将军我一直都在&#xff0c;。&#xff01; 前…

Oracle或MySQL数据迁移到国产数据库后的注意事项

一、人大金仓Kingbase 1、初始化后兼容 创建sysdate()方法兼容原生MySQL模式下不具备sysdate()的问题&#xff1a; create or replace function sysdate() returns timestamp with time zone as select current_timestamp; language sql; 2. 执行语句收集统计信息&#xff…

1.5-协程基础与关键知识:连接线程的世界-回调型 API 协作

文章目录 线程 API 转换成挂起函数&#xff1a;suspendCoroutine支持取消的 suspendCoroutine&#xff1a;suspendCancellableCoroutine总结 线程 API 转换成挂起函数&#xff1a;suspendCoroutine 在实际项目中即使已经使用协程了&#xff0c;可是要完全避免跟传统的线程 API…

Excel 学习手册 - 精进版(包括各类复杂函数及其嵌套使用)

作为程序员从未想过要去精进一下 Excel 办公软件的使用方法&#xff0c;以前用到某功能都是直接百度&#xff0c;最近这两天跟着哔哩哔哩上的戴戴戴师兄把 Excel 由里到外学了一遍&#xff0c;收获良多。程序员要想掌握这些内容可以说是手拿把掐&#xff0c;对后续 Excel 的运用…

linux的学习(七):读取,函数,正则表达式,文本处理工具cut和awk

##简介 shell编程中的读取&#xff0c;函数&#xff0c;正则表达式&#xff0c;文本处理工具的简单使用 read read&#xff1a;读取控制台的输入 参数&#xff1a; -p&#xff1a;指定读取时的提示符-t&#xff1a;等待读取的时间 脚本例子 编写i.sh脚本&#xff0c;enter…

算法实验3:贪心算法的应用

实验内容 &#xff08;1&#xff09;活动安排问题 设有n个活动的集合E{1, 2, …, n}&#xff0c;其中每个活动都要求使用同一资源&#xff0c;而在同一时间内只有一个活动能使用这一资源。每个活动i都有一个要求使用该资源的起始时间si和一个结束时间fi&#xff0c;且si <f…