kafka消息队列最常用的两种模式,以及应用场景

目录

一、发布-订阅模式

二、点对点模式

三、应用场景


 

一、发布-订阅模式

发布-订阅模式是最常见的消息传递模式,其中消息发布者将消息发送到一个或多个主题(Topic),而订阅者可以选择订阅一个或多个主题来接收消息。每个订阅者都可以独立地消费消息,而发布者和订阅者之间没有直接的联系。

在Kafka中,使用KafkaProducer类进行消息发布,KafkaConsumer类进行消息订阅。以下是一个简单的Java代码示例:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class PubSubExample {private static final String TOPIC = "my_topic";private static final String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) {// Kafka ProducerProperties producerProps = new Properties();producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);// Publish messagesfor (int i = 0; i < 10; i++) {String message = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("Error publishing message: " + exception.getMessage());} else {System.out.println("Message published successfully: " + metadata.offset());}}});}producer.close();// Kafka ConsumerProperties consumerProps = new Properties();consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);consumer.subscribe(Collections.singletonList(TOPIC));// Consume messageswhile (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: " + record.value());// Process the message}}}
}

 

二、点对点模式

点对点模式中,消息发送者将消息发送到一个指定的队列(Queue),而消息接收者从相同的队列中接收消息。每个消息只能被一个接收者消费。

在Kafka中,点对点模式可以通过创建单个消费者组来实现。以下是一个简单的Java代码示例:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class PointToPointExample {private static final String QUEUE = "my_queue";private static final String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) {// Kafka ProducerProperties producerProps = new Properties();producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);// Publish messagesfor (int i = 0; i < 10; i++) {String message = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>(QUEUE, message);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("Error publishing message: " + exception.getMessage());} else {System.out.println("Message published successfully: " + metadata.offset());}}});}producer.close();// Kafka ConsumerProperties consumerProps = new Properties();consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);consumer.subscribe(Collections.singletonList(QUEUE));// Consume messageswhile (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: " + record.value());// Process the messageconsumer.commitAsync();}}}
}

以上代码示例演示了如何使用Kafka的Java客户端库进行发布和订阅消息以及点对点消息传递。请注意,代码中的BOOTSTRAP_SERVERS需要根据你的实际环境进行配置。

 

三、应用场景

Kafka消息队列具有高吞吐量、低延迟、可扩展性等特点,因此广泛应用于以下场景:

  1. 日志收集和数据管道:Kafka可以用作集中式日志收集系统,可以将不同服务、应用程序、服务器生成的日志集中到一个中心化的消息队列中,再通过消费者进行处理、分析和存储。同时,Kafka还可以作为数据管道,将不同数据源的数据通过消息队列进行传输和处理。

  2. 实时流处理:Kafka与流处理框架(如Apache Flink、Apache Spark)结合使用,可以实现实时的数据流处理。Kafka可以作为输入源和输出源,将数据流传输给流处理框架进行实时分析、计算和处理。

  3. 微服务架构:Kafka可以用作微服务之间的异步通信机制,不同的微服务各自独立地生产和消费消息,实现解耦和扩展性。同时,Kafka还可以用于实现事件驱动架构,不同的微服务通过订阅事件的方式进行通信和协作。

  4. 网络爬虫和数据采集:Kafka可以用于构建高可靠的网络爬虫系统和数据采集系统。爬虫可以将抓取的数据写入Kafka队列,然后其他系统可以消费这些数据进行进一步的处理和分析。

  5. 消息系统和通信中间件:Kafka提供了可靠的消息传递机制,可以作为消息系统和通信中间件,用于构建分布式系统、实现异步通信和跨系统的数据传输。

总之,Kafka消息队列的应用场景非常广泛,适用于大数据处理、实时数据流处理、异步通信等各种场景。它具有高性能、可靠性和可扩展性的特点,可以帮助解决数据流处理和消息传递的各种问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1887.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

实现本地缓存-caffeine

