spring boot 使用 Kafka

一、Kafka作为消息队列的好处

  1. 高吞吐量:Kafka能够处理大规模的数据流,并支持高吞吐量的消息传输。

  2. 持久性:Kafka将消息持久化到磁盘上,保证了消息不会因为系统故障而丢失。

  3. 分布式:Kafka是一个分布式系统,可以在多个节点上运行,具有良好的可扩展性和容错性。

  4. 支持多种协议:Kafka支持多种协议,如TCP、HTTP、UDP等,可以与不同的系统进行集成。

  5. 灵活的消费模式:Kafka支持多种消费模式,如拉取和推送,可以根据需要选择合适的消费模式。

  6. 可配置性强:Kafka的配置参数非常丰富,可以根据需要进行灵活配置。

  7. 社区支持:Kafka作为Apache旗下的开源项目,拥有庞大的用户基础和活跃的社区支持,方便用户得到及时的技术支持。

二、springboot中使用Kafka

  1. 添加依赖:在pom.xml文件中添加Kafka的依赖,包括spring-kafka和kafka-clients。确保版本与你的项目兼容。

  2. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

  3. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

  4. 发送消息:在需要发送消息的地方,注入Kafka生产者,并使用其发送消息到指定的Kafka主题。

  5. 创建消费者:创建一个Kafka消费者类,实现Consumer接口,并使用KafkaTemplate订阅指定的Kafka主题。

  6. 配置消费者:在Spring Boot的配置文件中配置Kafka消费者的相关参数,例如group id、auto offset reset等。

  7. 接收消息:在需要接收消息的地方,注入Kafka消费者,并使用其接收消息。

  8. 处理消息:对接收到的消息进行处理,例如保存到数据库或进行其他业务逻辑处理。

三、使用Kafka

pom中填了依赖

<dependency>  <groupId>org.springframework.kafka</groupId>  <artifactId>spring-kafka</artifactId>  <version>2.8.1</version>  
</dependency>  
<dependency>  <groupId>org.apache.kafka</groupId>  <artifactId>kafka-clients</artifactId>  <version>2.8.1</version>  
</dependency>
  1. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

import org.apache.kafka.clients.producer.*;  
import org.springframework.beans.factory.annotation.Value;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.stereotype.Component;  @Component  
public class KafkaProducer {  @Value("${kafka.bootstrap}")  private String bootstrapServers;  @Value("${kafka.topic}")  private String topic;  private KafkaTemplate<String, String> kafkaTemplate;  public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {  this.kafkaTemplate = kafkaTemplate;  }  public void sendMessage(String message) {  Producer<String, String> producer = new KafkaProducer<>(bootstrapServers, new StringSerializer(), new StringSerializer());  try {  producer.send(new ProducerRecord<>(topic, message));  } catch (Exception e) {  e.printStackTrace();  } finally {  producer.close();  }  }  
}
  1. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.kafka.core.DefaultKafkaProducerFactory;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.kafka.core.ProducerFactory;  
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;  
import org.springframework.kafka.core.ConsumerFactory;  
import org.springframework.kafka.core.ConsumerConfig;  
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;  
import org.springframework.kafka.listener.MessageListener;  
import org.springframework.context.annotation.PropertySource;  
import java.util.*;  
import org.springframework.beans.factory.*;  
import org.springframework.*;  
import org.springframework.*;expression.*;value; 																																		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	  @Value("${kafka}")   Properties kafkaProps = new Properties(); @Bean public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf){ KafkaTemplate<String, String> template = new KafkaTemplate<>(pf); template .setMessageConverter(new StringJsonMessageConverter()); template .setSendTimeout(Duration .ofSeconds(30)); return template ; } @Bean public ProducerFactory<String, String> producerFactory(){ DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaProps); factory .setBootstrapServers(bootstrapServers); factory .setKeySerializer(new StringSerializer()); factory .setValueSerializer(new StringSerializer()); return factory ; } @Bean public ConsumerFactory<String, String> consumerFactory(){ DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(consumerConfigProps); factory .setBootstrapServers(bootstrapServers); factory .setKeyDeserializer(new StringDeserializer()); factory .setValueDeserializer(new StringDeserializer()); return factory ; } @Bean public ConcurrentMessageListenerContainer<String, String> container(ConsumerFactory<String, String> consumerFactory, MessageListener listener){ ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory); container .setMessageListener(listener); container .setConcurrency(3); return container ; } @Bean public MessageListener

