solon 集成 kafka-clients

使用 kafka-clients 原本是比较简单的事情。但有些同学习惯了 spring-kafka 后,对原始 java 接口会陌生些。会希望有个集成的示例。

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version>
</dependency>

现在我们使用原始 sdk 的依赖包,做一个 solon 项目的集成分享(其它的框架,也可以参考此例)。

1、添加集成配置

使用 Solon 初始器 生成一个 Solon Web 模板项目,然后添加上面的 kafka-clients 依赖。之后:

  • 添加 yml 配置(具体的配置属性,参考:ProducerConfig,ConsumerConfig)
solon.app:name: "demo-app"group: "demo"solon.logging:logger:root:level: INFO# 配置可以自由定义,与 @Bean 代码对应起来即可(以下为参考)
solon.kafka:properties:  #公共配置(配置项,参考:ProducerConfig,ConsumerConfig 的公用部分)bootstrap:servers: "127.0.0.1:9092"key:serializer: "org.apache.kafka.common.serialization.StringSerializer"deserializer: "org.apache.kafka.common.serialization.StringDeserializer"value:serializer: "org.apache.kafka.common.serialization.StringSerializer"deserializer: "org.apache.kafka.common.serialization.StringDeserializer"producer: #生产者专属配置(配置项,参考:ProducerConfig)acks: "all"consumer: #消费者专属配置(配置项,参考:ConsumerConfig)enable:auto:commit: "false"isolation:level: "read_committed"group:id: "${solon.app.group}:${solon.app.name}"
  • 添加 java 配置器
@Configuration
public class KafkaConfig {@Beanpublic KafkaProducer<String, String> producer(@Inject("${solon.kafka.properties}") Properties common,@Inject("${solon.kafka.producer}") Properties producer) {Properties props = new Properties();props.putAll(common);props.putAll(producer);return new KafkaProducer<>(props);}@Beanpublic KafkaConsumer<String, String> consumer(@Inject("${solon.kafka.properties}") Properties common,@Inject("${solon.kafka.consumer}") Properties consumer) {Properties props = new Properties();props.putAll(common);props.putAll(consumer);return new KafkaConsumer<>(props);}
}

完成上面两步,就算是集成了(后面,是应用的事儿)。配置也可以没有,直接写代码设定属性。

2、应用

  • 发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):
