Kafka系列(四)

本文接kafka三,代码实践kafkaStream的应用,用来完成流式计算。

kafkastream

        关于流式计算也就是实时处理,无时间概念边界的处理一些数据。想要更有性价比地和java程序进行结合,因此了解了kafka。但是本人阅读了kafka地官网,觉得可阅读性并不是很高,当然是个人认为,就是界面做的就不是很舒服。

简介

简介一下kafkaStream

Kafka Stream的特点
  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署

  • 除了Kafka外,无任何外部依赖

  • 充分利用Kafka分区机制实现水平扩展和顺序性保证(想要保证消息有序性就要设置一个分区)

  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)

  • 支持正好一次处理语义

  • 提供记录级的处理能力,从而实现毫秒级的低延迟

  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)

  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

关键概念
  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

Kstream

(1)数据结构类似于map,key-value键值对

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。 数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->(“” alice“,3)

如果流处理应用是要总结每个用户的价值,它将返回alice,4。因为第二条数据记录不会覆盖第一条,而是做了一个insert,累加。

代码实现

依赖
       <!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency>
kafkaStream配置类

需要在nacos的配置里面配置hosts属性和group,本地等怎么配置都可以,只要能读取到就行。


/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

这里生产者和消费者我就不再举例子了,直接举中间这个stream怎么写。

stream需要知道是谁发的,所以生产者和stream需要绑定一个相同的主题,而stream需要知道要给谁发送过去,消费者知道是谁发的,所以stream和消费者又有一个相同的主题。

streamhandler代码

具体的每一行代码的含义结合个人理解都在注释里面。

