Java 集成阿里Kafka

配置文件

kafka-config:bootstrap-servers: server-url #写自己的地址sasl-username: ********sasl-password: ********ssl-truststore: src/main/resources/only.4096.client.truststore.jks## sasl路径,demo中有,请拷贝到自己的某个目录下,不能被打包到jar中login-config: src/main/resources/kafka_client_jaas.confsasl-mechanism: PLAIN #两种加密方式: PLAIN, SCRAM-SHA-256# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: latest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka: topic: mytopic: user_info

读取配置,写consumer创建入参

import java.util.Properties;import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Configuration
public class KafkaProperties {@Value("${kafka-config.bootstrap-servers}")private String server;@Value("${kafka-config.sasl-username}")private String saslUsername;@Value("${kafka-config.sasl-password}")private String saslPassword;@Value("${kafka-config.ssl-truststore}")private String sslTruststore;@Value("${env}")public Properties getProperties(String topic) {Properties props = new Properties();// 设置接入点,请通过控制台获取对应Topic的接入点props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");// 属于同一个组的消费实例,会负载消费消息props.put(ConsumerConfig.GROUP_ID_CONFIG, topic);// 每次poll的最大数量// 注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);// 两次poll之间的最大允许间隔// 可更加实际拉去数据和客户的版本等设置此值,默认30sprops.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// 设置单次拉取的量,走公网访问时,该参数会有较大影响props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 3200000);props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 3200000);//以下配置为走认证的配置项/****// 设置SSL根证书的路径,请记得将XXX修改为自己的路径// 与sasl路径类似,该文件也不能被打包到jar中props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststore);// 根证书store的密码,保持不变props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");// 接入协议,目前支持使用SASL_SSL协议接入props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");// 设置SASL账号// 两种加密方式: PLAIN, SCRAM-SHA-256String saslMechanism = "PLAIN";String username = saslUsername;String password = saslPassword;if (StringUtils.isNotBlank(saslUsername) && StringUtils.isNotBlank(saslPassword)) {String prefix = "org.apache.kafka.common.security.scram.ScramLoginModule";if ("PLAIN".equalsIgnoreCase(saslMechanism)) {prefix = "org.apache.kafka.common.security.plain.PlainLoginModule";}String jaasConfig = String.format("%s required username=\"%s\" password=\"%s\";", prefix, username,password);props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);}// SASL鉴权方式,保持不变props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);// hostname校验改成空props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");*/return props;}
}

消费者创建消息处理


import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class KafkaMessageHandlerScheduler {@Resourceprivate KafkaPropertiesInit kafkaPropertiesInit;@Value("${kafka.topic.user_info}")private String userInfoTopic;@Scheduled(initialDelay = 10000, fixedRate = Long.MAX_VALUE)public void userInfoMsg() {new Thread(() -> {// 设置消费组订阅的Topic,可以订阅多个// 如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样List<String> subscribedTopics = new ArrayList<String>();// 如果需要订阅多个Topic,则在这里add进去即可// 每个Topic需要先在控制台进行创建subscribedTopics.add(topic);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaPropertiesInit.getProperties(topic));consumer.subscribe(subscribedTopics);while (true) {try {ConsumerRecords<String, String> records = consumer.poll(1000);// 必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIGList<String> msgs = new ArrayList<>();for (ConsumerRecord<String, String> record : records) {if (StringUtils.isNotBlank(record.value())) {msgs.add(record.value());}}if (ObjectUtils.isNotEmpty(msgs)) {/****************业务逻辑todo********************/consumer.commitAsync();}} catch (Exception e) {log.error("消息处理异常 error = {}", e);} finally {try {Thread.sleep(1000);} catch (Exception e) {}}}}).start();}/***** 消息处理* @param tableName* @param topic*/private void messageHandler(String tableName, String topic) {// 设置消费组订阅的Topic,可以订阅多个// 如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样List<String> subscribedTopics = new ArrayList<String>();// 如果需要订阅多个Topic,则在这里add进去即可// 每个Topic需要先在控制台进行创建subscribedTopics.add(topic);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaPropertiesInit.getProperties(topic));consumer.subscribe(subscribedTopics);while (true) {try {ConsumerRecords<String, String> records = consumer.poll(1000);// 必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIGList<String> msgs = new ArrayList<>();for (ConsumerRecord<String, String> record : records) {if (StringUtils.isNotBlank(record.value())) {msgs.add(record.value());}}if (ObjectUtils.isNotEmpty(msgs)) {kafkaMessageService.sgHandler(tableName, msgs);consumer.commitAsync();}} catch (Exception e) {log.error("消息处理异常 error = {}", e);} finally {try {Thread.sleep(1000);} catch (Exception e) {}}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/128031.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用opencv的tracking模块跟踪目标

OpenCV跟踪模块算法介绍 OpenCV的tracking模块是一个功能强大的跟踪算法库&#xff0c;包含多种用于跟踪对象的算法。它可以帮助你在连续的视频帧中定位一个物体&#xff0c;例如人脸、眼睛、车辆等。 在OpenCV的tracking模块中&#xff0c;一些主要的跟踪算法包括&#xff1…

软考之软件工程基础理论知识

软件工程基础 软件开发方法 结构化方法 将整个系统的开发过程分为若干阶段&#xff0c;然后依次进行&#xff0c;前一阶段是后一阶段的工作依据按顺序完成。应用最广泛。特点是注重开发过程的整体性和全局性。缺点是开发周期长文档设计说明繁琐&#xff0c;工作效率低开发前要…

Golang Gin 接口返回 Excel 文件

文章目录 1.Web 页面导出数据到文件由后台实现还是前端实现&#xff1f;2.Golang Excel 库选型3.后台实现示例4.xlsx 库的问题5.小结参考文献 1.Web 页面导出数据到文件由后台实现还是前端实现&#xff1f; Web 页面导出表数据到 Excel&#xff08;或其他格式&#xff09;可以…

One-to-N N-to-One: Two Advanced Backdoor Attacks Against Deep Learning Models

One-to-N & N-to-One: Two Advanced Backdoor Attacks Against Deep Learning Models----《一对N和N对一&#xff1a;针对深度学习模型的两种高级后门攻击》 1对N&#xff1a; 通过控制同一后门的不同强度触发多个后门 N对1&#xff1a; 只有当所有N个后门都满足时才会触发…

centos7完全卸载和安装mysql8

问题描述 最近安装了MYSQL8&#xff0c;遇到了各种问题&#xff0c;总体汇总一下&#xff0c;凡是无法启动就是my.cnf和初始化的参数不匹配。 第一种 启动前设置了mysqld --initialize --usermysql --lower-case-table-names1&#xff0c;my.cnf文件却没有修改就去启动。 第…

测试为什么分白盒、黑盒、单元、集成测试?

对于想进入测试行业的小萌新&#xff0c;本文的诉求主要分为三块&#xff1a; 1、想知道分为这么多种测试的原因 2、解决各种概念问题 3、提供各种软件测试工具 安排&#xff01; 一、为什么测试的概念这么多 一个软件项目就好比一部复杂的汽车&#xff0c;有很多零件&#x…

Java作业二

一、使用方法编写求圆面积和周长的程序&#xff0c;运行时提示输入圆半径&#xff0c;然后输出计算结果。运行效果如下图所示&#xff1a; import java.util.Scanner;public class Test {public static void main(String[] args) {Scanner input new Scanner(System.in);Syste…

Vue使用Object.definedproperty的数据监听 使用js实现一种发布订阅的模式

Vue是一款流行的JavaScript框架&#xff0c;它可以帮助开发者构建交互式的Web应用程序。在Vue中&#xff0c;我们可以使用Object.definedproperty来实现数据的监听&#xff0c;也可以使用发布订阅模式来实现组件之间的通信。本文将详细讲解这两个主题&#xff0c;并提供代码注释…

pycharm更改远程服务器地址

一、问题描述 在运行一些项目时&#xff0c;我们常需要在pycharm中连接远程服务器&#xff0c;但万一远程服务器的ip发生了变化&#xff0c;该如何修改呢&#xff1f;我们在file-settings-python interpreter中找到远程服务器&#xff0c;但是发现ip是灰色的&#xff0c;没有办…

最新Ai智能创作系统源码V3.0,AI绘画系统/支持GPT联网提问/支持Prompt应用+搭建部署教程

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如…

docker--基本操作

第 1 章 Docker基础 1.1 docker简介 在这一部分我们主要讲两个方面&#xff1a; docker是什么、docker特点 1.1.1 docker是什么 docker是什么&#xff1f; docker的中文解释是码头工人。 官方解释&#xff1a; Docker是一个开源的容器引擎&#xff0c;它基于LCX容器技术&…

设计模式之两阶段终止模式

文章目录 1. 简介 2. 常见思路3. 代码实战 1. 简介 两阶段终止模式&#xff08;Two-Phase Termination Pattern&#xff09;是一种软件设计模式&#xff0c;用于管理线程或进程的生命周期。它包括两个阶段&#xff1a;第一阶段是准备阶段&#xff0c;该阶段用于准备线程或进程…

Go语言的键盘输入和打印输出

键盘输入和打印输出 一、打印输出 1.1 fmt包 fmt包实现了类似C语言printf和scanf的格式化I/O。格式化verb&#xff08;‘verb’&#xff09;源自C语言但更简单。 详见官网fmt的API&#xff1a;https://golang.google.cn/pkg/fmt/ 1.2 导入包 import "fmt"1.3 常…

elasticsearch中highlight的“假匹配”

elasticsearch中highlight的“假匹配” 一个highlight的假高亮现象&#xff1a; /company_meta_info/_search?rest_total_hits_as_inttrue {"_source": {"includes": ["name","address"]},"query": {"bool": {&…

[笔记] 字符串输入 #字符输入

字符串的多组输入格式 scanf("%c", &ch)读取单个字符&#xff0c;用EOF作为结束的判断标志。 刷题记录&#xff1a;[题] 查找最大元素 #字符输入 逐个字符手动读取&#xff0c;因为题目的要求&#xff0c;要对每个字符逐个操作&#xff0c;所以就输入的时候顺便…

android一些经验记录

1.应用程序闪退&#xff0c;连画面都没有&#xff0c;先去看看程序的xml文件中的控件是不是有错误。

C++:string类!

Cstring 是C中的字符串。 字符串对象是一种特殊类型的容器&#xff0c;专门设计来操作的字符序列。 不像传统的c-strings,只是在数组中的一个字符序列&#xff0c;我们称之为字符数组&#xff0c;而C字符串对象属于一个类&#xff0c;这个类有很多内置的特点&#xff0c;在操作…

软考 系统架构设计师系列知识点之设计模式(11)

接前一篇文章&#xff1a;软考 系统架构设计师系列知识点之设计模式&#xff08;10&#xff09; 所属章节&#xff1a; 老版&#xff08;第一版&#xff09;教材 第7章. 设计模式 第2节. 设计模式实例 相关试题 10. 设计模式按照目的可划分三类&#xff0c;其中&#xff0c;…

某国产中间件企业:提升研发安全能力,助力数字化建设安全发展

​某国产中间件企业是我国中间件领导者&#xff0c;国内领先的大安全及行业信息化解决方案提供商&#xff0c;为各个行业领域近万家企业客户提供先进的中间件、信息安全及行业数字化产品、解决方案及服务支撑&#xff0c;致力于构建安全科学的数字世界&#xff0c;帮助客户实现…

c语言刷题(9周)(6~10)

输入10个不等的整数创建数组a[10]&#xff0c;在数组a中找是否存在整数t。若存在显示找到了及下标位置&#xff0c;若不存在显示error。 题干输入10个不等的整数创建数组a[10]&#xff0c;在数组a中找是否存在整数t。若存在显示找到了及下标位置&#xff0c;若不存在显示error…