Java 如何使用aws的kinesis实现消费端,消费流中数据

1.前言

AWS 官网给了两种方式实现:

java 1.x

java 2.x 

这两种方式,包是不一样的,1.x是com.amazonaws,2是software.amazon.kinesis

使用也是天差地别,而且国内对kinesis这个资料简直少的可怜,这也就增加了开发难度,

2.什么是kinesis

我说一下kinesis是啥吧,其实和咱们队列很像,服务端的数据,需要客户端监听消费,拿到数据解析之后怎么处理就是自己的事情啦,我主要的业务就是实现流中的数据,流中的数据都是url等相关信息,主要是点击链接就消费,所以可以实现点击量的处理等等.

maven包java2.x:Maven Central: software.amazon.kinesis:amazon-kinesis-client

kinesis怎么使用的介绍

地址:在 Java 中开发 Kinesis Client Library 消费端 - Amazon Kinesis Data Streams

3.开始前的准备

代码不难,难的是没有相关的资源资料去实现,所以我这次实现代码主要靠AI,它实现了代码其实也不准,但是确实是给了我灵感,一遍一遍让AI生成代码,一遍一遍试错,调试,最后终于成功!

在写代码之前我们需要一些配置:

1.应用名称,这个自己起个名字就行

2.流名,AWS关于kinesis控制台有,可以去拿

3.区域,AWS的区域

3.aws凭证密钥和key

4.代码

首先,我们需要启动监听,配置aws凭证,区域啊,workerid等,最后启动worker线程使其能够监听,

下面我在main方法中启动监听的演示代码,也可以多线程哦

public static void main(String[] args) throws UnknownHostException, ParseException {// 硬编码的AWS凭证String awsAccessKeyId = "xxx";String awsSecretAccessKey = "xxxxx";AWSCredentials credentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);// 配置 KCL workerworkerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration("test","CloudFront-apk-download-log",credentialsProvider,workerId).withInitialPositionInStream(InitialPositionInStream.LATEST).withRegionName("ap-south-1");// 创建并启动 workerWorker worker = new Worker.Builder().recordProcessorFactory(new MyRecordProcessorFactory()).config(kinesisClientLibConfiguration).build();worker.run(); // 这将启动 worker 并开始从 Kinesis 流中读取数据
}

创建一个接口MyRecordProcessorFactory,实现IRecordProcessorFactory,返回实例化监听端处理的类.这样那边产生数据,这边开始进入监听类处理.

public class MyRecordProcessorFactory implements IRecordProcessorFactory {@Overridepublic IRecordProcessor createProcessor() {return new MyRecordProcessor();}
}

创建MyRecordProcessor类 , 实现IRecordProcessor, 然后就会实现三个接口,初始化,监听数据,关闭资源这三个接口,

初始化initialize(): 在启动程序时会进入到初始化方法,我们可以拿到分片id以及从哪个序列号取出数据.

监听数据方法processRecords(): 此方法就会服务端生成的信息,这边就能同步监听到,并把信息给到你,你可以从给的参数中取出数据,这个你服务监听什么就会给你返什么. 你就可以解析, 解析完放到实体或者什么自己自定义处理吧.

public class MyRecordProcessor implements IRecordProcessor {private static final Logger LOG = LoggerFactory.getLogger(KCLExample.class);@Overridepublic void initialize(InitializationInput initializationInput) {// 初始化LOG.info("初始化shardId:{}", initializationInput.getShardId());LOG.info("初始化序列号:{}", initializationInput.getExtendedSequenceNumber());LOG.info("初始化检查点序列号:{}", initializationInput.getPendingCheckpointSequenceNumber());}@Overridepublic synchronized void processRecords(ProcessRecordsInput processRecordsInput) {List<Record> records = processRecordsInput.getRecords();System.out.println("批次:" + records.size());for (Record record : records) {ByteBuffer byteBuffer = record.getData();// 接收数据转换成strString str = StandardCharsets.UTF_8.decode(byteBuffer).toString();byteBuffer.flip();LOG.info("数据:{}", str);}// 检查点,目的是为了知道此次读取到了哪里IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();try {checkpointer.checkpoint();} catch (InvalidStateException e) {throw new RuntimeException(e);} catch (ShutdownException e) {throw new RuntimeException(e);}} @Overridepublic void shutdown(ShutdownInput shutdownInput) {ShutdownReason reson = shutdownInput.getShutdownReason();// 关闭资源等清理工作LOG.info("关闭资源:{}", reson.toString());}
}

pom.xml 