@Controller
public class DemoController {@Injectprivate KafkaProducer<String, String> producer;@Mapping("/send")public void send(String msg) {//发送producer.send(new ProducerRecord<>("topic.test", msg));}
}
  • 拉取(或消费),这里采用定时拦取方式:(仅供参考)

这里需要引入一个 solon 的简单调度插件(或,别的调度插件),用于定时拉取消息:

<dependency><groupId>org.noear</groupId><artifactId>solon-scheduling-simple</artifactId>
</dependency>

编写定时拉取任务:

@Component
public class DemoJob {@Injectprivate KafkaConsumer<String, String> consumer;@Initpublic void init() {//订阅consumer.subscribe(Arrays.asList("topic.test"));}@Scheduled(fixedDelay = 10_000L, initialDelay = 10_000L)public void job() throws Exception {//拉取ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());//确认consumer.commitSync();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/888702.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android开发,使用ViewPager2实现自动轮播图

文章目录 1. build.gradle添加依赖&#xff1a;2. AndroidManifest.xml中添加网络访问权限3. 编写 布局文件4. 编写Banner适配器5. 自定义Banner视图6. 定义圆点指示器的drawable资源7. 在需要使用轮播图中的Activity中使用8. 运行效果图9. 视频教程 在Android项目程序设计中&a…

Oracle DataGuard启动与关闭顺序

在Oracle Data Guard环境中&#xff0c;启动和关闭数据库的顺序非常重要&#xff0c;以确保数据的一致性和最小化服务中断。以下是针对主库&#xff08;Primary Database&#xff09;和备库&#xff08;Standby Database&#xff09;启动与关闭的标准步骤。 启动顺序 1. 启动…

JavaScript 中基元类型与引用类型的解析

文章目录 基元类型Undefined 类型Null 类型Boolean 类型Number 类型String 类型Symbol 类型 类型详解对象&#xff08;Object&#xff09;数组&#xff08;Array&#xff09;函数&#xff08;Function&#xff09;日期&#xff08;Date&#xff09;、正则表达式&#xff08;Reg…

切比雪夫不等式:方差约束下的概率估计

切比雪夫不等式&#xff1a;方差约束下的概率估计 背景 在概率分析中&#xff0c;切比雪夫不等式是一个常用的工具&#xff0c;它通过引入随机变量的 方差信息&#xff0c;给出了偏离均值的概率界限。这一不等式是对 马尔科夫不等式 的自然扩展&#xff0c;结合了更丰富的分布…

企业网双核心交换机实现冗余和负载均衡(MSTP+VRRP)

MSTP&#xff08;多生成树协议&#xff09; 通过创建多个VLAN实例&#xff0c;将原有的STP、RSTP升级&#xff0c;避免单一VLAN阻塞后导致带宽的浪费&#xff0c;通过将VLAN数据与实例绑定&#xff0c;有效提升网络速率。 VRRP&#xff08;虚拟路由冗余协议&#xff09; 用…

Socket编程-tcp

1. 前言 在tcp套接字编程这里&#xff0c;我们将完成两份代码&#xff0c;一份是基于tcp实现普通的对话&#xff0c;另一份加上业务&#xff0c;client输入要执行的命令&#xff0c;server将执行结果返回给client 2. tcp_echo_server 与udp类似&#xff0c;前两步&#xff1…

深入浅出云计算 ---笔记

这是博主工作闲时的一些日常学习记录&#xff0c;有些之前很熟悉的&#xff0c;但工作中不常用&#xff0c;慢慢就遗忘了&#xff0c;在这里记录&#xff0c;也是为了激励自己坚持复习&#xff0c;如果有能帮到你&#xff0c;那我将感到非常的荣幸~ 快速到达↓↓↓ IaaS篇>&…

14 设计模式值观察者模式(书籍发布通知案例)

一、观察者模式定义 在日常开发中&#xff0c;我们经常会遇到一种场景&#xff1a;某个对象的状态发生变化时&#xff0c;需要通知并更新其他相关对象。这时&#xff0c;观察者模式便成为了解决问题的有效方案。观察者模式是一种常见的设计模式&#xff0c;它允许一个对象的状态…

15分钟训练数字人MimicTalk

只需15分钟&#xff0c;就能训练高质量&#xff0c;个性化数字人大模型。由浙江大学与字节跳动联合推出MimicTalk算法&#xff0c;目前已开源。 在外表和说话风格上和真人相似。将通用3D数字人大模型适应到单个目标人&#xff0c;采用动静结合的高效微调方案&#xff0…

c++高级篇(四) ——Linux下IO多路复用之epoll模型

IO多路复用 —— epoll 前言 在之前我们就已经介绍过了select和poll,在作为io多路复用的最后一个的epoll,我们来总结一下它们之间的区别: a select 实现原理 select 通过一个文件描述符集合&#xff08;fd_set&#xff09;来工作&#xff0c;该集合可以包含需要监控的文件…

【kettle】mysql数据抽取至kafka/消费kafka数据存入mysql

目录 一、mysql数据抽取至kafka1、表输入2、json output3、kafka producer4、启动转换&#xff0c;查看是否可以消费 二、消费kafka数据存入mysql1、Kafka consumer2、Get records from stream3、字段选择4、JSON input5、表输出 一、mysql数据抽取至kafka 1、表输入 点击新建…

docker-compose部署skywalking 8.1.0

一、下载镜像 #注意 skywalking-oap-server和skywalking java agent版本强关联&#xff0c;版本需要保持一致性 docker pull elasticsearch:7.9.0 docker pull apache/skywalking-oap-server:8.1.0-es7 docker pull apache/skywalking-ui:8.1.0二、部署文件docker-compose.yam…

用Python开发一个经典贪吃蛇小游戏

Python 是开发小游戏的绝佳工具,借助第三方库,如 pygame,我们可以快速开发一个经典的贪吃蛇游戏。本篇将介绍如何用 Python 实现一个完整的贪吃蛇小游戏。 一、游戏设计 1.1 游戏规则 玩家通过方向键控制贪吃蛇移动。贪吃蛇吃到食物后会变长,同时得分增加。如果贪吃蛇撞到…

在 MacOS 上为 LM Studio 更换镜像源

在 MacOS 之中使用 LM Studio 部署本地 LLM时&#xff0c;用户可能会遇到无法下载模型的问题。 一般的解决方法是在 huggingface.co 或者国内的镜像站 hf-mirror.com 的项目介绍卡页面下载模型后拖入 LM Studio 的模型文件夹。这样无法利用 LM Studio 本身的搜索功能。 本文将…

vue中.sync修饰符的用法

一、什么是.sync修饰符 在Vue.js中&#xff0c;.sync 修饰符用于创建一个双向绑定的 prop。它使子组件能够更新父组件的 prop 值&#xff0c;实现父子组件之间的双向数据同步。具体来说&#xff0c;.sync 修饰符主要有以下几个功能&#xff1a; 简化双向绑定&#xff1a; 使用…

【附源码】基于环信鸿蒙IM SDK实现一个聊天Demo

项目背景 本项目基于环信IM 鸿蒙SDK 打造的鸿蒙IM Demo&#xff0c;完全适配HarmonyOS NEXT系统&#xff0c;实现了发送消息&#xff0c;添加好友等基础功能。代码开源&#xff0c;功能简洁&#xff0c;如果您有类似开发需求可以参考。 源码地址&#xff1a;https://github.c…

SHELL----正则表达式

一、文本搜索工具——grep grep -参数 条件 文件名 其中参数有以下&#xff1a; -i 忽略大小写 -c 统计匹配的行数 -v 取反&#xff0c;不显示匹配的行 -w 匹配单词 -E 等价于 egrep &#xff0c;即启用扩展正则表达式 -n 显示行号 -rl 将指定目录内的文件打…

Can‘t find variable: token(token is not defined)

文章目录 例子 1&#xff1a;使用 var例子 2&#xff1a;使用 let 或 const例子 3&#xff1a;异步操作你的代码中的情况 Cant find variable: tokentoken is not defined源代码 // index.jsPage({data: {products:[],cardLayout: grid, // 默认卡片布局为网格模式isGrid: tr…

Kafka-创建topic源码

一、命令创建topic kafka-topics --create --topic quickstart-events --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2 二、kafka-topics脚本 exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TopicCommand "$" 脚本中指定了…

【AI系统】GhostNet 系列

GhostNet 系列 本文主要会介绍 GhostNet 系列网络&#xff0c;在本文中会给大家带来卷积结构的改进方面的轻量化&#xff0c;以及与注意力(self-attention)模块的进行结合&#xff0c;部署更高效&#xff0c;更适合移动计算的 GhostNetV2。让读者更清楚的区别 V2 与 V1 之间的…