【SpringCloud】Kafka消息中间件

Kafka

  • Kafka
    • 消息中间件对比:
    • kafka介绍
    • 安装教程:
    • 配置以及启动顺序:
  • Kafka整合微服务
    • 初级入门
      • 测试:
    • Kafka整合SpringBoot
      • ①导入spring-kafka依赖信息
      • ②消息生产者
      • ③消息消费者
      • Postman测试

Kafka

消息中间件对比:

消息中间件对比-选择建议:

kafka介绍

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网:Apache Kafka

producer相当于生产者,consumer相当于消费者。

Kafka Cluster相当于快递驿站,broker1、broker2相当于1,2号快递员,topic为它们的要派送的快递;

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

  • 生产者发送消息,多个消费者只能有一个消费者接收到消息(一对一)

**理解:**相当于同一个快递员只能给一组快递的一个人派送快递,只有这个组的对应的一个人才能收到消息;

  • 生产者发送消息,多个消费者都可以接收到消息 (一对多)

**理解:**相当于多个快递员给自己组的快递的一个人派送快递,各个组的对应的一个人都能收到消息;

安装教程:

windows系统kafka小白入门篇——下载安装,环境配置,入门代码书写_windows kafka-CSDN博客

配置以及启动顺序:

问题处理:

kfk.cmd闪退配置,配置环境变量https://blog.csdn.net/Xxy605/article/details/116844151

在环境变量path里面添加C:\Windows\System32\wbem

Kafka整合微服务

初级入门

①创建kafka-demo项目,导入依赖

<!-- kafkfa -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>

生产者发送消息:

