kafka消息队列最常用的两种模式,以及应用场景

目录

一、发布-订阅模式

二、点对点模式

三、应用场景


 

一、发布-订阅模式

发布-订阅模式是最常见的消息传递模式,其中消息发布者将消息发送到一个或多个主题(Topic),而订阅者可以选择订阅一个或多个主题来接收消息。每个订阅者都可以独立地消费消息,而发布者和订阅者之间没有直接的联系。

在Kafka中,使用KafkaProducer类进行消息发布,KafkaConsumer类进行消息订阅。以下是一个简单的Java代码示例:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class PubSubExample {private static final String TOPIC = "my_topic";private static final String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) {// Kafka ProducerProperties producerProps = new Properties();producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);// Publish messagesfor (int i = 0; i < 10; i++) {String message = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("Error publishing message: " + exception.getMessage());} else {System.out.println("Message published successfully: " + metadata.offset());}}});}producer.close();// Kafka ConsumerProperties consumerProps = new Properties();consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);consumer.subscribe(Collections.singletonList(TOPIC));// Consume messageswhile (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: " + record.value());// Process the message}}}
}

 

二、点对点模式

点对点模式中,消息发送者将消息发送到一个指定的队列(Queue),而消息接收者从相同的队列中接收消息。每个消息只能被一个接收者消费。

在Kafka中,点对点模式可以通过创建单个消费者组来实现。以下是一个简单的Java代码示例:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class PointToPointExample {private static final String QUEUE = "my_queue";private static final String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) {// Kafka ProducerProperties producerProps = new Properties();producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);// Publish messagesfor (int i = 0; i < 10; i++) {String message = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>(QUEUE, message);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("Error publishing message: " + exception.getMessage());} else {System.out.println("Message published successfully: " + metadata.offset());}}});}producer.close();// Kafka ConsumerProperties consumerProps = new Properties();consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);consumer.subscribe(Collections.singletonList(QUEUE));// Consume messageswhile (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: " + record.value());// Process the messageconsumer.commitAsync();}}}
}

以上代码示例演示了如何使用Kafka的Java客户端库进行发布和订阅消息以及点对点消息传递。请注意,代码中的BOOTSTRAP_SERVERS需要根据你的实际环境进行配置。

 

三、应用场景

Kafka消息队列具有高吞吐量、低延迟、可扩展性等特点,因此广泛应用于以下场景:

  1. 日志收集和数据管道:Kafka可以用作集中式日志收集系统,可以将不同服务、应用程序、服务器生成的日志集中到一个中心化的消息队列中,再通过消费者进行处理、分析和存储。同时,Kafka还可以作为数据管道,将不同数据源的数据通过消息队列进行传输和处理。

  2. 实时流处理:Kafka与流处理框架(如Apache Flink、Apache Spark)结合使用,可以实现实时的数据流处理。Kafka可以作为输入源和输出源,将数据流传输给流处理框架进行实时分析、计算和处理。

  3. 微服务架构:Kafka可以用作微服务之间的异步通信机制,不同的微服务各自独立地生产和消费消息,实现解耦和扩展性。同时,Kafka还可以用于实现事件驱动架构,不同的微服务通过订阅事件的方式进行通信和协作。

  4. 网络爬虫和数据采集:Kafka可以用于构建高可靠的网络爬虫系统和数据采集系统。爬虫可以将抓取的数据写入Kafka队列,然后其他系统可以消费这些数据进行进一步的处理和分析。

  5. 消息系统和通信中间件:Kafka提供了可靠的消息传递机制,可以作为消息系统和通信中间件,用于构建分布式系统、实现异步通信和跨系统的数据传输。

总之,Kafka消息队列的应用场景非常广泛,适用于大数据处理、实时数据流处理、异步通信等各种场景。它具有高性能、可靠性和可扩展性的特点,可以帮助解决数据流处理和消息传递的各种问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1887.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

实现本地缓存-caffeine

