KafkaStream:Springboot中集成

1、在kafka-demo中创建配置类

        配置kafka参数

package com.heima.kafkademo.config;import lombok.Data;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Data
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

2、在application.yml中配置上面配置类需要的参数

server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}

3、新增配置类,创建KStream对象,进行聚合

package com.heima.kafkademo.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");return stream;}
}

4、启动kafka-demo服务测试

        使用生产者发送消息可以看到控制台接收成功

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/39623.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

8月11日上课内容 nginx的多实例和动静分离

多实例部署 在一台服务器上有多个tomcat的服务。 配置多实例之前&#xff0c;看单个实例是否访问正常。 1.安装好 jdk 2.安装 tomcat cd /opt tar zxvf apache-tomcat-9.0.16.tar.gz mkdir /usr/local/tomcat mv apache-tomcat-9.0.16 /usr/local/tomcat/tomcat1 cp -a /u…

Linux系统管理:虚拟机ESXi安装

目录 一、理论 1.VMware Workstation 2.VMware vSphere Client 3.ESXi 二、实验 1.ESXi 7安装 一、理论 1.VMware Workstation 它是一款专业的虚拟机软件&#xff0c;可以在一台物理机上运行多个操作系统&#xff0c;支持Windows、Linux等操作系统&#xff0c;可以模拟…

使用selenium如何实现自动登录

回顾使用requests如何实现自动登录一文中&#xff0c;提到好多网站在我们登录过后&#xff0c;在之后的某段时间内访问该网页时&#xff0c;不会给出请登录的提示&#xff0c;时间到期后就会提示请登录&#xff01;这样在使用爬虫访问网页时还要登录&#xff0c;打乱我们的节奏…

item_get_sales-获取商品销量详情

一、接口参数说明&#xff1a; item_get_sales-获取商品销量详情&#xff0c;点击更多API调试&#xff0c;请移步注册API账号点击获取测试key和secret 公共参数 请求地址: https://api-gw.onebound.cn/taobao/item_get_sales 名称类型必须描述keyString是调用key&#xff08…

ACM模式刷Leetcode题目

139题单词拆分 链接: link #include<iostream> #include<sstream> #include<string> #include<vector> #include<algorithm> #include<unordered_set> using namespace std;int main() {// 实现输入第一行为s字符串。// 第二行为wordDic…

【代码随想录day22】爬楼梯

题目 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 示例 1&#xff1a; 输入&#xff1a;n 2 输出&#xff1a;2 解释&#xff1a;有两种方法可以爬到楼顶。 1. 1 阶 1 阶 2. 2 阶 示例 2…

Spring的三种异常处理方式

1.SpringMVC 异常的处理流程 异常分为编译时异常和运行时异常&#xff0c;编译时异常我们 try-cache 进行捕获&#xff0c;捕获后自行处理&#xff0c;而运行时异常是不 可预期的&#xff0c;就需要规范编码来避免&#xff0c;在SpringMVC 中&#xff0c;不管是编译异常还是运行…

java:JDBC

文章目录 什么是JDBCJDBC使用步骤详解各个对象DriverManagerConnectionStatementResultSetPreparedStatement JDBC控制事务操作步骤示例 什么是JDBC 我们知道&#xff0c;数据库有很多种&#xff0c;比如 mysql&#xff0c;Oracle&#xff0c;DB2等等&#xff0c;如果每一种数…

C# WPF 中 外部图标引入iconfont,无法正常显示问题 【小白记录】

wpf iconfont 外部图标引入&#xff0c;无法正常显示问题。 1. 检查资源路径和引入格式是否正确2. 检查资源是否包含在程序集中 1. 检查资源路径和引入格式是否正确 正确的格式&#xff0c;注意字体文件 “xxxx.ttf” 应写为 “#xxxx” <TextBlock Text"&#xe7ae;…

不重启Docker能添加自签SSL证书镜像仓库吗?

应用背景 在企业应用Docker规划初期配置非安全镜像仓库时&#xff0c;有时会遗漏一些仓库没配置&#xff0c;但此时应用程序已经在Docker平台上部署起来了&#xff0c;体量越大就越不会让人去直接重启Docker。 那么&#xff0c;不重启Docker能添加自签SSL证书镜像仓库吗&…