消费者

import org.apache.kafka.clients.consumer.*;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.stereotype.Component;  @Component  
public class KafkaConsumer {  @Value("${kafka.bootstrap}")  private String bootstrapServers;  @Value("${kafka.group}")  private String groupId;  @Value("${kafka.topic}")  private String topic;  private KafkaTemplate<String, String> kafkaTemplate;  public KafkaConsumer(KafkaTemplate<String, String> kafkaTemplate) {  this.kafkaTemplate = kafkaTemplate;  }  public void consume() {  Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs());  consumer.subscribe(Collections.singletonList(topic));  while (true) {  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  for (ConsumerRecord<String, String> record : records) {  System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());  }  }  }  private Properties consumerConfigs() {  Properties props = new Properties();  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  return props;  }  
}

四、kafka与rocketMQ比较

Kafka和RocketMQ都是开源的消息队列系统,它们具有许多相似之处,但在一些关键方面也存在差异。以下是它们在数据可靠性、性能、消息传递方式等方面的比较:

  1. 数据可靠性:
  • Kafka使用异步刷盘方式,而RocketMQ支持异步实时刷盘、同步刷盘、同步复制和异步复制。这使得RocketMQ在单机可靠性上比Kafka更高,因为它不会因为操作系统崩溃而导致数据丢失。此外,RocketMQ新增的同步刷盘机制也进一步保证了数据的可靠性。
  1. 性能:
  • Kafka和RocketMQ在性能方面各有千秋。由于Kafka的数据以partition为单位,一个Kafka实例上可能有多达上百个partition,而一个RocketMQ实例上只有一个partition。这使得RocketMQ可以充分利用IO组的commit机制,批量传输数据,从而在replication时具有更好的性能。然而,Kafka的异步replication性能理论上低于RocketMQ的replication,因为同步replication与异步replication相比,性能上会有约20%-30%的损耗。
  1. 消息传递方式:
  • Kafka和RocketMQ在消息传递方式上也有所不同。Kafka采用Producer发送消息后,broker马上把消息投递给consumer,这种方式实时性较高,但会增加broker的负载。而RocketMQ基于Pull模式和Push模式的长轮询机制,来平衡Push和Pull模式各自的优缺点。RocketMQ的消息及时性较好,严格的消息顺序得到了保证。
  1. 其他特性:
  • Kafka在单机支持的队列数超过64个队列,而RocketMQ最高支持5万个队列。队列越多,可以支持的业务就越多。

五、kafka使用场景

  1. 实时数据流处理:Kafka可以处理大量的实时数据流,这些数据流可以来自不同的源,如用户行为、传感器数据、日志文件等。通过Kafka,可以将这些数据流进行实时的处理和分析,例如进行实时数据分析和告警。
  2. 消息队列:Kafka可以作为一个消息队列使用,用于在分布式系统中传递消息。它能够处理高吞吐量的消息,并保证消息的有序性和可靠性。
  3. 事件驱动架构:Kafka可以作为事件驱动架构的核心组件,将事件数据发布到不同的消费者,以便进行实时处理。这种架构可以简化应用程序的设计和开发,提高系统的可扩展性和灵活性。
  4. 数据管道:Kafka可以用于数据管道,将数据从一个系统传输到另一个系统。例如,可以将数据从数据库或日志文件传输到大数据平台或数据仓库。
  5. 业务事件通知:Kafka可以用于通知业务事件,例如订单状态变化、库存更新等。通过订阅Kafka主题,相关的应用程序和服务可以实时地接收到这些事件通知,并进行相应的处理。
  6. 流数据处理框架集成:Kafka可以与流处理框架集成,如Apache Flink、Apache Spark等。通过集成,可以将流数据从Kafka中实时导入到流处理框架中进行处理,实现流式计算和实时分析。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/657582.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

