一个简单的kafka 消费者

写一个简单的kafka 消费者

1. 依赖

 		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2. 消费者

import com.xxxx.npi.module.common.msg.enums.Topic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;import java.util.Arrays;
import java.util.Properties;public class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "xxxx.xxxx.xxxx.xxxx:9092");props.put("group.id", "gedigital");props.put("enable.auto.commit", "false");props.put("auto.offset.reset", "earliest");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 后面安全相关的配置,如果没有,可以不用配置;如果有,必须配props.put("security.protocol", "SASL_SSL");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "e:\\client_truststore.jks"); //Save the certificate (dowload client_truststore.jks) in trust store to local server directoy (only for Java client)props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "passwdconfig");props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';");//注意passwod结尾的分号一定不要漏props.put("ssl.endpoint.identification.algorithm", "");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);String topic = Topic.FBR.getTopic();consumer.subscribe(Arrays.asList(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), record.value());}consumer.commitSync();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/2607.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

watchEffect的使用

watchEffect 是 Vue 3 Composition API 中的一个函数&#xff0c;它用于在响应式数据变化时自动执行一个副作用函数。与 watch 不同&#xff0c;watchEffect 会自动收集其执行过程中访问到的响应式依赖&#xff0c;并在这些依赖发生变化时重新运行副作用函数。这意味着你不需要…

预防oracle的漏洞及其提权

防止Oracle数据库的漏洞及其潜在的权限提升&#xff0c;需要实施一系列综合的安全措施。这些措施不仅涉及技术配置&#xff0c;还包括过程管理和持续的安全评估。以下是有效防御Oracle数据库漏洞和提权攻击的一些关键步骤&#xff1a; 1. 安装和配置 安装最新安全补丁 定期更…

git提交注释规范插件

1、前言 为什么要注重代码提交规范&#xff1f; 在团队协作开发时&#xff0c;每个人提交代码时都会写 commit message。 每个人都有自己的书写风格&#xff0c;翻看我们组的git log, 可以说是五花八门&#xff0c;十分不利于阅读和维护。 一般项目开发都是多分支共存&#x…

关于CPP类中字符串成员初始化

直接看代码吧 #include <iostream> #include <string>/* A string is actually an object of the C++ Standard Library class string. This class is defined in header <string>, and the name string, like cout, belongs to namespace std. To enable …

kvm使用virt-clone克隆虚拟机

