Spring Cloud Stream Kafka(3.2.2版本)使用

问题

正在尝试只用Spring Cloud Stream Kafka。

步骤

配置

spring:cloud:function:definition: project2Building    stream:kafka:binder:brokers: xxxx:9002configuration:enable.auto.commit: falsesession.timeout.ms: 30000max.poll.records: 30allow.auto.create.topics: falseauto.offset.reset: earliest# 反序列化配置key.serializer: org.apache.kafka.common.serialization.StringDeserializervalue.deserializer: org.apache.kafka.common.serialization.StringDeserializer# JAAS配置security.protocol: SASL_PLAINTEXTsasl.mechanism: PLAINsasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";"autoCreateTopics: falsebindings:# 自定义消费bean的方法名称project2Building-in-0:# 消费组group: xxxx# 主题destination: xxxx

消费方法

package xxxxx.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;import java.util.function.Consumer;/*** 与xxxx的kafka对接处理* @author zyl*/
@Slf4j
@Configuration
public class MainConfig {@Beanpublic Consumer<Message<String>> project2Building(){return msg ->{log.info(String.format("Kafka消息:%s",msg.getPayload()));// TODO 手动提交kafkaAcknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);if (acknowledgment != null) {acknowledgment.acknowledge();}};}}

总结

我用的这个版本Spring Cloud Stream Kafka(3.2.2版本)相对于Spring Boot 的Kafka库有点重量级了。这就是Spring Cloud 基于Kafka的流处理框架。

参考:

