springboot kafka消息消费学习 @KafkaListener 使用

kafka 配置类

用途:定义使用的基本 kafka 配置,以及定义Bean
下面文件是读取本地 spring 的标准配置文件的类,用于一般属性获取等操作

import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;@Component
@ConfigurationProperties(prefix = "my.kafka")
@Data
public class MyTaskKafkaProperties {/**r* kafka地址*/private String serverUrl;/*** groupId*/private String groupId;/*** topic*/private String topic;private boolean enableAutoCommit;private String autoOffsetReset;@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(6);factory.getContainerProperties().setPollTimeout(6000);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);return props;}
}

@Data 为其他用于控制get set 方法的,与 此处配置不是强关联,可以没有

实际 kafka 监听消费

import com.dtdream.dthink.dtalent.dmall.openplat.service.opendata.OpenDataService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.Optional;@Slf4j
@ConditionalOnProperty(name = "my.kafka.enable", havingValue = "true")
@Component
public class MyTaskConsumer {@Autowiredprivate XxxxxService xxxxxService;@KafkaListener(topics = "${my.kafka.topic}", groupId = "${my.kafka.groupId}",containerFactory = "kafkaTwoContainerFactory")public void dxpTaskEnd(ConsumerRecord<String, String> record, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {consume(record, ack, topic, msg -> xxxxxService.xxxxxxx(msg));}private void consume(ConsumerRecord<String, String> record, Acknowledgment ack, String topic,java.util.function.Consumer<String> consumer) {Optional<String> optional = Optional.ofNullable(record.value());if (!optional.isPresent()) {log.warn("kafka收到消息 但为空,record:{}", record);return;}String msg = optional.get();log.info("kafka收到消息  开始消费 topic:{},msg:{}", topic, msg);try {consumer.accept(msg);// 上面方法执行成功后手动提交ack.acknowledge();log.info("kafka收到消息消费成功 topic:{},msg:{}", topic, msg);} catch (Exception e) {log.error("kafka消费消息失败 topic:{},msg:{}", topic, msg, e);}}
}

@ConditionalOnProperty spring boot 用于判断当前类是否加载的条件

XxxxxService: 为我们的业务服务层,用于消费消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/68727.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第十八章、【Linux】认识与分析登录文件

18.1 什么是登录文件 什么是登录文件&#xff1f;简单地说&#xff0c;就是记录系统活动信息的几个文件&#xff0c;例如&#xff1a;何时何地何人&#xff0c;做了什么工作。换句话说就是&#xff1a;记录系统在什么时候由哪个程序做了什么样的行为时&#xff0c;发生了什么事…

前端食堂技术周刊第 97 期:Astro 3.0、8 月登陆 Web 平台的新功能、前端技术方案文档、理想的视口并不存在

美味值&#xff1a;&#x1f31f;&#x1f31f;&#x1f31f;&#x1f31f;&#x1f31f; 口味&#xff1a;栀子椰椰 食堂技术周刊仓库地址&#xff1a;https://github.com/Geekhyt/weekly 大家好&#xff0c;我是童欧巴。欢迎来到前端食堂技术周刊&#xff0c;我们先来看下…

个人炒伦敦银方法大公开

个人炒伦敦银的方法与机构投资者炒这个品种的方法是有不同的&#xff0c;但是双方可能会借鉴一些相同的分析工具&#xff0c;比方说有的机构可能也会使用技术分析&#xff0c;当然&#xff0c;个人投资者对技术分析这个词更是不会陌生。今天我们就从个人投资者的角度出发&#…

LeetCode(力扣)669. 修剪二叉搜索树Python

LeetCode669. 修剪二叉搜索树 题目链接代码 题目链接 https://leetcode.cn/problems/trim-a-binary-search-tree/ 代码 递归 # Definition for a binary tree node. # class TreeNode: # def __init__(self, val0, leftNone, rightNone): # self.val val # …

如何在IPhone 14、14 Pro和14 Pro Max上添加屏幕锁定

当你第一次获得iPhone时&#xff0c;系统会提示你为它创建一个密码&#xff0c;这样只有你才能访问它。你应该使用一个必须输入的密码&#xff0c;以便在iPhone 14被唤醒或打开时解锁它。这将提供更高级别的保护。当你打开数据保护时&#xff0c;iPhone上的数据会被加密&#x…

StringBuilder和SpringBuffer的区别

StringBuilder和SpringBuffer的区别 StringBuilder ​ StringBuilder不是线程安全的&#xff0c;这就意味着多个线程对StringBuilder进行访问的时候&#xff0c;可能造成数据不一致或异常。因为它适用于单线程的情况&#xff0c;如果我们确保使用环境在单线程的情况可以使用S…

Unity Android 之 在Unity 中引入 OkHttp的操作注意(OKHttp4.xx- kotlin 的包)简单记录

Unity Android 之 在Unity 中引入 OkHttp的操作注意(OKHttp4.xx- kotlin 的包)简单记录 目录 Unity Android 之 在Unity 中引入 OkHttp的操作注意(OKHttp4.xx- kotlin 的包)简单记录 一、简单介绍 二、OKHttp 4.xx 的 SDK 封装 aar 给 Unity 的使用注意 三、附录 OKHttp 的…

Linux gdb单步调试的原理

文章目录 一、demo演示二、原理分析参考资料 一、demo演示 .section .data message:.string "Hello, World!\n" len . - message.section .text .globl _start _start:# 调用 write() 函数输出 "Hello, World!"mov $1, %rax # 系统调用号为 1…

explain 参数解析

12个返回参数&#xff0c;分5、4、3理解记忆 5个 查询相关 id每个select 对应一个id&#xff0c;id越大越优先执行&#xff0c;优先级越高select_type查询类型&#xff0c;有11种table当前select对应的表名partitions匹配的表分区type对当前表的关联或者说匹配类型&#xff0…

leetcode70爬楼梯

题目&#xff1a; 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 示例 1&#xff1a; 输入&#xff1a;n 2 输出&#xff1a;2 解释&#xff1a;有两种方法可以爬到楼顶。 1. 1 阶 1 阶 2. …

AIGC专栏5——EasyPhoto AI写真照片生成器 sd-webui插件介绍、安装与使用

AIGC专栏5——EasyPhoto AI写真照片生成器 插件安装与使用 学习前言源码下载地址技术原理储备&#xff08;SD/Control/Lora&#xff09;StableDiffusionControlNetLora EasyPhoto插件简介EasyPhoto插件安装安装方式一&#xff1a;Webui界面安装 &#xff08;需要良好的网络&…

安全测试目录内容合集

基础知识 安全测试基础知识 安全测试-django防御安全策略 HTTP工作原理 靶场DVWA 安全测试网站-DWVA下载安装启动 DVWA-Command Injection DVWA-5.File upload 文件上传漏洞 DVWA-9.Weak Session IDs DVWA-XSS (Stored) DVWA-10.XSS (DOM) DVWA-XSS (Reflected) DVWA-15.Ope…

Wasm软件生态系统安全分析

本文转载自 OpenHarmony TSC 官方微信公众号《峰会回顾第12期 | Wasm软件生态系统安全分析》 演讲嘉宾 | 王浩宇 回顾整理 | 廖 涛 排版校对 | 李萍萍 嘉宾简介 王浩宇&#xff0c;华中科技大学教授&#xff0c;博士生导师&#xff0c;华中科技大学OpenHarmony技术俱乐部主任…

Qt CMake 中国象棋程序实现

前驱课程 C自学精简实践教程 目录(必读) C数据结构与算法实现&#xff08;目录&#xff09; Qt 入门实战教程&#xff08;目录&#xff09; 项目初衷 为学习 Qt 的人提供一个合适的有一定难度的综合型练习项目。 在学会写代码之前&#xff0c;先看别人怎么写的代码。深入…

手动实现uni-app可用的new URL

使用 import URL from url const url new URL(https://www.aaa.com:8989/bbb/ccc/ddd.html?e1&f2&g#h3&i4&j?k5#l6&e4) console.log(url)结果 {"href": "https://www.aaa.com:8989/bbb/ccc/ddd.html?e1&f2&g#h3&i4&j…

Linux以系统服务的方式启动Kafka(其他服务同理)

最终效果&#xff1a; 先回顾命令行的启动方式&#xff1a; kafka的启动 进入kafka的安装目录 1、首先启动zookeeper服务&#xff1a; bin/zookeeper-server-start.sh config/zookeeper.properties2、再启动kafka bin/kafka-server-start.sh config/server.properties &…

Ajax模拟视频点赞功能

前台 <%--Created by IntelliJ IDEA.User: xxDate: 2023/9/4Time: 10:00To change this template use File | Settings | File Templates. --%> <% page contentType"text/html;charsetUTF-8" language"java" %> <html> <head>&l…

记录一次WMware网络问题

目录 ​编辑 一、问题描述 二、问题排查 2.1 指令ifconfig 查看ip信息 2.2 nmcli n 查看网卡状态 三、问题解决 3.1 启动 NetworkManager 网络管理器 3.2 ifup ens160 启动网卡 一、问题描述 我在我本地电脑上使用WMware虚拟机部署了k8s&#xff0c;有次正常关机后&am…

docker安装mysql、clickhouse、oracle等各种数据库汇总

1&#xff1a;docker 安装mongo数据库并使用 官网&#xff1a;https://www.mongodb.com/docs/manual/ 安装 &#xff1a;https://www.zhihu.com/question/54602953/answer/3047452434?utm_id0 安装2&#xff1a;https://www.duidaima.com/Group/Topic/ArchitecturedDesign/91…

让GPT成为您的科研加速器丨GPT引领前沿与应用突破之GPT4科研实践技术与AI绘图

GPT对于每个科研人员已经成为不可或缺的辅助工具&#xff0c;不同的研究领域和项目具有不同的需求。如在科研编程、绘图领域&#xff1a;1、编程建议和示例代码:无论你使用的编程语言是Python、R、MATLAB还是其他语言&#xff0c;都可以为你提供相关的代码示例。​2、数据可视化…