kafka(五)spring-kafka(1)集成方法

一、集成

1、pom依赖
 <!--kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

版本号对应关系可以查看官网 

2、配置文件

基础配置:以下是必须的配置

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

其他配置参考文档

二、使用

1、生产者

使用 KafkaTemplate发送消息。如

package com.example.service.impl;import com.alibaba.fastjson.JSON;
import com.example.dto.UserDTO;
import com.example.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service("userService")
@Slf4j
public class UserServiceImpl implements UserService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Value("${user.topic}")private String userTopic;@Overridepublic void sendUserMsg(UserDTO userDTO) {String msg = JSON.toJSONString(userDTO);ProducerRecord producerRecord = new ProducerRecord(userTopic,msg);kafkaTemplate.send(producerRecord);log.info("user消息发送成功");}
}
2、消费者

使用 @KafkaListener 注解。如

package com.example.listen;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Properties;@Component
@Slf4j
public class SchoolConsumer {@KafkaListener(topics = "${school.topic}", groupId = "${school.group.id}")public void consumer(ConsumerRecord<?, ?> record) {try {Object message = record.value();if (message != null) {String msg = String.valueOf(message);log.info("接收到:msg={},topic:{},partition={},offset={}",msg,record.topic(),record.partition(),record.offset());}} catch (Exception e) {log.error("topic:{},is consumed error:{}", record.topic(), e.getMessage());} finally {//ack.acknowledge();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/32434.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java泛型学习

没有java泛型会存在的问题 假设我们有一个方法&#xff0c;希望通过传递不同类型的参数&#xff0c;输出不同类型的对象值。正常情况下我们可能会写不同的方法来实现&#xff0c;但是这样会导致类不断增加&#xff0c;并且类方法很相似&#xff0c;不能够复用。进而导致类爆炸…

cuda 与 opencl 的概念对应关系 备忘

OpenCL&#xff08;Open Computing Language&#xff09;和 CUDA&#xff08;Compute Unified Device Architecture&#xff09;都是用于并行编程的框架&#xff0c;允许开发者利用 GPU&#xff08;以及其他处理器&#xff09;进行高性能计算。尽管它们的目标相似&#xff0c;但…

基于顺序存储的环形队列算法库构建

学习贺利坚老师基于数组的环形队列 数据结构之自建算法库——顺序环形队列_下空队列q中依次入队列数据元素abc-CSDN博客文章浏览阅读5.2k次&#xff0c;点赞6次&#xff0c;收藏6次。本文针对数据结构基础系列网络课程(3)&#xff1a;栈和队列中第9课时环形队列的存储及基本操…

华为---理解OSPF Route-ID(五)

9.5 理解OSPF Route-ID 9.5.1 原理概述 一些动态路由协议要求使用Router-ID作为路由器的身份标示&#xff0c;如果在启动这些路由协议时没有指定Router-ID,则默认使用路由器全局下的路由管理Router-ID。 Router-ID选举规则为&#xff0c;如果通过Router-ID命令配置了Router-…

在Ubuntu 18.04上安装Anaconda

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 简介 Anaconda 是一个为数据科学和机器学习工作流设计的开源软件包管理器、环境管理器和 Python 和 R 编程语言的发行版。 本教程将指…

QT 用GDAL库读写shp文件

地理信息系统离不开shp文件&#xff0c;自己写程序调用gdal库解析shp文件&#xff0c;看看shp文件里的内容。 一、GDALAllRegister()注册所有驱动 二、GDALOpenEx()打开shp文件 三、调用GDALDataset->GetLayerCount()获取图层数量 四、调用GDALDataset->GetLayer(?)…

1.22 LeetCode总结(基本算法)_位运算

进制的概念 进制即进位计数制&#xff0c;是利用固定的数字符号和统一的规则的带进位的计数方法。 任何一种进位计数制都有一个基数&#xff0c;基数为 X 的进位计数制称为 X 进制&#xff0c;表示每一个数位上的数运算时都是逢 X 进一。 504. 七进制数 手法1&#xff1a;当…

android开发中 ComponentActivity()有哪些常用的方法?

ComponentActivity 是 Android 中一个基础的活动类&#xff0c;它继承自 Activity&#xff0c;并且集成了 Jetpack 的诸多组件&#xff0c;使得它在 Compose 和 ViewModel 等新特性中的使用非常便利。以下是一些 ComponentActivity 中常用的方法&#xff1a; 生命周期相关方法…

MongoDB 插入文档

MongoDB 插入文档 MongoDB 是一个流行的 NoSQL 数据库,它使用文档存储数据。在 MongoDB 中,数据以 BSON(Binary JSON)格式存储,这是一种二进制表示的 JSON 格式。MongoDB 提供了灵活的数据模型,使得插入和查询文档变得非常简单。本文将详细介绍如何在 MongoDB 中插入文档…

APP启动流程解析

简单概括启动微信的流程就是&#xff1a; 1.Launcher通知AMS 要启动微信了&#xff0c;并且告诉AMS要启动的是哪个页面也就是首页是哪个页面 2.AMS收到消息告诉Launcher知道了&#xff0c;并且把要启动的页面记下来 3.Launcher进入Paused状态&#xff0c;告诉AMS&#xff0c…

[FreeRTOS 基础知识] 互斥访问与回环队列 概念

文章目录 为什么需要互斥访问&#xff1f;使用队列实现互斥访问休眠和唤醒机制环形缓冲区 为什么需要互斥访问&#xff1f; 在裸机中&#xff0c;假设有两个函数&#xff08;func_A, func_B&#xff09;都要修改a的值&#xff08;a&#xff09;&#xff0c;那么将a定义为全局变…

css-vxe列表中ant进度条与百分比

1.vxe列表 ant进度条 <vxe-column field"actualProgress" title"进度" align"center" width"200"><template #default"{ row }"><a-progress:percent"Math.floor(row.actualProgress)"size"s…

Java程序之可爱的小兔兔

题目&#xff1a; 古典问题&#xff0c;有一对兔子&#xff0c;从出生后第3个月起每个月都生一对兔子&#xff0c;小兔子长到第三个月后每个月又生一对兔子&#xff0c;假如兔子都不死&#xff0c;问每个月的兔子总数为多少? 程序分析&#xff1a; 兔子的规律为数列1,1,2,3,…

C++基础知识——《缺省参数》和《函数重载》

P. S.&#xff1a;以下代码均在VS2019环境下测试&#xff0c;不代表所有编译器均可通过。 P. S.&#xff1a;测试代码均未展示头文件stdio.h的声明&#xff0c;使用时请自行添加。 博主主页&#xff1a;Yan. yan.                        …

jQuery Mobile 按钮图标

jQuery Mobile 按钮图标 引言 在移动应用和网站设计中,按钮图标是用户界面设计的重要组成部分。它们不仅增加了视觉吸引力,而且有助于提高用户体验。jQuery Mobile 是一个流行的前端框架,用于创建响应式的移动界面。它提供了一系列内置的按钮图标,这些图标可以轻松地应用…

kafka 集群原理设计(三)之启动原理介绍

kafka 集群原理设计(三)之启动原理介绍 业务背景问题描述 现在有三个机器节点192.168.0.200、192.168.0.201、192.168.0.202,分别安装部署zookeeper、 kafka集群,每个topic有3个分区,3个副本,则kafka各个节点在刚启动时,是如何选择哪个Kafka节 点为管理节点,哪个副本为…

【Android进阶学习】Android-广播接收器(Broadcast-Receivers)

android:theme“style/AppTheme” > 现在&#xff0c;无论什么时候Android设备被启动&#xff0c;都将被广播接收器MyReceiver所拦截&#xff0c;并且在onReceive()中实现的逻辑将被执行。 有许多系统产生的事件被定义为类Intent中的静态常量值。下面的表格列举了重要的系…

微积分-导数1(导数与变化率)

切线 要求与曲线 C C C相切于 P ( a , f ( a ) ) P(a, f(a)) P(a,f(a))点的切线&#xff0c;我们可以在曲线上找到与之相近的一点 Q ( x , f ( x ) ) Q(x, f(x)) Q(x,f(x))&#xff0c;然后求出割线 P Q PQ PQ的斜率&#xff1a; m P Q f ( x ) − f ( a ) x − a m_{PQ} \…

前端面试题(基础篇七)

一、谈谈你对webpack的看法 webpack是一个模块打包工具&#xff0c;我们可以使用webpack管理我们的模块依赖&#xff0c;编译输出模块所需的静态文件。它可以很好的管理、打包web开发中所需的html、css、JavaScript以及其他各种静态文件&#xff08;使用的图片、字体图标等&am…

给数据库的表添加字段

周五有一个需求是这样的&#xff1a; 原来数据库有一个表B&#xff0c;现在需要添加一个字段C&#xff0c;我把代码中增删改查部分进行了修改&#xff0c; 比如insert中也添入了字段C。 但没有考虑到一个问题&#xff0c;数据库的兼容性。因为之前的版本已经投入使用了&…