  • Spring Cloud Stream Kafka Binder Reference Guide
  • Kafka With Spring Cloud Streams Using Function-based Mode
  • Spring Cloud Stream - functional and reactive

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/90364.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PHP Web 开发基础

PHP是动态类型的Web开发的脚本语言&#xff0c;PHP以页面文件作为加载和运行的单元&#xff0c;PHP现在有了Composer作为开发包管理。 1.使用Composer管理依赖 自从.NET开发用了Nuget管理程序集依赖&#xff0c;我就再也离不开它了&#xff0c;幸亏Java中也有Maven管理jar包&…

如何定时备份使用Docker构建的MySQL容器中的数据库

&#x1f468;&#x1f3fb;‍&#x1f4bb; 热爱摄影的程序员 &#x1f468;&#x1f3fb;‍&#x1f3a8; 喜欢编码的设计师 &#x1f9d5;&#x1f3fb; 擅长设计的剪辑师 &#x1f9d1;&#x1f3fb;‍&#x1f3eb; 一位高冷无情的编码爱好者 大家好&#xff0c;我是 DevO…

PIE:1979-2018年中国气温数据产品(空间分辨率为0.1º)

简介 中国气温数据产品包含1979-2018年期间中国的近地表气温数据&#xff08;单位为摄氏度&#xff09;&#xff0c;时间分辨率为每日&#xff0c;空间分辨率为0.1。本产品集成了再分析数据&#xff08;ERA5、CMFD&#xff09;、遥感数据&#xff08;MODIS&#xff09;、原位数…

php eayswoole node axios crypto-js 实现大文件分片上传复盘

不啰嗦 直接上步骤 步骤1.开发环境配置 项目需要node.js 做前端支撑 官网下载地址&#xff1a; http://nodejs.cn/download/ 根据自己需要下载对应的版本,我下载的是windows系统64位的版本。 包下载好后 进行安装&#xff0c;安装步骤在此省略... 测试是否安装成功 …

蓝海彤翔亮相2023新疆网络文化节重点项目“新疆动漫节”

9月22日上午&#xff0c;2023新疆网络文化节重点项目“新疆动漫节”&#xff08;以下简称“2023新疆动漫节”&#xff09;在克拉玛依科学技术馆隆重开幕&#xff0c;蓝海彤翔作为国内知名的文化科技产业集团应邀参与此次活动&#xff0c;并在美好新疆e起向未来动漫展映区设置展…

C#生成自定义海报

安装包 SixLabors.ImageSharp.Drawing 2.0 需要的字体&#xff1a;宋体和微软雅黑 商用的需要授权如果商业使用可以使用方正书宋、方正黑体&#xff0c;他们可以免费商用 方正官网 代码 using SixLabors.Fonts; using SixLabors.ImageSharp; using SixLabors.ImageSharp.Draw…

python切分字符串

在Python中&#xff0c;您可以使用不同的方法来切分字符串&#xff0c;具体取决于您的需求和字符串的结构。以下是一些常见的方法&#xff1a; 使用split()方法&#xff1a; split()方法允许您根据指定的分隔符将字符串切分成子字符串&#xff0c;并返回一个包含这些子字符串的…

使用SPY++查看窗口信息去排查客户端UI软件问题

目录 1、使用SPY查看窗口的信息 2、使用SPY查看某些软件UI窗口用什么UI组件实现的 2.1、查看海康视频监控客户端安装包程序 2.2、查看华为协同办公软件WeLink 2.3、查看字节协同办公软件飞书 2.4、查看最新版本的Chrome浏览器 2.5、查看小鱼易连视频会议客户端软件 2.6…

C/S架构学习之UDP服务器

UDP服务器的实现流程&#xff1a;一、创建用户数据报套接字&#xff08;socket函数&#xff09;&#xff1a;通信域选择IPV4网络协议、套接字类型选择数据报式&#xff1b; int sockfd socket(AF_INET,SOCK_DGRAM,0); 二、填充服务器的网络信息结构体&#xff1a;1.定义网络信…

Java初始化大量数据到Neo4j中(一)

背景&#xff1a;我们项目第一次部署图数据库&#xff0c;要求我们把现有的业务数据以及关系上线第一时间初始化到Neo4j中。开发环境数据量已经百万级别。生成环境数据量更多。 我刚开始开发的时候&#xff0c;由于对Neo4j的了解并没有很多&#xff0c;第一想到的是用代码通用组…

Mybatis 批量修改数据,,并判断非空数据插入

方法一&#xff1a; <update id"updateListPO"> <foreach collection"list" separator";" item"item">UPDATE project_quotation_item<SET>product_num #{item.productNum},product_price_total #{item.pr…

CIP或者EtherNET/IP中的PATH是什么含义?

目录 SegmentPATH举例 最近在学习EtherNET/IP&#xff0c;PATH不太明白&#xff0c;翻了翻规范&#xff0c;在这里记个笔记。下面的叙述可能是中英混合&#xff0c;有一些是规范中的原文我直接搬过来的。我翻译的不准确。 Segment PATH是CIP Segment中的一个分类。要了解PATH…

每日一题 322零钱兑换(完全背包)(灵神版本)

题目 与01背包的区别就是可以重复拿一件物品 零钱兑换 给你一个整数数组 coins &#xff0c;表示不同面额的硬币&#xff1b;以及一个整数 amount &#xff0c;表示总金额。 计算并返回可以凑成总金额所需的 最少的硬币个数 。如果没有任何一种硬币组合能组成总金额&#xff…

Dev C++安装与运行

参考: https://blog.csdn.net/Keven_11/article/details/126388791 https://www.cnblogs.com/-Wallace-/p/cpp-stl.html 2021年真题要求 2022年真题要求 河南省的考试环境 IDE环境 Dev C 安装 下载 安装 点击OK&#xff0c;选择我接受 修改安装路径为D盘d:\Program Fi…

MQTT协议是什么?快速了解MQTT协议在物联网中的应用

随着工业互联网的迅猛发展&#xff0c;工业设备数据采集和实时监控成为制造业提高生产效率和质量的重要手段。在物联网应用中&#xff0c;通信技术包括Wi-Fi、RFID、NFC、RS232、RS485、USB等&#xff0c;其中在物联网技术框架体系中所使用到的通讯协议主要有&#xff1a;AMQP、…

[ruby on rails] postgres sql explain 优化

一、查看执行计划 sql User.all.to_sql # 不会实际执行查询 puts ActiveRecord::Base.connection.explain(sql)# 会实际执行查询&#xff0c;再列出计划 User.all.explain# 会实际执行查询&#xff0c;再列出计划ActiveRecord::Base.connection.execute(EXPLAIN ANALYZE sql…

[论文笔记]P-tuning

引言 今天带来第四篇大模型微调的论文笔记GPT Understands, Too。 本篇工作提出的方法是P-tuning,使用可训练的连续提示嵌入,使GPT在NLU上表现比传统的全量微调的GPT更好的效果。P-tuning还提高了BERT在少样本和监督设定下的性能,大幅减少了提示工程的需求。 总体介绍 根…

Django(21):使用Celery任务框架

目录 Celery介绍Celery安装Celery使用项目文件和配置启动Celery编写任务调用异步任务查看任务执行状态及结果 设置定时和周期性任务配置文件添加任务Django Admin添加周期性任务启动任务调度器beat Flower监控任务执行状态Celery高级用法与注意事项给任务设置最大重试次数不同任…

2023-09-28 mysql-代号m-schema调研-文档记录

摘要: mdb中的database与mdb中的database的概念南辕北辙, 可以说有着本质的不同. mysql中的database可以看作是table的namespace, 而在mdb中, 与此相似的概念也就是table的namespace的概念, 是schema. 为了将mysql的db与mdb的schema建立映射关系后的技术风险可控, 需要详细分…

26663-2011 大型液压安全联轴器 课堂随笔

声明 本文是学习GB-T 26663-2011 大型液压安全联轴器. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本标准规定了大型液压安全联轴器的分类、技术要求、试验方法及检验规则等。 本标准适用于联接两同轴线的传动轴系&#xff0c;可起到限制…