目录 实现caffeine cache CacheManager Caffeine配置说明 创建自定义配置类 配置缓存管理器 编写自动提示配置文件 测试使用 创建测试配置实体类 创建测试配置类 创建注解扫描的测试实体 创建单元测试类进行测试 实现caffeine cache CacheManager SimpleCacheManag…

香橙派4和树莓派4B构建K8S集群实践之七: Jenkins

目录 1. 说明 2. 步骤 2.1 准备工作 2.2 安装 2.2.1 用jenkins原站for k8s的安装仓方法安装 2.2.2 Helm 安装 3. 相关命令 4. 遇到的问题 5. 参考 1. 说明 在k8s上部署jenkins&#xff0c;并用 jenkins.k8s-t2.com访问在namespace为devops下安装在指定节点k8s-master-…

欧姆龙以太网模块如何设置ip连接 Kepware opc步骤

在数字化和自动化的今天&#xff0c;PLC在工业控制领域的作用日益重要。然而&#xff0c;PLC通讯口的有限资源成为了困扰工程师们的问题。为了解决这一问题&#xff0c;捷米特推出了JM-ETH-CP转以太网模块&#xff0c;让即插即用的以太网通讯成为可能&#xff0c;不仅有效利用了…

字符函数和字符串函数上篇(详解)

❤️ 作者简介 &#xff1a;RO-BERRY 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识&#xff0c;对纯音乐有独特的喜爱 &#x1f4d7; 日后方向 : 偏向于CPP开发以及大数据方向&#xff0c;如果你也感兴趣的话欢迎关注博主&#xff0c;期待更新 字符函数和字符串函数 &a…

OpenCV——总结《车牌识别》

1.图片中的hsv hsv提取蓝色部分 # hsv提取蓝色部分 def hsv_color_find(img):img_copy img.copy()cv2.imshow(img_copy, img_copy)"""提取图中的蓝色部分 hsv范围可以自行优化cv2.inRange()参数介绍&#xff1a;第一个参数&#xff1a;hsv指的是原图第二个参…

初识vue3/setup/ ref()/ computed/watch/生命周期/父传子

创建项目先不着急学 main.js变了 新加setup reactive ref() computed watch 生命周期 父传子 子传父 ref/模板引用 暴露子组件属性 跨层传数据 defineOptions

用OpenCV进行图像分割--进阶篇

1. 引言 大家好&#xff0c;我的图像处理爱好者们&#xff01; 在上一篇幅中&#xff0c;我们简单介绍了图像分割领域中的基础知识&#xff0c;包含基于固定阈值的分割和基于OSTU的分割算法。这一次&#xff0c;我们将通过介绍基于色度的分割来进一步巩固大家的基础知识。 闲…

【JavaEE】DI与DL的介绍-Spring项目的创建-Bean对象的存储与获取

Spring的开发要点总结 文章目录 【JavaEE】Spring的开发要点总结&#xff08;1&#xff09;1. DI 和 DL1.1 DI 依赖注入1.2 DL 依赖查询1.3 DI 与 DL的区别1.4 IoC 与 DI/DL 的区别 2. Spring项目的创建2.1 创建Maven项目2.2 设置国内源2.2.1 勾选2.2.2 删除本地jar包2.2.3 re…

C++万字自学笔记

[TOC] 一、 C基础 C的IDE有CLion、Visual Studio、DEV C、eclipse等等&#xff0c;这里使用CLion进行学习。 0. C初识 0.1 第一个C程序 编写一个C程序总共分为4个步骤 创建项目创建文件编写代码运行程序 #include <iostream>int main() {using namespace std;cout…

Vant源码解析(四)----Popup弹出层,详解样式方法

这个功能&#xff0c;自己也手写过&#xff0c;毕竟有很多弹窗的嘛。 我自己写就是&#xff1a;一个背景层&#xff0c;然后一个盒子里面放内容。再写个显示隐藏事件。够够的了。 Vant的Popup弹出层 页面结构 短短一个背景加内容盒子&#xff0c;vant套了几层。 这是引用的组件…

