kafka生产者与消费者

文章目录

  • 一、 pom.xml依赖包
  • 二、yml配置文件
  • 三、消费者
  • 四、生产者
  • 总结


提示:这里可以添加本文要记录的大概内容:

一、 pom.xml依赖包

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.0</version>
</dependency>

二、yml配置文件

spring:kafka:listener:concurrency: 3  #线程数ack-mode: manual_immediatetype: batch #批量bootstrap-servers: 192.168.1.214:9092# 生产者配置producer:
#      retries: 1 # 消息发送重试次数batch-size: 16384buffer-memory: 33554432value-serializer: org.apache.kafka.common.serialization.StringSerializerkey-serializer: org.apache.kafka.common.serialization.StringSerializer#消费者需配置,生产者不需要consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: goodwe-touring-car-groupid-1auto-offset-reset: earliest #latest, earliest, noneenable-auto-commit: falseauto-commit-interval: 5000max-poll-records: 1000        #批量消费最大数量topic: portable_performance#自定义项目run, 运行kafka.
custom:run:kafka: true############################### 参数说明 #########################################consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false

三、消费者

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.goodwe.kafkaapi.model.constant.RedisConst;
import com.goodwe.kafkaapi.model.entity.ConsumerMessageData;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;/*** @Description : kafka消费者** @Author : LiYan* @CreateTime : 2023/8/16 8:35*/
@Slf4j
@Configuration
public class KafkaConsumer {private static final String REDIS_KEY = RedisConst.getREDIS_PREFIX() + RedisConst.getKEY();@Resourceprivate RedisTemplate<String,String> redisTemplate;@KafkaListener(topics = "#{'${spring.kafka.topic}'}", autoStartup = "${custom.run.kafka}")public void receive(List<ConsumerRecord<String, String>> listMessage, Acknowledgment ack) {try {log.info("----------------------开始消费消息--------------------------");if (CollectionUtils.isNotEmpty(listMessage)) {Map<String, ConsumerMessageData> dataMap = listMessage.stream().map(message -> JSON.parseObject(message.value(), ConsumerMessageData.class)).collect(Collectors.toMap(ConsumerMessageData::getSn, data -> data, (oldValue, newValue) -> newValue));dataMap.forEach((key, value) -> {redisTemplate.opsForZSet().add(REDIS_KEY, JSON.toJSONString(value), System.currentTimeMillis());});}} catch (Exception ex) {log.info("【断点续传处理】消费断点续传数据error;", ex);} finally {ack.acknowledge();}}
}

四、生产者

@SpringBootTest
class KafkaApiApplicationTests {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@Testpublic void testRedis(){List<ConsumerMessageData> messageData = messageData();for (ConsumerMessageData data : messageData) {String topic = "portable_performance";kafkaTemplate.send(topic, JSON.toJSONString(data));}}
}@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/send")public void sendMessage(@RequestBody String message) {kafkaTemplate.send("my-topic", message);}}

总结

================== 好记性不如烂笔头=========================

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/647847.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java实战:轻松掌握文件重命名与路径提取技巧

目录 一、使用File类进行文件重命名应用场景1&#xff1a;文件上传到服务器后保持原有文件名 二、从字符串中提取文件路径应用场景2&#xff1a;只获取不带http前缀的文件路径url应用场景3&#xff1a;获取文件路径url下的其他文件 三、解决反斜杠字符的问题应用场景4&#xff…

如何提高图片分辨率?3个方法让图片秒变清晰

如何提高图片分辨率&#xff1f;在日常生活中&#xff0c;我们经常需要处理各种图片。有时候&#xff0c;这些图片的分辨率可能比较低&#xff0c;导致无法满足我们的需求。例如&#xff0c;当我们想将图片放大或裁剪时&#xff0c;低分辨率的图片可能会出现模糊、失真等问题。…

计算机网络 第6章(应用层)

系列文章目录 计算机网络 第1章&#xff08;概述&#xff09; 计算机网络 第2章&#xff08;物理层&#xff09; 计算机网络 第3章&#xff08;数据链路层&#xff09; 计算机网络 第4章&#xff08;网络层&#xff09; 计算机网络 第5章&#xff08;运输层&#xff09; 计算机…

基于springboot+vue的新闻推荐系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目背景…

常用电子器件学习——光耦

光耦介绍 光耦合器一般由三部分组成&#xff1a;光的发射、光的接收及信号放大。 输入的电信号驱动光发射源&#xff0c;使之发光&#xff0c;被光探测器接收而产生光电流&#xff0c;再经过进一步放大后输出。这就完成了电—光—电的转换&#xff0c;从而起到输入、输出、隔离…

Unity开发中的XML注释

在Unity开发中&#xff0c;XML注释主要用于C#脚本的注释&#xff0c;以帮助生成代码文档和提供IntelliSense功能。以下是一些关于如何使用XML注释的技巧&#xff1a; 创建注释&#xff1a; 在C#中&#xff0c;XML注释是由///或/**...*/开始的。例如 /// <summary> /// 这…

刘润-进化的力量2 一刷 笔记

安全感来自确定性&#xff0c;但机会藏在不确定性中 安全感来自确定性&#xff0c;但机会藏在不确定性中。 每一个弯道里&#xff0c;都有你超车的机会 意外、周期、趋势、规划 可是&#xff0c;为什么趋势一定是不可逆转的呢&#xff1f;因为&#xff0c;效率提高了 长期…