7000字详解Spring Boot项目集成RabbitMQ实战以及坑点分析

本文给大家介绍一下在 Spring Boot 项目中如何集成消息队列 RabbitMQ&#xff0c;包含对 RibbitMQ 的架构介绍、应用场景、坑点解析以及代码实战。 我将使用 waynboot-mall 项目作为代码讲解&#xff0c;项目地址&#xff1a;https://github.com/wayn111/waynboot-mall。本文大…

MATLAB - 仿真单摆的周期性摆动

系列文章目录 前言 本例演示如何使用 Symbolic Math Toolbox™ 模拟单摆的运动。推导摆的运动方程&#xff0c;然后对小角度进行分析求解&#xff0c;对任意角度进行数值求解。 一、步骤 1&#xff1a;推导运动方程 摆是一个遵循微分方程的简单机械系统。摆最初静止在垂直位置…

大数据 - Hadoop系列《四》- MapReduce(分布式计算引擎)的核心思想

上一篇&#xff1a; 大数据 - Hadoop系列《三》- MapReduce&#xff08;分布式计算引擎&#xff09;概述-CSDN博客 目录 13.1 MapReduce实例进程 13.2 阶段组成 13.4 概述 13.4.1 &#x1f959;Map阶段&#xff08;映射&#xff09; 13.4.2 &#x1f959;Reduce阶段执行过…

STM32F407移植OpenHarmony笔记3

接上一篇&#xff0c;搭建完环境&#xff0c;找个DEMO能跑&#xff0c;现在我准备尝试从0开始搬砖。 首先把/device和/vendor之前的代码全删除&#xff0c;这个时候用hb set命令看不到任何项目了。 /device目录是硬件设备目录&#xff0c;包括soc芯片厂商和board板级支持代码…

Linux 系统相关的命令

参考资料 Linux之chmod使用【linux】chmod命令详细用法 目录 一. 系统用户相关1.1 查看当前访问的主机和用户1.2 切换用户1.2.1 设置root用户密码1.2.2 普通用户和root用户切换 1.4 系统状态1.4.1 vmstat 查看当前系统的状态1.4.2 history 查看系统中输入过的命令 二. 系统文件…

图扑 HT UI 5.0 全新升级,开箱即用!

为顺应数字时代的不断发展&#xff0c;图扑 HT UI 5.0 在原有功能强大的界面组件库的基础上进行了全面升级&#xff0c;融入了更先进的技术、创新的设计理念以及更加智能的功能。HT UI 5.0 使用户体验更为直观、个性化&#xff0c;并在性能、稳定性和安全性等方面达到新的高度。…

githacker安装详细教程,linux添加环境变量详细教程(见标题三)

笔者是ctf小白&#xff0c;这两天也是遇到.git泄露的题目&#xff0c;需要工具来解决问题&#xff0c;在下载和使用的过程中也是遇到很多问题&#xff0c;写此篇记录经验&#xff0c;以供学习 在本篇标题三中有详细介绍了Linux系统添加环境变量的操作教程&#xff0c;以供学习 …

TensorFlow2实战-系列教程6:猫狗识别3------迁移学习

&#x1f9e1;&#x1f49b;&#x1f49a;TensorFlow2实战-系列教程 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在Jupyter Notebook中进行 本篇文章配套的代码资源已经上传 猫狗识别1 数据增强 猫狗识别2------数据增强 猫狗识别3------迁移学习 1、迁移学习 …

解读4篇混合类型文件Polyglot相关的论文

0. 引入 Polyglot文件指的是混合类型文件&#xff0c;关于混合类型文件的基础&#xff0c;请参考文末给出的第一个链接&#xff08;参考1&#xff09;。 1. Toward the Detection of Polyglot Files 1.1 主题 这篇2022年的论文&#xff0c;提出了Polyglot文件的检测方法。虽…

openssl3.2 - .pod文件的查看方法

文章目录 .pod文件的查看方法概述笔记初步的解决方法备注 - pod2html.bat的详细用法好像Perl就自带这个BATEND .pod文件的查看方法 概述 看到openssl源码目录下有很多.pod文件, 软件发布的帮助内容都在里面. 当make install后, 大部分的.pod都会转成html文件, 但是有一部分…