数据结构单向循环链表,创建以及增删改查的实现

一、单向循环链表的描述 循环链表&#xff1a;是另一种形式的链式存储结构。其特点是表中最后一个结点的指针域指向头节点&#xff0c;整个链表形成一个环。 单向循环链表的操作和单链表操作基本一致&#xff0c;差别在于&#xff1a;当链表遍历时&#xff0c;判别当前指针p是…

vue+element Cascader 级联选择器 > 实现省市区三级联动

vueelement Cascader 级联选择器 > 实现省市区三级联动 先看下实现效果吧&#xff08;嘻嘻&#xff09; 看完我们就开始啦 安装element-china-area-data1 npm install element-china-area-data5.0.2 -S上代码 <el-cascadersize"large":options"options…

CRC校验原理全面解读

目录 1. 简介2. 原理2.1 CRC的发送与接收2.2 CRC校验码的生成2.3 CRC校验码的校验 3. 拓展问题3.1 模2除法为什么等同于异或运算&#xff1f;3.2 为什么除数的位数和被除数补充的位数相差为1&#xff1f;3.3 为什么CRC校验码不能纠正错误&#xff0c;只能检测错误&#xff1f; …

基于Selenium+Python的web自动化测试框架(附框架源码+项目实战)

目录 一、什么是Selenium&#xff1f; 二、自动化测试框架 三、自动化框架的设计和实现 四、需要改进的模块 五、总结 总结感谢每一个认真阅读我文章的人&#xff01;&#xff01;&#xff01; 重点&#xff1a;配套学习资料和视频教学 一、什么是Selenium&#xff1f; …

会员管理系统如何深度绑定用户?会员系统必备哪些功能?

在以消费者为主导的企业&#xff08;商家&#xff09;范围内&#xff0c;实行会员制管理能够更好的提升客户的忠诚度&#xff0c;减少客户的流失。完整、精确的会员管理系统&#xff0c;更能提升企业&#xff08;商家&#xff09;的实际效益。 蚓链会员管理系统(专业版) 便是这…

02 QPushButton的基本使用

Tips: 在使用控件的时候如果没有智能提示&#xff0c;可能是没有包含头文件 在运行时&#xff0c;报【invalid use of xxx】可能是没有包含相关头文件 如果出现中文乱码&#xff1a;设置编译器的编码格式为UTF-8 本节主要包含创建一个按钮控件、显示按钮、设置按钮的父窗口、设…

Centos7安装Docker

Centos7安装Docker 目录 环境准备 安装Docker 启动Docker 切换源 启动第一个容器 环境准备 切换root权限 su root 升级所有包同时也升级软件和系统内核 yum -y update 卸载旧版本 yum remove docker docker-common docker-selinux docker-engine 卸载旧版本 yum rem…

UG\NX二次开发 捕获NX OPEN C++异常,乱码问题

文章作者:里海 来源网站:https://blog.csdn.net/WangPaiFeiXingYuan 简介: 捕获NX OPEN C++异常,乱码问题。 效果: 使用uc1601(ex.what(),1)显示乱码 使用 NXMessageBox()->Show("Block Styler", NXMessageBox::DialogTypeError, ex…

rce题目

<?php include "flag.php"; highlight_file(__FILE__); if(isset($_GET[HECTF])) { if (; preg_replace(/[^\W]\((?R)?\)/, NULL, $_GET[HECTF])) { if (!preg_match(/pos|high|op|na|info|dec|hex|oct|pi/i, $_GET[HECTF])) { eval(…

Kubernetes Volume及其类型(NFS、SAN) - PV - PVC - PV与PVC与Pod的关系

目录 volume 卷 官方文档&#xff1a;卷 | Kubernetes 一、emptyDir&#xff08;临时卷&#xff09; 二、hostPath卷 type字段参数 hostPath 实验&#xff1a; 三、第3方提供的存储卷&#xff08;百度云、阿里云、亚马逊云、谷歌云等&#xff09; 四、local卷 五、NF…