目录 实现caffeine cache CacheManager Caffeine配置说明 创建自定义配置类 配置缓存管理器 编写自动提示配置文件 测试使用 创建测试配置实体类 创建测试配置类 创建注解扫描的测试实体 创建单元测试类进行测试 实现caffeine cache CacheManager SimpleCacheManag…

香橙派4和树莓派4B构建K8S集群实践之七: Jenkins

目录 1. 说明 2. 步骤 2.1 准备工作 2.2 安装 2.2.1 用jenkins原站for k8s的安装仓方法安装 2.2.2 Helm 安装 3. 相关命令 4. 遇到的问题 5. 参考 1. 说明 在k8s上部署jenkins&#xff0c;并用 jenkins.k8s-t2.com访问在namespace为devops下安装在指定节点k8s-master-…

欧姆龙以太网模块如何设置ip连接 Kepware opc步骤

在数字化和自动化的今天&#xff0c;PLC在工业控制领域的作用日益重要。然而&#xff0c;PLC通讯口的有限资源成为了困扰工程师们的问题。为了解决这一问题&#xff0c;捷米特推出了JM-ETH-CP转以太网模块&#xff0c;让即插即用的以太网通讯成为可能&#xff0c;不仅有效利用了…

字符函数和字符串函数上篇(详解)

❤️ 作者简介 &#xff1a;RO-BERRY 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识&#xff0c;对纯音乐有独特的喜爱 &#x1f4d7; 日后方向 : 偏向于CPP开发以及大数据方向&#xff0c;如果你也感兴趣的话欢迎关注博主&#xff0c;期待更新 字符函数和字符串函数 &a…

Leetcode每日一题(困难):1851. 包含每个查询的最小区间(2023.7.18 C++)

目录 1851. 包含每个查询的最小区间 题目描述&#xff1a; 实现代码与解析&#xff1a; 排序 哈希 原理思路&#xff1a; 1851. 包含每个查询的最小区间 题目描述&#xff1a; 给你一个二维整数数组 intervals &#xff0c;其中 intervals[i] [lefti, righti] 表示第 i…

OpenCV——总结《车牌识别》

1.图片中的hsv hsv提取蓝色部分 # hsv提取蓝色部分 def hsv_color_find(img):img_copy img.copy()cv2.imshow(img_copy, img_copy)"""提取图中的蓝色部分 hsv范围可以自行优化cv2.inRange()参数介绍&#xff1a;第一个参数&#xff1a;hsv指的是原图第二个参…

初识vue3/setup/ ref()/ computed/watch/生命周期/父传子

创建项目先不着急学 main.js变了 新加setup reactive ref() computed watch 生命周期 父传子 子传父 ref/模板引用 暴露子组件属性 跨层传数据 defineOptions

用OpenCV进行图像分割--进阶篇

1. 引言 大家好&#xff0c;我的图像处理爱好者们&#xff01; 在上一篇幅中&#xff0c;我们简单介绍了图像分割领域中的基础知识&#xff0c;包含基于固定阈值的分割和基于OSTU的分割算法。这一次&#xff0c;我们将通过介绍基于色度的分割来进一步巩固大家的基础知识。 闲…

【JavaEE】DI与DL的介绍-Spring项目的创建-Bean对象的存储与获取

Spring的开发要点总结 文章目录 【JavaEE】Spring的开发要点总结&#xff08;1&#xff09;1. DI 和 DL1.1 DI 依赖注入1.2 DL 依赖查询1.3 DI 与 DL的区别1.4 IoC 与 DI/DL 的区别 2. Spring项目的创建2.1 创建Maven项目2.2 设置国内源2.2.1 勾选2.2.2 删除本地jar包2.2.3 re…

C++万字自学笔记

[TOC] 一、 C基础 C的IDE有CLion、Visual Studio、DEV C、eclipse等等&#xff0c;这里使用CLion进行学习。 0. C初识 0.1 第一个C程序 编写一个C程序总共分为4个步骤 创建项目创建文件编写代码运行程序 #include <iostream>int main() {using namespace std;cout…

提车自检手册(3系,其他车辆类似)