【Java程序设计】【C00215】基于SSM的勤工助学管理系统(论文+PPT)

基于SSM的勤工助学管理系统&#xff08;论文PPT&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这个一个基于SSM的勤工助学管理系统&#xff0c;本系统共分为三种权限&#xff1a;管理员、教师和学生 管理员&#xff1a;首页、个人中心、教师管理、学生管理…

gdp调试—Linux

目录 介绍 使用 介绍 代码分为debug模式和release模式 如果一份代码要被调试&#xff0c;这份代码必须是debug Linux下编译代码默认是是release模式 如果你想代码是debug模式 必须加上 - g 小提&#xff1a; vim默认&#xff1a;命令模式 gcc默认&#xff1a;releas…

操作系统--进程、线程基础知识

一、进程 我们编写的代码只是一个存储在硬盘的静态文件&#xff0c;通过编译后就会生成二进制可执行文件&#xff0c;当我们运行这个可执行文件后&#xff0c;它会被装载到内存中&#xff0c;接着 CPU 会执行程序中的每一条指令&#xff0c;那么这个运行中的程序&#xff0c;就…

ModelArts加速识别,助力新零售电商业务功能的实现

前言 如果说为客户提供最好的商品是产品眼中零售的本质&#xff0c;那么用户的思维是什么呢&#xff1f; 在用户眼中&#xff0c;极致的服务体验与优质的商品同等重要。 企业想要满足上面两项服务&#xff0c;关键在于提升效率&#xff0c;也就是需要有更高效率的零售&#…

C++ //练习 3.8 分别用while循环和传统的for循环重写第一题的程序,你觉得哪种形式更好呢?为什么?

C Primer&#xff08;第5版&#xff09; 练习 3.8 练习 3.8 分别用while循环和传统的for循环重写第一题的程序&#xff0c;你觉得哪种形式更好呢&#xff1f;为什么? 环境&#xff1a;Linux Ubuntu&#xff08;云服务器&#xff09; 工具&#xff1a;vim 代码块 /********…

【三】【C++】类与对象(二)

类的六个默认成员函数 在C中&#xff0c;有六个默认成员函数&#xff0c;它们是编译器在需要的情况下自动生成的成员函数&#xff0c;如果你不显式地定义它们&#xff0c;编译器会自动提供默认实现。这些默认成员函数包括&#xff1a; 默认构造函数 (Default Constructor)&…

C++ 数论相关题目 博弈论:拆分-Nim游戏

给定 n 堆石子&#xff0c;两位玩家轮流操作&#xff0c;每次操作可以取走其中的一堆石子&#xff0c;然后放入两堆规模更小的石子&#xff08;新堆规模可以为 0 &#xff0c;且两个新堆的石子总数可以大于取走的那堆石子数&#xff09;&#xff0c;最后无法进行操作的人视为失…

PMP中的数据收集工具:打开项目成功的钥匙

在项目管理中&#xff0c;数据收集是关键的一环。准确、及时的数据能够为项目决策提供可靠的依据&#xff0c;帮助项目经理更好地监控项目进展、识别潜在风险&#xff0c;并制定有效的应对措施。本文将深入探讨PMP&#xff08;项目管理专业&#xff09;中常用的数据收集工具&am…

力扣题目训练(6)

2024年1月30日力扣题目训练 2024年1月30日力扣题目训练367. 有效的完全平方数374. 猜数字大小383. 赎金信99. 恢复二叉搜索树105. 从前序与中序遍历序列构造二叉树51. N 皇后 2024年1月30日力扣题目训练 2024年1月30日第六天编程训练&#xff0c;今天主要是进行一些题训练&…

在ubuntu上在安装Squid代理服务器

Squid 是一个代理和缓存服务器&#xff0c;它将请求转发到所需的目的地&#xff0c;同时保存请求的内容&#xff0c;当你再次请求相同内容时&#xff0c;他可以向你提供缓冲内容&#xff0c;从而提高访问速度。Squid代理服务器目前支持的协议有&#xff1a;http、SSL、DNS、FTP…