经典人体模型SMPL介绍(一)

SMPL是马普所提出的经典人体模型&#xff0c;目前已成为姿态估计、人体重建等领域必不可少的基础先验。SMPL基于蒙皮和BlendShape实现&#xff0c;从数千个三维人体扫描结果得来&#xff0c;后通过PCA统计学习得来。 论文&#xff1a;SMPL: A Skinned Multi-Person Linear Mode…

Python读取svn版本

本文将详细介绍如何使用Python读取svn版本。 一、安装svn库 首先&#xff0c;我们需要使用Python来连接svn服务器&#xff0c;并获取版本号。这里我们使用pysvn库来完成这个工作。 pip install pysvn需要注意的是&#xff0c;如果你需要安装特定版本的pysvn&#xff0c;你可…

2023连锁收银系统该如何选?值得推荐的5款连锁收银系统

现在不管是连锁店还是零售店&#xff0c;只要是开店做生意赚钱的&#xff0c;都少不了要和钱打交道&#xff0c;尤其是对连锁店来说&#xff0c;收银工作更是重中之重。 连锁店涉及的门店较多&#xff0c;必须要有一套足够优秀的连锁收银系统&#xff0c;才能做好每个门店的收银…

【ARM 嵌入式 编译系列 5 -- GCC 内建函数 __builtin 详细介绍】

文章目录 什么是GCC内建函数?GCC 常见内建函数GCC内建函数使用示例上篇文章:ARM 嵌入式 编译系列 4.2 – GCC 链接规范 extern “C“ 介绍 下篇文章:ARM 嵌入式 编译系列 6 – GCC objcopy, objdump, readelf, nm 介绍 什么是GCC内建函数? GCC提供了一些专门的功能,用于…

使用 `tailwindcss-patch@2` 来提取你的类名吧

使用 tailwindcss-patch2 来提取你的类名吧 使用 tailwindcss-patch2 来提取你的类名吧 安装使用方式 命令行 Cli 开始提取吧 Nodejs API 的方式来使用 配置 初始化 What’s next? tailwindcss-patch 是一个 tailwindcss 生态的扩展项目。也是 tailwindcss-mangle 项目重要…

redis的Key的过期策略是如何实现的?

Key的过期策略 一个redis中可能同时存在很多很多key&#xff0c;这些key可能有很大一部分都有过期时间&#xff0c;此时&#xff0c;redis服务器咋知道哪些key已经过期要被删除&#xff0c;哪些key还没有过期&#xff1f; 如果直接遍历所有的key&#xff0c;显然是行不通的&am…

Abandon_Ubuntu Declaration

鉴于以下几个原因&#xff0c;持续到明年考研结束&#xff0c;我将不再捣鼓ubuntu和任何linux系统&#xff0c; 原因如下&#xff1a; ubuntu23.04不支持wps编辑pdf这个核心功能&#xff0c;且开机向canonial公司发送远程遥测&#xff0c;暂时不会用iptables禁用&#xff0c;故…

第几天(day)

庐阳区2021年信息学竞赛试题 题目描述 Description 给定一个日期&#xff0c;求这一天是当年的第几天。每年的元旦&#xff0c;1月1日&#xff0c;都是每年的第一天&#xff0c;但是每年的最后一天&#xff0c;12月31日&#xff0c;有可能是第365天&#xff0c;也有可能是第3…

2023年上半年网络工程师上午真题及答案解析

1.固态硬盘的存储介质是( )。 A.光盘 B.闪存 C.软盘 D.磁盘 2.虚拟存储技术把( )有机地结合起来使用&#xff0c;从而得到一个更大容量的“内存”。 A.内存与外存 B.Cache与内存 C.寄存器与Cache D.Cache与外存 3.下列接口协议中&…

关于安卓高版本gradle(7.0+)引入aar包报错问题

背景 项目开发过程中&#xff0c;接入三方sdk&#xff0c;引入了本地aar包依赖&#xff0c;as rebuild项目的过程中&#xff0c;报错&#xff0c;提示依赖找不到问题。 报错&#xff1a;“bundleDebugAar FAILED”等 开发环境 win10 jdk11 gradle 7.5 原因 由于gradle的版…