KafkaStream:基本使用

简介:

        kafkaStream:提供了对存储在kafka中的数据进行流式处理和分析的功能

特点:

        KafkasSream提供了一个非常简单轻量的Library,它可以非常方便的嵌入到java程序中,也可以任何方式打包部署

入门案例:

  1、新建工程kafka-demo

           引入kafkaStream依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--kafkaStream--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency></dependencies>

   2、新建流式处理类

          代码如下

package com.heima.kafkademo.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*
* 流式处理
* */
public class KafkaStreamQuickStart {public static void main(String[] args) {/*创建kafka配置中心并配置参数*/Properties prop = new Properties();//连接地址prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//key序列化prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//value序列化prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());//创建id名称prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream构造器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建KafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}//流式计算方法private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kafka对象,同时指定从哪个topic获取消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");//处理消息的valuestream.flatMapValues(new ValueMapper<String, Iterable<?>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})      //按照value进行聚合.groupBy((key,value)->value)//时间窗口,每隔10秒更新一次.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}
}

3、启动消费者类和流式处理类监听消息

        使用生产者类发送消息

       消费者和生产者类代码参考Kafka:安装和配置_Success___的博客-CSDN博客

4、测试

        成功接收到消息

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/39370.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jenkins自动化部署Jenkinsfile文件配置

简介 使用jenkins部署时会读取项目中Jenkinsfile文件&#xff0c;文件配置不对会导致部署失败 文件内容 pipeline {agent anyparameters {string(name: project_name, defaultValue: xxx1, description: 项目jar名称)string(name: version, defaultValue: xxx2, description…

【Apollo】阿波罗自动驾驶:塑造自动驾驶技术的未来

前言 Apollo (阿波罗)是一个开放的、完整的、安全的平台&#xff0c;将帮助汽车行业及自动驾驶领域的合作伙伴结合车辆和硬件系统&#xff0c;快速搭建一套属于自己的自动驾驶系统。 开放能力、共享资源、加速创新、持续共赢是 Apollo 开放平台的口号。百度把自己所拥有的强大、…

Java之SpringCloud Alibaba【四】【微服务 Sentinel服务熔断】

Java之SpringCloud Alibaba【四】【微服务 Sentinel服务熔断】 一、分布式系统遇到的问题1、服务挂掉的一些原因 二、解决方案三、Sentinel&#xff1a;分布式系统的流量防卫兵1、Sentinel是什么2、Sentinel和Hystrix对比3、Sentinel快速开发4、通过注解的方式来控流5、启动Sen…

DoIP学习笔记系列:(五)“安全认证”的.dll从何而来?

文章目录 1. “安全认证”的.dll从何而来?1.1 .dll文件base1.2 增加客户需求算法传送门 DoIP学习笔记系列:导航篇 1. “安全认证”的.dll从何而来? 无论是用CANoe还是VFlash,亦或是编辑cdd文件,都需要加载一个与$27服务相关的.dll(Windows的动态库文件),这个文件是从哪…

Go 流程控制

if语句使用 package mainimport "fmt"func main() {score : 700if score 700 {fmt.Println("清华")}//if支持一个初始化语句 初始化语句和条件判断用;分割if a : 700; a 700 {fmt.Println("清华")}}清华 清华if_else使用 package mainimpor…

机器学习深度学习——seq2seq实现机器翻译(数据集处理)

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位即将上大四&#xff0c;正专攻机器学习的保研er &#x1f30c;上期文章&#xff1a;机器学习&&深度学习——从编码器-解码器架构到seq2seq&#xff08;机器翻译&#xff09; &#x1f4da;订阅专栏&#xff1a;机…

yolo源码注释1——文件结构

代码基于yolov5 v6.0 目录&#xff1a; yolo源码注释1——文件结构yolo源码注释2——数据集配置文件yolo源码注释3——模型配置文件yolo源码注释4——yolo-py datasets # 用于存放数据集的默认文件夹yolov5 data # 模型训练的超参数配置文件以及数据集配置文件 hyps # 存放超参…

C语言学习错题集(五)

1.最大公倍数的求法(gcd已知) 2.报数 3.字符串最后必须有’\0’!!! 4.例题 5.例题 6.例题 1.最大公倍数的求法(gcd已知) int lcmgcd*(a/gcd)*(b/gcd);2.报数 报数游戏是这样的&#xff1a;有n个人围成一圈&#xff0c;按顺序从1到n编好号。从第一个人开始报数&#xff0c;报到…

工程项目管理系统源码+功能清单+项目模块+spring cloud +spring boot em

​ 工程项目管理软件&#xff08;工程项目管理系统&#xff09;对建设工程项目管理组织建设、项目策划决策、规划设计、施工建设到竣工交付、总结评估、运维运营&#xff0c;全过程、全方位的对项目进行综合管理 工程项目各模块及其功能点清单 一、系统管理 1、数据字典&#…

代码保护 code protection

为什么要做代码保护&#xff1f; 为了保护知识产权并让攻击者的利用更加困难&#xff0c;组织应该为其软件的逆向工程设置障碍(例如&#xff0c;反篡改、调试保护、反盗版特性、运行时完整性)&#xff0c;增加攻击者分析和利用你的软件所需的投入。代码保护对于广泛分布的代码…

Markdown使用笔记

Markdown使用笔记 一、段落与强调 important denotes the impossible thing to do Because your ugly appearance, you cannot have a happy ending. 使用*括起来的为斜体 使用**括起来的是粗体 使用~~括起来的是删除线 在句子后面添加<br>即可换行 二、标题 在…

常见期权策略类型有哪些?

这几天在做一个期权策略类型的整理分类&#xff0c;怎么解释期权策略&#xff0c;期权策略是现代金融市场中运用非常广泛、变化非常丰富、结构非常精妙的金融衍生产品&#xff1b;同时也是一种更为复杂也更为灵活的投资工具&#xff0c;下文介绍常见期权策略类型有哪些&#xf…

iptables安全技术和防火墙

通信五元素 源ip和目标ip 源端口和目标端口 协议 通信四元素 源ip和目标ip 源端口和目标端口 iptables表链结构 Netfilter Linux防火墙是由Netfilter组件提供的&#xff0c;Netfilter工作在内核空间&#xff0c;集成在linux内核中 Netfilter是Linux 2.4.x之后新一代的Li…

CI/CD流水线实战

不知道为什么&#xff0c;现在什么技术都想学&#xff0c;因为我觉得我遇到了技术的壁垒&#xff0c;大的项目接触不到&#xff0c;做的项目一个字辣*。所以&#xff0c;整个人心浮气躁&#xff0c;我已经得通过每天的骑行和长跑缓解这种浮躁了。一个周末&#xff0c;我再次宅在…

k8s问题汇总

作者前言 本文章为记录使用k8s遇到的问题和解决方法&#xff0c;文章持续更新中… 目录 作者前言正常配置ingress&#xff0c;但是访问错误添加工作节点报错安装k8s报错使用kubectl命令报错container没有运行安装会出现kubelet异常&#xff0c;无法识别删除k8s集群访问dashboa…

Docker安装RabbitMQ单机版

Docker安装RabbitMQ单机版 先安装Docker服务&#xff0c;可参考安装Docker及学习 编写rabbitmq-composefile.yml文件 这里以rabbitmq 3.11.16 版本为例 cat << \EOF > /opt/rabbitmq-composefile.yml version: 3 services:rabbitmq:image: rabbitmq:3.10.0-managem…

【Apollo】推动创新:探索阿波罗自动驾驶的进步(含安装 Apollo的详细教程)

前言 Apollo (阿波罗)是一个开放的、完整的、安全的平台&#xff0c;将帮助汽车行业及自动驾驶领域的合作伙伴结合车辆和硬件系统&#xff0c;快速搭建一套属于自己的自动驾驶系统。 开放能力、共享资源、加速创新、持续共赢是 Apollo 开放平台的口号。百度把自己所拥有的强大、…

【密码学】维京密码

维京密码 瑞典罗特布鲁纳巨石上的图案看起来毫无意义&#xff0c;但是它确实是一种维京密码。如果我们注意到每组图案中长笔画和短笔画的数量&#xff0c;将得到一组数字2、4、2、3、3、5、2、3、3、6、3、5。组合配对得到24、23、35、23、36、35。现在考虑如图1.4所示的内容&a…

【变形金刚03】使用 Pytorch 开始构建transformer

一、说明 在本教程中&#xff0c;我们将使用 PyTorch 从头开始构建一个基本的转换器模型。Vaswani等人在论文“注意力是你所需要的一切”中引入的Transformer模型是一种深度学习架构&#xff0c;专为序列到序列任务而设计&#xff0c;例如机器翻译和文本摘要。它基于自我注意机…

iOS Epub阅读器改造记录

六个月前在这个YHEpubDemo阅读器的基础上做了一些优化&#xff0c;这里做一下记录。 1.首行缩进修复 由于分页的存在&#xff0c;新的一页的首行可能是新的一行&#xff0c;则应该缩进&#xff1b;也可能是前面一页段落的延续&#xff0c;这时候不应该缩进。YHEpubDemo基于XDS…