首先使用命令查看系统安装的所有虚拟机: virsh list --all然后使用如下命令进行虚拟机的克隆: virt-clone -o generic -n generic-1 -f /var/lib/libvirt/images/generic-1.qcow2-o后面要克隆的虚拟机名称 -n是新的虚拟机的名称 -f是生成的新的虚拟机磁盘文件路径(一般是/var…

Seal^_^【送书活动第2期】——《Flink入门与实战》

Seal^_^【送书活动第2期】——《Flink入门与实战》 一、参与方式二、本期推荐图书2.1 作者简介2.2 编辑推荐2.3 前 言2.4 本书特点2.5 内容简介2.6 本书适用读者2.7 书籍目录 三、正版购买 一、参与方式 评论&#xff1a;"掌握Flink&#xff0c;驭大数据&#xff0c;实战…

Ubuntu下部署gerrit+报错分析(超详细)

Ubuntu下部署gerrit代码平台 之前安装过几次 最后都在Apache代理这里失败了&#xff0c;如下图&#xff0c;总是gerrit.config与Apache2.config配置有问题&#xff0c;后面换了使用ngnix代理&#xff0c;简单多了 安装Mysql、gerrit、jdk、git 这一步也是非必须得&#xff0…

【c++】list类接口函数介绍与深度剖析模拟实现

&#x1f525;个人主页&#xff1a;Quitecoder &#x1f525;专栏&#xff1a;c笔记仓 朋友们大家好&#xff0c;本篇文章来到list有关部分&#xff0c;这一部分函数与前面的类似&#xff0c;我们简单讲解&#xff0c;重难点在模拟实现时的迭代器有关实现 目录 1.List介绍2.接…

Go 之为什么 rune 是 int32 的别名而不是 uint32 的别名

我对这个问题其实也是一直有疑问的&#xff0c;毕竟像 byte 都是 uint8 的别名。然后找了一些问答资料&#xff0c;不知道还没有没其他更好的解释。 范围足够 在 Unicode 字符集中&#xff0c;一个字符的码点范围是从 U0000 到 U10FFFF&#xff0c;共计 1114112 个码点&#…

转向敏捷财务规划,实现更快更自信的决策

随着数字化的到来&#xff0c;原本基于电子表格的时代正逐渐拉下帷幕&#xff0c;大部分企业开始摆脱依赖于电子表格进行计划、预算和预测的传统规划系统&#xff0c;寻求更符合当今市场要求的敏捷财务规划。但不得不承认&#xff0c;当下电子表格仍然是多数企业使用最广泛的工…

代码随想录-字符串 | 右旋字符串

代码随想录-字符串 | 7右旋字符串 卡码网 右旋字符串解题思路代码复杂度难点总结 卡码网 右旋字符串 题目链接 题目描述 字符串的右旋转操作是把字符串尾部的若干个字符转移到字符串的前面。给定一个字符串 s 和一个正整数 k&#xff0c;请编写一个函数&#xff0c;将字符串…

Wireshark数据包分析入门

Wireshark数据包分析 1. 网络协议基础1.1. 应传网数物&#xff08;应表会传网数物&#xff09; 2. 三次握手2.1. 第一次握手2.2. 第二次握手2.3. 第三次握手2.4. 三次握手后流量特征 3. 第一层---物理层&#xff08;以太网&#xff09;4. 第二层---数据链路层&#xff08;PPP L…

ele pls 表格行内样式超出隐藏

使用 模板实现方案&#xff1a; 实现效果&#xff1a; 相关样式&#xff1a;

【网络技术】【Kali Linux】Wireshark嗅探(十)IPv4和IPv6

往期 Kali Linux 上的 Wireshark 嗅探实验见博客&#xff1a; 【网络技术】【Kali Linux】Wireshark嗅探&#xff08;一&#xff09;ping 和 ICMP 【网络技术】【Kali Linux】Wireshark嗅探&#xff08;二&#xff09;TCP 协议 【网络技术】【Kali Linux】Wireshark嗅探&…

Jenkins CI/CD 持续集成专题四 Jenkins服务器IP更换

一、查看brew 的 services brew services list 二、编辑 homebrew.mxcl.jenkins-lts.plist 将下面的httpListenAddress值修改为自己的ip 服务器&#xff0c;这里我是用的本机的ip 三 、重新启动 jenkins-lts brew services restart jenkins-lts 四 浏览器访问 http://10.85…

Python技术:从入门到精通的指南

Python&#xff0c;作为一种高级编程语言&#xff0c;因其简洁的语法和强大的功能而广受开发者喜爱。它不仅适用于初学者快速上手&#xff0c;也能满足专业开发者的复杂需求。 Python语言的特点 简洁易读 Python的语法接近英语&#xff0c;易于编写和阅读&#xff0c;是初学者…

小米金融警示非法集资风险 助力消费者守护“钱袋子”

随着公安机关对电信网络诈骗打击力度的持续加大&#xff0c;不法分子的诈骗手段也在不断翻新&#xff0c;其隐蔽性和欺骗性日益增强。根据《2022年度反诈报告》数据显示&#xff0c;投资理财类诈骗已成为当前黑灰产活动的主要类型之一&#xff0c;给广大金融消费者带来了严重的…

孩子如何才能学好python

Python是一种高级编程语言&#xff0c;因为其简单易学、功能强大和易于阅读的语法&#xff0c;已经成为了最受欢迎的编程语言之一。对于孩子来说&#xff0c;学习Python不仅可以提高他们的计算机技能&#xff0c;还可以为他们未来的职业生涯打下基础。下面是一些学习Python的建…

python 常用库

python 常用库 以下是对每个包的作用的简要解释&#xff1a; absl-py&#xff1a;是 Abseil Python 库&#xff0c;提供了一些实用的工具和功能。accelerate&#xff1a;用于加速数据处理。addict&#xff1a;用于操作嵌套字典。aiofiles&#xff1a;支持异步文件操作。aiohtt…

redis7安装与配置

一、下载 通过 redis官网 或者 redis中文网 下载。 以下是 redis 相关文档资料链接&#xff1a; redis源码地址 redis在线测试 redis命令参考 redis中文文档 历史发布版本的源码地址 二、版本命名规则 Redis从发布到现在&#xff0c;已经有十余年的时光了&#xff0c;…