package com.ape.kafka.sample;import org.apache.kafka.clients.producer.*;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) throws ExecutionException, InterruptedException {//1.kafka链接配置信息Properties prop = new Properties();//kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.创建kafka生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);//3.发送消息/*** 第一个参数 :topic* 第二个参数:消息的key* 第三个参数:消息的value*/ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-first","key-001","hello kafka");//同步发送消息producer.send(kvProducerRecord);//4.关闭消息通道  必须要关闭,否则消息发送不成功producer.close();}}

消费者1消费消息:

package com.ape.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties prop = new Properties();//链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//设置消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//2.创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);//3.订阅主题consumer.subscribe(Collections.singletonList("topic-first"));//4.拉取消息while (true) {// 读取数据,读取超时时间为100ms ,即每个1000ms拉取一次ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}

消费者2

测试:

一对多

先启动俩个消费者1,2,然后在启动生产者:

说明是一对多,对于不同的组group对应的这个人topic-first都能收到消息。

一对一:

Kafka整合SpringBoot

①导入spring-kafka依赖信息

<!-- kafkfa -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>

编写yaml文件:

server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: localhost:9092producer:retries: 10 #重试的次数key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

②消息生产者

③消息消费者

Postman测试

发送请求:http://localhost:9991/hello

控制台输出:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/59551.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ViT模型复现项目实战

项目源码获取方式见文章末尾&#xff01; 600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【基于CNN-RNN的影像报告生成】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模型实现…

MySQL缓存参数如何优化与表结构如何优化才算是最大性能的优化

为了最大化 MySQL 的性能&#xff0c;优化缓存参数和表结构是非常重要的。MySQL 提供了多个缓存参数来提高查询效率&#xff0c;而表结构优化可以减少磁盘 I/O&#xff0c;改善查询响应时间。下面我将分别给出如何优化缓存参数以及表结构的详细建议和代码示例。 1. MySQL 缓存…

16通道AD采集方案,基于复旦微ARM + FPGA国产SoC处理器平台

测试数据汇总 表 1 本文带来的是基于复旦微FMQL20S400M四核ARM Cortex-A7(PS端) + FPGA可编程逻辑资源(PL端)异构多核SoC处理器设计的全国产工业评估板的AD采集案例。本次案例演示的开发环境如下: Windows开发环境:Windows 7 64bit、Windows 10 64bit PL端开发环境:P…

mac电脑如何远程登录自己购买的阿里云服务器

密码登录 示例 ssh root125.12.45.32 # 其中root是用户名&#xff0c;125.12.45.32是阿里云服务器的公网ipjava0904weigongdeMBP ~ % ssh root106.15.186.127 The authenticity of host 106.15.186.127 (106.15.186.127) cant be established. ED25519 key fingerprint is SH…

【Python爬虫实战】DrissionPage 与 ChromiumPage:高效网页自动化与数据抓取的双利器

&#x1f308;个人主页&#xff1a;易辰君-CSDN博客 &#x1f525; 系列专栏&#xff1a;https://blog.csdn.net/2401_86688088/category_12797772.html ​ 目录 前言 一、DrissionPage简介 &#xff08;一&#xff09;特点 &#xff08;二&#xff09;安装 &#xff08;三…

R7:糖尿病预测模型优化探索

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 一、实验目的&#xff1a; 探索本案例是否还有进一步优化的空间 二、实验环境&#xff1a; 语言环境&#xff1a;python 3.8编译器&#xff1a;Jupyter notebo…

HANDLINK ISS-7000v2 网关 login_handler.cgi 未授权RCE漏洞复现

0x01 产品简介 瀚霖科技股份有限公司ISS-7000 v2网络网关服务器是台高性能的网关,提供各类酒店网络认证计费的完整解决方案。由于智慧手机与平板电脑日渐普及,人们工作之时开始使用随身携带的设备,因此无线网络也成为网络使用者基本服务的项目。ISS-7000 v2可登录300至1000…

vue3中当界面高度太高时,如何固定导航栏

导航栏组件 设计思路&#xff1a; 将原先的导航栏进行隐藏&#xff0c;将需要新的导航栏进行条件展示 <script setup> //vueuse import {useScroll} from vueuse/core </script><style scoped langscss> .app-header-sticky {width: 100%;height: 80px;po…

RK3576 LINUX RKNN SDK 测试

安装Conda工具 安装 Miniforge Conda wget -c https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-Linux-x86_64.sh chmod 777 Miniforge3-Linux-x86_64.sh bash Miniforge3-Linux-x86_64.shsource ~/miniforge3/bin/activate # Miniforge 安装的…

深入学习指针(5)!!!!!!!!!!!!!!!

文章目录 1.回调函数是什么&#xff1f;2.qsort使用举例2.1使用qsort函数排序整形数据2.2使用sqort排序结构数据 3.qsort函数的模拟实现 1.回调函数是什么&#xff1f; 回调函数就是⼀个通过函数指针调⽤的函数。 如果你把函数的指针&#xff08;地址&#xff09;作为参数传递…

天锐绿盾加密软件与Ping32数据安全防护对比,为企业提供坚实的保障

在当今信息化时代&#xff0c;数据安全已成为企业不可忽视的重要议题。天锐绿盾加密软件与Ping32作为两款备受关注的数据安全解决方案&#xff0c;各自以其卓越的功能和优势&#xff0c;为企业数据安全提供了坚实的保障。 Ping32&#xff0c;同样以其出色的数据加密和防泄密功能…

支持向量机相关证明 解的稀疏性

主要涉及拉格朗日乘子法&#xff0c;对偶问题求解

内存优化常用技巧

在嵌入式开发中&#xff0c;内存管理非常关键&#xff0c;因为资源&#xff08;如 RAM 和 Flash&#xff09;往往非常有限。以下是一些内存管理优化的技巧及实例&#xff0c;帮助提高内存利用效率&#xff1a; 1. 使用静态分配而不是动态分配 解释&#xff1a;嵌入式系统通常避…

求职经验分享

更多详情&#xff1a;爱米的前端小笔记&#xff0c;更多前端内容&#xff0c;等你来看&#xff01;这些都是利用下班时间整理的&#xff0c;整理不易&#xff0c;大家多多&#x1f44d;&#x1f49b;➕&#x1f914;哦&#xff01;你们的支持才是我不断更新的动力&#xff01;找…

基于Dpabi和spm12的脑脊液(csf)分割和提取笔记

一、前言 脑脊液&#xff08;csf&#xff09;一直被认为与新陈代谢有重要关联&#xff0c;其为许多神经科学研究提供重要价值&#xff0c;从fMRI图像中提取脑脊液信号可用于多种神经系统疾病的诊断。特别是自2019年Science上那篇著名的csf-BOLD文章发表后&#xff0c;大家都试图…

力扣:94--中序遍历二叉树

树 – 二叉树 完全二叉树&#xff1a; 完全二叉树可以用数组完美匹配位置&#xff08;先序存储&#xff1a;根左右&#xff09;&#xff0c; 推论一 &#xff1a; 位置为k的节点&#xff0c;左孩子&#xff1a;2*k 1 &#xff0c;右孩子 &#xff1a; 2 * &#xff08;k 1&…

SQL 常用语句

目录 我的测试环境 学习文档 进入数据库 基础通关测验 语句-- 查 展示数据库&#xff1b; 进入某个数据库&#xff1b; 展示表&#xff1a; 展示某个表 desc 查询整个表&#xff1a; 查询特定列&#xff1a; 范围查询 等于特定值 不等于 介于 特定字符查询 Li…

Android——画中画模式

应用中的画中画 监听回到桌面与打开任务列表的广播收到广播之后&#xff0c;调用 enterPictureInPictureMode 方法进入画中画模式重写活动页面的 onPictureInPictureModeChanged 方法&#xff0c;补充进入画中画模式或退出画中画模式时的处理逻辑 回到桌面与切到任务列表 按…

keepalived 高可用搭建

一、部署准备 准备两台Linux&#xff08;ubuntu&#xff09;设备 主机&#xff1a;192.168.2.78 备机&#xff1a;192.168.2.18 设备安装keepalived(主机和备机都安装) sudo apt-get install keepalivedkeepalived配置文件为keepalived.conf&#xff0c;开始时文件可能不存在…

MySQL utf8mb3 和 utf8mb4引发的问题

问题描述 Cause: java.sql.SQLException: Incorrect string value: \xF4\x8F\xBB\xBF-b... for column sddd_aaa_ark at row 1 sddd_aaa_ark 存储中文字符时&#xff0c;出现上述问题 原因分析 sddd_aaa_ark在数据库中结构是 utf8字符的最大字节数是3 byte&#xff0c;但是某些…