一、检查铭牌 1. 检查铭牌车辆生产日期&#xff0c;大于半年pass&#xff0c;玻璃、大灯、轮胎的生产日期不得大于车辆生产日期 二、检查轮胎 1. 是否全部为米其林轮胎 zp 4 防爆胎2. 检查全部轮胎日期&#xff0c;4个数字&#xff0c;后俩位年份&#xff0c;前俩位第几周 …

2.7 进制转换与mac

文章目录 2.7 进制转换与MAC进制转换MAC地址MAC地址与IP地址的关系总结 2.7 进制转换与MAC 进制转换 在计算机科学中&#xff0c;进制转换是将一个数值从一种进制表示转换为另一种进制表示的过程。常见的进制包括二进制&#xff08;base-2&#xff09;、十进制&#xff08;ba…

Vant源码解析(四)----Popup弹出层,详解样式方法

这个功能&#xff0c;自己也手写过&#xff0c;毕竟有很多弹窗的嘛。 我自己写就是&#xff1a;一个背景层&#xff0c;然后一个盒子里面放内容。再写个显示隐藏事件。够够的了。 Vant的Popup弹出层 页面结构 短短一个背景加内容盒子&#xff0c;vant套了几层。 这是引用的组件…

# Pytorch 深度卷积模型的特征可视化

Pytorch 深度卷积模型的特征可视化 1. 模型构建与可视化1.1 确定当前模型各层名称1.2 模型构建1.3 模型训练2. 训练过程可视化与特征图2.1 获取完整节点信息2.2 可视化参考文献资料1. 模型构建与可视化 1.1 确定当前模型各层名称 可视化模型的特征层需要打印各层的名称: 安装…

数据结构单向循环链表,创建以及增删改查的实现

一、单向循环链表的描述 循环链表&#xff1a;是另一种形式的链式存储结构。其特点是表中最后一个结点的指针域指向头节点&#xff0c;整个链表形成一个环。 单向循环链表的操作和单链表操作基本一致&#xff0c;差别在于&#xff1a;当链表遍历时&#xff0c;判别当前指针p是…

ChatGPT是否具有记忆能力?

ChatGPT在某种程度上具有记忆能力&#xff0c;但它的记忆能力有限且不像人类的记忆那样全面和持久。以下是对ChatGPT的记忆能力的详细分析&#xff1a; 1. 上下文记忆&#xff1a;ChatGPT可以在对话过程中记住先前的对话历史&#xff0c;以便更好地理解和回应后续的问题。通过…

vue+element Cascader 级联选择器 > 实现省市区三级联动

vueelement Cascader 级联选择器 > 实现省市区三级联动 先看下实现效果吧&#xff08;嘻嘻&#xff09; 看完我们就开始啦 安装element-china-area-data1 npm install element-china-area-data5.0.2 -S上代码 <el-cascadersize"large":options"options…

Http相关

Q&#xff1a;RESTful接口风格是什么&#xff1f; RESTful API 是一种基于 REST&#xff08;Representational State Transfer&#xff0c;表现层状态转移&#xff09;架构风格的 API 设计规范&#xff0c;它的核心思想是资源&#xff08;Resource&#xff09;和 HTTP 方法&am…

CRC校验原理全面解读

目录 1. 简介2. 原理2.1 CRC的发送与接收2.2 CRC校验码的生成2.3 CRC校验码的校验 3. 拓展问题3.1 模2除法为什么等同于异或运算&#xff1f;3.2 为什么除数的位数和被除数补充的位数相差为1&#xff1f;3.3 为什么CRC校验码不能纠正错误&#xff0c;只能检测错误&#xff1f; …

基于Selenium+Python的web自动化测试框架(附框架源码+项目实战)

目录 一、什么是Selenium&#xff1f; 二、自动化测试框架 三、自动化框架的设计和实现 四、需要改进的模块 五、总结 总结感谢每一个认真阅读我文章的人&#xff01;&#xff01;&#xff01; 重点&#xff1a;配套学习资料和视频教学 一、什么是Selenium&#xff1f; …