配置接口策略路由案例

知识改变命运&#xff0c;技术就是要分享&#xff0c;有问题随时联系&#xff0c;免费答疑&#xff0c;欢迎联系 厦门微思网络​​​​​​ https://www.xmws.cn 华为认证\华为HCIA-Datacom\华为HCIP-Datacom\华为HCIE-Datacom Linux\RHCE\RHCE 9.0\RHCA\ Oracle OCP\CKA\K8S\…

【Image captioning】论文阅读七—Efficient Image Captioning for Edge Devices_AAAI2023

中文标题:面向边缘设备的高效图像描述(Efficient Image Captioning for Edge Devices) 文章目录 1. 引言2. 相关工作3. 方法3.1 Model Architecture(模型结构)3.2 Model Training (模型训练)3.3 Knowledge Distillation (知识蒸馏)4. 实验4.1 数据集和评价指标4.2 实施细…

c++中 cin中的hello world 也有需要注意的事情

文章目录 一个demo先看一段代码思考一下看一下结果 输入cincin.get()先看一段代码思考一下看一下结果解决办法cin.getline() 一个demo 在std命名空间中的cin属于标准输入了。 先看一段代码 char yourname[50];cout << "请输入你的姓名" << endl; cin.g…

如何使用Jellyfin+cpolar搭建私人影音平台实现无公网ip远程访问

文章目录 1. 前言2. Jellyfin服务网站搭建2.1. Jellyfin下载和安装2.2. Jellyfin网页测试 3.本地网页发布3.1 cpolar的安装和注册3.2 Cpolar云端设置3.3 Cpolar本地设置 4.公网访问测试5. 结语 1. 前言 随着移动智能设备的普及&#xff0c;各种各样的使用需求也被开发出来&…

STC89C51单片机

本文为博主 日月同辉&#xff0c;与我共生&#xff0c;csdn原创首发。希望看完后能对你有所帮助&#xff0c;不足之处请指正&#xff01;一起交流学习&#xff0c;共同进步&#xff01; > 发布人&#xff1a;日月同辉,与我共生_单片机-CSDN博客 > 欢迎你为独创博主日月同…

【c语言】扫雷

前言&#xff1a; 扫雷是一款经典的单人益智游戏&#xff0c;它的目标是在一个方格矩阵中找出所有的地雷&#xff0c;而不触碰到任何一颗地雷。在计算机编程领域&#xff0c;扫雷也是一个非常受欢迎的项目&#xff0c;因为它涉及到许多重要的编程概念&#xff0c;如数组、循环…

pinctrl子系统与gpio子系统实验-向设备树文件添加Led设备节点

一. 简介 前面几篇文章学习了 Linux内核中 针对 GPIO而言&#xff0c;提供的pinctrl子系统与gpio子系统。 本文开始学习如何利用 Linux内核的 pinctrl子系统&#xff0c;与 gpio子系统提供的 API函数&#xff0c;开发 Led驱动实验。 本文首先来学习向设备树文件中添加Led设…

SpringBoot ResponseBodyAdvice使用以及常见问题

简介 PS: advice, 在这里意思是顾问, 其余很多场景也是顾问的意思由于篇幅问题, 注释已删, 如想看注释, 请在github中查看 作用: 用于在Controller返回后, HttpMessageConverter执行转换之前执行一些转换 常见场景: 统一响应结构, 如json统一包装 由于版本不同, 多少有些差异…

Sketch怎么增加组件?

Sketch怎么增加组件&#xff1f;Sketch组件库经常使用&#xff0c;想要添加一些新的组件&#xff0c;该怎么添加呢&#xff1f;下面我们就来看看Sketch组件库添加新组建的技巧&#xff0c;详细请看下文介绍 打开电脑&#xff0c;找到sketch软件的图标&#xff0c;点击进入 新建…

计算机工作原理解析和解剖(基础版)

我们会从软件⼯程师的⻆度解释计算机是如何⼯作的&#xff0c;我们的主要⽬标既不是期待 ⼤家可以造出⾃⼰的计算机&#xff0c;也不是介绍如何编程&#xff0c;⽽是希望让⼤家了解计算机的核⼼⼯作机制后&#xff0c;打破计算机的神秘感&#xff0c;并且有利于理解我们平时编程…

【大数据】Flink 中的数据传输

Flink 中的数据传输 1.基于信用值的流量控制2.任务链接 在运行过程中&#xff0c;应用的任务会持续进行数据交换。TaskManager 负责将数据从发送任务传输至接收任务。它的网络模块在记录传输前会先将它们收集到 缓冲区 中。换言之&#xff0c;记录并非逐个发送的&#xff0c;而…

Stream实战-统计求和

Stream实战-统计 stream在开发中经常使用场景就是统计&#xff0c;再次记录一下实际开发中用的到统计&#xff0c;使用模拟数据。 需求如下&#xff1a; 代码如下: /*** map集合统计*/ public class StreamDemo4 {/*** 实体类*/DataAllArgsConstructorNoArgsConstructorclas…

Python模块与包:扩展功能、提高效率的利器

文章目录 一、引言1.1 模块与包对于Python开发的重要性1.2 Python作为拥有丰富生态系统的编程语言 二、为什么学习模块与包2.1 复用代码&#xff1a;利用现有模块与包加速开发过程2.2 扩展功能&#xff1a;通过模块与包提供的功能增强应用的能力 三、模块的使用3.1 导入模块&am…