package com.neu.article.stream;import com.alibaba.fastjson.JSON;import com.neu.base.constants.HotArticleConstants;
import com.neu.base.model.mess.ArticleVisitStreamMess;
import com.neu.base.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;@Configuration
@Slf4j
public class HotArticleStreamHandler {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//接收消息KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);//聚合流式处理stream.map((key,value)->{UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);//重置消息的key:1234343434(文章id)   和  value: likes:1 当前文章点赞一次//mess.getType().name():用于区分是点赞还是阅读 mess.getAdd():用于区分是加1还是减1return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());})//按照文章id进行聚合.groupBy((key,value)->key)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))/*** 自行的完成聚合的计算*/.aggregate(new Initializer<String>() {/*** 初始方法,返回值是消息的value->aggValue,聚合之后的value* @return*/@Overridepublic String apply() {return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";}/*** 真正的聚合操作,返回值是消息的value*/}, new Aggregator<String, String, String>() {/**** @param key 消息的key :mess.getArticleId().toString()* @param value  消息的value likes:1* @param aggValue   初始化消息聚合后的一个值 COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0* @return*/@Overridepublic String apply(String key, String value, String aggValue) {System.out.println(value);if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry = aggValue.split(",");int col = 0,com=0,lik=0,vie=0;for (String agg : aggAry) {//agg遍历第一次的时候最开始为 COLLECTION:0String[] split = agg.split(":");//split[0] = COLLECTION  split[1] = 0/*** 获得初始值,也是时间窗口内计算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col = Integer.parseInt(split[1]);break;case COMMENT:com = Integer.parseInt(split[1]);break;case LIKES:lik = Integer.parseInt(split[1]);break;case VIEWS:vie = Integer.parseInt(split[1]);break;}}/*** 累加操作   likes:1*/String[] valAry = value.split(":");//valAry[0] = likes  valAry[1] = 1switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col += Integer.parseInt(valAry[1]);break;case COMMENT:com += Integer.parseInt(valAry[1]);break;case LIKES:lik += Integer.parseInt(valAry[1]);break;case VIEWS:vie += Integer.parseInt(valAry[1]);break;}//返回值是有要求的,必须与初始化apply方法的返回值形式一致String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);System.out.println("文章的id:"+key);System.out.println("当前时间窗口内的消息处理结果:"+formatStr);return formatStr;}//Materialized.as("hot-atricle-stream-count-001"):用于指定六十处理的状态,字符串可以随便给,多个流处理的话不重复就行}, Materialized.as("hot-atricle-stream-count-001")).toStream().map((key,value)->{//key.key().toString():文章id,value:COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));})//发送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value数据* @param articleId* @param value* @return*/public String formatObj(String articleId,String value){ArticleVisitStreamMess mess = new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry = value.split(",");for (String val : valAry) {String[] split = val.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));return JSON.toJSONString(mess);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/630260.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文笔记:信息融合的门控多模态单元(GMU)

整理了GMU&#xff08;ICLR2017 GATED MULTIMODAL UNITS FOR INFORMATION FUSION&#xff09;论文的阅读笔记 背景模型实验 论文地址&#xff1a; GMU 背景 多模态指的是同一个现实世界的概念可以用不同的视图或数据类型来描述。比如维基百科有时会用音频的混合来描述一个名人…

vue+echarts 几个案例

普通柱状图 <template><!-- 容器默认宽高是0 如果不设置 页面不显示--><div ref"mychart" id"mychart"></div> </template><script> import * as echarts from "echarts" import axios from axiosexport …

(Matlab)基于CNN-Bi_LSTM的多维回归预测(卷积神经网络-双向长短期记忆网络)

目录 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 亮点与优势&#xff1a; 二、实际运行效果&#xff1a; 三、部分代码展示&#xff1a; 四、完整程序数据分享&#xff1a; 一、程序及算法内容介绍&#xff1a; 基本内容&#xff1a; 本代码基于Matlab平…

数字电源简介

数字电源简介 定义主要应用场景数字电源的基本组成常见算法常见电源拓扑PFCLLC 数字电源与模拟电源对比参考链接 定义 常见定义有以下四种&#xff1a; 通过数字接口控制的开关电源&#xff0c;强调的是数字电源的“通信”功能。可通过I2C或类似的数字总线来对数字信号进行控…

springboot整合websocket后启动报错:javax.websocket.server.ServerContainer not available

一、场景 Springboot使用ServerEndpoint来建立websocket链接。引入依赖。 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>配置Websocket Confi…

九:day01_ 消息队列01

第一章 RabbitMQ 概念 1.1.1 消息队列 MQ全称Message Queue&#xff08;消息队列&#xff09;&#xff0c;是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信。 1.1.2 消息 两台计算机间传送的数据单位。消息可以非常简单&#xff0c;例如只包含文本字符串&#x…

【目标检测】YOLOv5算法实现(九):模型预测

本系列文章记录本人硕士阶段YOLO系列目标检测算法自学及其代码实现的过程。其中算法具体实现借鉴于ultralytics YOLO源码Github&#xff0c;删减了源码中部分内容&#xff0c;满足个人科研需求。   本系列文章主要以YOLOv5为例完成算法的实现&#xff0c;后续修改、增加相关模…

深入解析JavaScript属性的getter和setter

&#x1f9d1;‍&#x1f393; 个人主页&#xff1a;《爱蹦跶的大A阿》 &#x1f525;当前正在更新专栏&#xff1a;《VUE》 、《JavaScript保姆级教程》、《krpano》、《krpano中文文档》 ​ ​ ✨ 前言 在JavaScript中,对象属性除了直接设置值之外,还可以通过getter和sett…

鸿蒙OS4.0兼容性测试

背景 OpenHarmony兼容性测评主要是验证合作伙伴的设备和业务应用满足OpenHarmony开源兼容性定义的技术要求&#xff0c;确保运行在OpenHarmony上的设备和业务应用能稳定、正常运行&#xff0c;同时使用OpenHarmony的设备和业务应用有一致性的接口和业务体验。 OpenHarmony兼容…

MongoDB调优

三大导致 MongoDB 性能不佳的原因 1&#xff09;慢查询2&#xff09;阻塞等待3&#xff09;硬件资源不足1,2通常是因为模型/索引设计不佳导致的。排查思路&#xff1a;按1-2-3依次排查。 影响 MongoDB 性能的因素 MongoDB 性能监控工具 Free Monitoring 从版本 4.0 开始&am…

redis经典面试题

说说你对Redis的理解 Redis是一个基于Key-Value存储结构的开源内存数据库&#xff0c;也是一种NoSQL数据库。 它支持多种数据类型&#xff0c;包括String、Map、Set、ZSet和List&#xff0c;以满足不同应用场景的需求。 Redis以内存存储和优化的数据结构为基础&#xff0c;提…

Python爬虫实战:IP代理池助你突破限制,高效采集数据

当今互联网环境中&#xff0c;为了应对反爬虫、匿名访问或绕过某些地域限制等需求&#xff0c;IP代理池成为了一种常用的解决方案。IP代理池是一个包含多个可用代理IP地址的集合&#xff0c;可以通过该代理池随机选择可用IP地址来进行网络请求。 IP代理池是一组可用的代理IP地址…

网页设计(十一)JavaScript事件分析

一、设计校园办公系统认证页面 校园办公系统认证页面 校园办公系统认证页面初始布局 卡号有效性检查页面 口令有效性检查页面 二次口令有效性检查页面 QQ/微信有效性检查图 <!-- prj_11_1.html --> <!doctype html> <html lang"en"><…

Java设计模式-迭代器模式

迭代器模式 一、概述二、结构三、案例实现四、优缺点五、使用场景六、JDK源码解析 一、概述 定义&#xff1a; 提供一个对象来顺序访问聚合对象中的一系列数据&#xff0c;而不暴露聚合对象的内部表示。 二、结构 迭代器模式主要包含以下角色&#xff1a; 抽象聚合&#xf…

postman做接口测试

之前搞自动化接口测试&#xff0c;由于接口的特性&#xff0c;要验证接口返回xml中的数据&#xff0c;所以没找到合适的轮子&#xff0c;就自己用requests造了个轮子&#xff0c;用着也还行&#xff0c;不过就是case管理有些麻烦&#xff0c;近几天又回头看了看postman也可以玩…

论文笔记(四十)Goal-Auxiliary Actor-Critic for 6D Robotic Grasping with Point Clouds

Goal-Auxiliary Actor-Critic for 6D Robotic Grasping with Point Clouds 文章概括摘要1. 介绍2. 相关工作3. 学习 6D 抓握政策3.1 背景3.2 从点云抓取 6D 策略3.3 联合运动和抓握规划器的演示3.4 行为克隆和 DAGGER3.5 目标--辅助 DDPG3.6 对未知物体进行微调的后视目标 4. 实…

CleanMyMac X .4.14.7如何清理 Mac 系统?

细心的用户发现苹果Mac电脑越用越慢&#xff0c;其实这种情况是正常的&#xff0c;mac电脑用久了会产生很多的缓存文件&#xff0c;如果不及时清理会影响运行速度。Mac系统在使用过程中都会产生大量系统垃圾&#xff0c;如不需要的系统语言安装包&#xff0c;视频网站缓存文件&…

SSL之mkcert构建本地自签名

文章目录 1. 什么是SSL2. mkcert&#xff1a;快速生成自签名证书2.1 mkcert的工作流程如下&#xff1a;2.2 window 本地实现自签证书2.2.1 下载安装2.2.2 下载,生成本地 SSL2.2.3 生成 pem 自签证书,可供局域网内使用其他主机访问。2.2.4 使用-psck12 生成*.p12 文件 2.3 Sprin…

【Python】模块

&#x1f6a9; WRITE IN FRONT &#x1f6a9; &#x1f50e; 介绍&#xff1a;"謓泽"正在路上朝着"攻城狮"方向"前进四" &#x1f50e;&#x1f3c5; 荣誉&#xff1a;2021|2022年度博客之星物联网与嵌入式开发TOP5|TOP4、2021|2222年获评…

Java中的Socket你了解吗

☆* o(≧▽≦)o *☆嗨~我是小奥&#x1f379; &#x1f4c4;&#x1f4c4;&#x1f4c4;个人博客&#xff1a;小奥的博客 &#x1f4c4;&#x1f4c4;&#x1f4c4;CSDN&#xff1a;个人CSDN &#x1f4d9;&#x1f4d9;&#x1f4d9;Github&#xff1a;传送门 &#x1f4c5;&a…