   <dependency><groupId>com.amazonaws</groupId><artifactId>amazon-kinesis-client</artifactId><version>1.11.0</version></dependency>

 启动就可以监听数据啦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/13767.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spring jpa 公共字段设计

数据库公共字段 1、多租户 tenantId 2、创建者名字 createName 3、创建者时间 createTime 4、更新者名字 updateName 5、更新者时间 updateTime 基础model类 /*** 基础Model*/ Data MappedSuperclass public class BaseModel implements Serializable{IdGeneratedValue(stra…

Linux DAY 6 _systemctl

systemctl命令&#xff0c;通过这个命令控制系统操作 语法&#xff1a;systemctl start | stop | status | enable | disable 服务名 start 启动 stop 关闭 status 查看状态 enable 开启开机自启 disable 关闭开机自启 服务名&#xff1a; NetworkManager 主网络服务 net…

源网络地址转换SNAT

左上角的是访问互联网发送的数据包&#xff0c;第一个是访问&#xff0c;第二个是网页传回来的 3、4项是源端口号和目的端口号&#xff08;3是随机的&#xff08;1024-65535&#xff09;&#xff0c;那个是http的网页服务端口就是80&#xff09; 那么往回传数据的时候源和目的…

解决uniapp中的web-view页面进入时全屏问题

当我们在使用uniapp开发应用的时候&#xff0c;我们使用web-view进行外部页面的引入 在h5 浏览器下显示正常&#xff0c;我通过样式控制&#xff0c;上面是web-view&#xff0c;下面是菜单栏。 但是在 app调试或真机上&#xff0c;无论如何&#xff0c;web-view都占满全屏&…

0.零基础入门微服务实战课

0.零基础入门微服务实战课 1.微服务和 Spring Cloud1.1 什么是微服务&#xff1f;1.2 什么是 Spring Cloud&#xff1f;1.3 微服务 VS Spring Cloud 2.为什么要学微服务&#xff1f;3.Spring Cloud 组件介绍 1.微服务和 Spring Cloud 1.1 什么是微服务&#xff1f; 微服务是将…

【Leetcode 每日一题】28. 找出字符串中第一个匹配项的下标

给你两个字符串 haystack 和 needle &#xff0c;请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标&#xff08;下标从 0 开始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;则返回 -1 。 示例 1&#xff1a; 输入&#xff1a;haystack &qu…

DFE_offset失调校准

1.校准原因 *制造工艺的限制&#xff1a;晶体管在制造过程中&#xff0c;由于工艺的限制&#xff0c;不可能做到完全对称&#xff0c;这导致了输入级晶体管的性能存在微小的差异。 *输入级偏置电流的不对称&#xff1a;输入级晶体管的偏置电流也会存在差异&#xff0c;这也会…

H4vdo 台湾APT-27视频投放工具

地址:https://github.com/MartinxMax/H4vdo 视频 关于 H4vdo RTMP lock 屏播放视频工具&#xff0c;可以向目标发送有效载荷&#xff0c;播放目标的屏幕内容。目标无法曹作计算机 使用方法 安装依赖 根据你的操作系统选择一个安装程序 RTMP 服务端 ./rtsp-simple-server.…

npm run dev启动element-ui,提示node_modules中webpack的版本跟package.json中webpack的版本不一致

问题一&#xff1a;修改node_modules/webpack/package.json版本为4.14.0&#xff0c;npm run dev时版本号又自动更改为 4.47.0 问题二&#xff1a;使用yarn 安装依赖&#xff0c;webpack的版本默认是4.47.0&#xff0c;为什么 求大佬们帮我解答一下以上两个问题 左侧是node_m…

【退役重学Java】关于 Sentinel 与服务熔断熔断

一、Sentinel 分布式系统的流量防卫兵 随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点&#xff0c;从流量控制、熔断降级、系统负载保护等多个维度 二、服务熔断 是什么&#xff1a; 服务熔断是一种微服务架构中的容错机制&#…

数据中心大型AI模型网络需求

数据中心大型AI模型网络需求 随着Transformer的崛起和2023年ChatGPT的大规模应用&#xff0c;业界逐渐形成共识&#xff1a;遵循一定的规模效应原则&#xff0c;增加模型参数量能够显著提升模型性能。特别是在参数数量级跃升至数百亿乃至更高时&#xff0c;大型AI模型在语言理…

【C++】 类型转换的详细讲解

前言 本章我们将学习C里面的几种类型转换。如&#xff1a; static_cast、reinterpret_cast、const_cast、dynamic_cast。 这些都是操作符关键字。 目录 1. C的类型转换1.1 C语言的类型转换&#xff1a;1.2 为什么C需要四种类型转换&#xff1a;1.3 C强制类型转换&#xff1a;1.…

知识分享|非凸问题求解方法及代码示例【分类迭代】【大M法】

主要内容 之前发布了非线性问题线性化的几种方法&#xff0c;如知识分享|分段函数线性化及matlab测试&#xff0c;学习园地 | yalmip实用操作-线性化&#xff0c;非线性优化 | 非线性问题matlabyalmip求解案例&#xff0c;但是在实际建模及编程过程中&#xff0c;会遇到各种…

记录一个更新adobe软件导致加载PDF文件异常的问题

最近由于项目需要,没有办法把原有的adobe正版软件进行了卸载,换了个盗版软件,结果导致我的pdf文件加载的时候出现异常。 报错的语句是这个 string str = System.Environment.CurrentDirectory; // string fileName = MyOpenFileDialog(); axAcroPDF1.LoadFile(…

一顿五元钱的午餐

在郑州喧嚣的城市一隅&#xff0c;藏着一段鲜为人知的真实的故事。 故事的主角是一位年过半百的父亲&#xff0c;一位平凡而又伟大的劳动者。岁月在他脸上刻下了深深的痕迹&#xff0c;但他眼神中闪烁着不屈与坚韧。 他今年52岁&#xff0c;为了给远在家乡的孩子们一个更好的…

人工智能应用-实验4-蚁群算法求解 TSP

文章目录 &#x1f9e1;&#x1f9e1;实验内容&#x1f9e1;&#x1f9e1;&#x1f9e1;&#x1f9e1;代码&#x1f9e1;&#x1f9e1;&#x1f9e1;&#x1f9e1;分析结果&#x1f9e1;&#x1f9e1;&#x1f9e1;&#x1f9e1;实验总结&#x1f9e1;&#x1f9e1; &#x1f9…

【vue】封装的天气展示卡片,在线获取天气信息

源码 <template><div class"sen_weather_wrapper"><div class"sen_top_box"><div class"sen_left_box"><div class"sen_top"><div class"sen_city">山东</div><qctc-time cl…

OCM认证考试难吗?

OCM&#xff08;Oracle Certified Master&#xff09;认证考试是Oracle公司提供的最高级别的专业认证&#xff0c;它确实被认为是非常具有挑战性的考试。以下是关于OCM认证考试难度的一些关键点&#xff1a; 深入的技术要求&#xff1a;OCM认证不仅要求考生具备Oracle数据库的…

VR直播:改变我们的直播方式,让现场触手可及

VR直播是近期比较火爆的一种直播方式&#xff0c;相信在抖音上我们都刷到过转动手机、变换视角的VR直播&#xff0c;因为形式比较新颖&#xff0c;用户的参与度比较高&#xff0c;一场直播下来用户的打赏也是较为可观的。 不仅仅在直播行业&#xff0c;在众多应用领域中&#…

软件下载系统asp.net

本项目实现电子书下载网站的功能&#xff0c;实现文章、管理员分类&#xff0c;友情连接的管理以及对前台页面的静态化。网站前台实现对电子书的详细信息介绍和提供下载。 说明文档 运行前附加数据库.mdf&#xff08;或sql生成数据库&#xff09; 主要技术&#xff1a; 基于a…