kafka生产者消费者举例

文章目录

    • kafka介绍
    • 生产者消费者例子
      • 一、生产者
      • 二、消费者
      • 三、效果
    • KafkaTemplate @KafkaListener

kafka介绍

Kafka 是一款分布式流处理平台,它被设计用于高吞吐量、持久性、分布式的数据流处理。

  • Kafka 简介

    • Kafka 是一个高吞吐、分布式、基于发布 订阅的消息系统。
    • Kafka 具有高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性、高并发等特性。
  • Kafka 应用场景

    • 日志收集:公司可以使用 Kafka 收集各种服务的日志,然后通过 Kafka 统一接口服务的方式将这些日志开放给各种消费者,例如 Hadoop、Hbase、Solr 等。
    • 消息系统:Kafka 可以解耦生产者和消费者,缓存消息等。
    • 用户活动跟踪:Kafka 经常用于记录 web 用户或 app 用户的各种活动,如浏览网页、搜索、点击等。这些活动信息被各个服务器发布到 Kafka 的 topic 中,然后订阅者通过订阅这些 topic 来实时监控分析,或者装载到 Hadoop、数据仓库中进行离线分析和挖掘。
    • 运营指标:Kafka 也经常用来记录运营监控数据,包括收集各种分布式应用的数据、生产各种操作的集中反馈,比如报警和报告。
    • 流式处理:例如 Spark Streaming 和 Storm。

Kafka 在大规模数据流处理和实时数据传输场景中发挥着重要作用,其发布订阅模型、分区和副本机制以及异步消息传递的特性使其成为分布式系统中的重要组件。

生产者消费者例子

当Docker部署Kafka集群时,需要确保安装了ZooKeeper,因为Kafka依赖于ZooKeeper来实现集群协调与管理。ZooKeeper是一个开源的分布式协调服务,用于维护集群的状态信息、进行领导者选举以及协调分布式应用程序的工作。Kafka利用ZooKeeper来管理集群中的节点、配置信息和分区分配等关键任务,确保集群的稳定运行和可靠性。

先引入依赖:

  <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.7.0</version></dependency>

一、生产者

public class Producer {public static void main(String[] args) {// 设置Kafka生产者的配置Properties props = new Properties();// Kafka集群的地址props.put("bootstrap.servers", "192.168.13.133:9092,192.168.13.133:9093,192.168.13.133:9094");// 确认模式:全部副本确认props.put("acks", "all");props.put("retries", 2);// 键的序列化器props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka生产者实例org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(props);// 发送10条消息到主题for (int i = 0; i < 10; i++) {// send异步发送 ProducerRecord参数: 注意  key value【消息是键值对形式】producer.send(new ProducerRecord<String, String>("hac", Integer.toString(i), Integer.toString(i)));}// 关闭生产者实例producer.close();}
}

二、消费者

public class Consumer {public static void main(String[] args) {// 创建消费者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.13.133:9092,192.168.13.133:9093,192.168.13.133:9094");// 消费者主props.setProperty("group.id", "groupId1"); // 消费者组ID// 是否开启自动提交偏移量props.setProperty("enable.auto.commit", "true");// 自动提交偏移量的间隔时间props.setProperty("auto.commit.interval.ms", "1000");props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 创建Kafka消费者实例consumer.subscribe(Arrays.asList("hac"));// 订阅主题 可以订阅多个主题while (true) {// 从服务器拉取消息记录ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));// 遍历接收到的消息记录for (ConsumerRecord<String, String> record : records) {// 输出消息的偏移量、键和值System.out.println("接受到的消息: " + record.key() + ":" + record.value());}}}
}

三、效果

在启动Kafka消费者之前,需要确保消费者能够连接到可用的Kafka集群,并正确地订阅了所需的主题。一旦消费者启动并成功订阅了主题,它将持续监听并处理来自Kafka集群的消息。在此期间,消费者将与集群保持连接,并持续从指定的主题中拉取消息进行处理。当生产者向所订阅的主题发送新消息时,消费者将立即收到这些消息,并进行相应的处理。在这里插入图片描述

KafkaTemplate @KafkaListener

KafkaTemplate@KafkaListener是Spring Kafka提供的两个核心组件,用于简化在Spring应用程序中与Apache Kafka集成的过程。

第一步:引入依赖

 <!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

第二步:配置application.yml文件

spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.13.133:9092,192.168.13.133:9093,192.168.13.133:9094producer:retries: 3key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializeracks: 1consumer:group-id: groupId1key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

第三步:使用

KafkaTemplate:KafkaTemplate是Spring Kafka提供的一个工具类,用于简化向Kafka发送消息的过程。通过KafkaTemplate,可以方便地将消息发送到指定的Kafka主题。它封装了Kafka的Producer API,提供了一系列发送消息的方法,包括同步发送、异步发送、带回调函数的发送等。使用KafkaTemplate,你可以在Spring应用程序中轻松地发送消息到Kafka集群中。

@KafkaListener:@KafkaListener注解用于标记一个方法,表示这个方法是一个Kafka消息监听器。通过在方法上使用@KafkaListener注解,可以让Spring容器自动创建Kafka消息监听器并订阅指定的主题,当有消息到达时,自动调用标记了@KafkaListener注解的方法进行消息处理。

生产者:

@RestController
@RequestMapping(value = "/kafka")
public class SendController {@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping(value = "/send")public String send() {String msg = "hello"; //这里写固定的测试一下String topic = "hac";kafkaTemplate.send(topic, msg);return "OK";}
}

消费者:

@Component 
public class KafkaListenerMessage {/**** 监听新消息*/@KafkaListener(topics = "hac", groupId = "groupId1") public void listener(ConsumerRecord<String, String> record) {String value = record.value();int partition = record.partition();long offset = record.offset();System.out.println("value:" + value + ",partition:" + partition + ",offset:" + offset);}
}

效果:
在这里插入图片描述


❤觉得有用的可以留个关注❤

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/10048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入了解 Flask Request

文章目录 获取请求数据获取请求信息文件上传总结 Flask 是一个轻量级的 Python Web 框架&#xff0c;其简洁的设计和灵活的扩展性使其成为了许多开发者的首选。在 Flask 中&#xff0c;处理 HTTP 请求是至关重要的&#xff0c;而 Flask 提供了丰富而强大的 request 对象来处理…

SpringAop详解

文章目录 一、Spring自定义注解1、什么是注解&#x1f468;‍&#x1f3eb;2、注解的目的或作用&#x1f49e;3、JDK内置注解&#x1f4ab; 【内置元注解 一共八个固定注解】4、元注解 &#x1f3af;5、自定义注解&#x1f4f8;5、Java反射API和类加载过程51、什么是反射基本原…

GT资源-Clock资源

一、Transmitter 时钟分布 XCLK&#xff1a;在使用TX buffer的模式下&#xff0c;XCLK来源于TXOUTCLK。在使用TX bypassing的模式下XCLK来源于TXUSERCLK。TXUSRCLK是GTX/GTH中PCS的内部逻辑时钟。TXUSRCLK2是GT Transceiver 用户侧逻辑时钟。 TXUSRCLK与TXUSRCLK2的关系 FPGA …

ECharts系列文章汇总(持续更新中)

ECharts介绍 ECharts是一款基于JavaScript的数据可视化图表库&#xff0c;提供了直观、生动、可交互、可个性化定制的数据可视化图表。以下是关于ECharts的详细介绍&#xff1a; 发展历程&#xff1a; ECharts最初由百度团队开源&#xff0c;并在2018年初捐赠给Apache基金会&…

【C++阅览室】C++之Vector(容器)

目录 vector的介绍 vector的使用 vector的定义 vector iterator 的使用 vector 空间增长问题 vector 增删查改 vector 迭代器失效问题。&#xff08;重点&#xff09; vector的介绍 1、 vector 是表示可变大小数组的序列容器&#xff0c;可以使用连…

大语言模型的后处理

后处理的输入 常规意义上的大模型处理流程 import torch from transformers import LlamaForCausalLM, LlamaTokenizer# 加载模型和tokenizer model LlamaForCausalLM.from_pretrained("decapoda-research/llama-7b-hf") tokenizer LlamaTokenizer.from_pretrain…

【0002day】citespace知网教程

文章目录 1.建立路径2.数据转换3.数据分析 citespace这个也可以用来分析研究方向。 1.建立路径 首先建立四个文件夹。 2.数据转换 这一步需要导出知网数据&#xff0c;然后还要转换数据。 首先需要选中数据。 导出数据&#xff0c;refworks 将数据下载到input里。 转换…

Linux 磁盘分区工具 gdisk / fdisk

fdisk 是传统的 Linux 磁盘分区工具&#xff0c;磁盘容量有2T的大小限制&#xff1b;gdisk 又叫 GPT fdisk, 作为 fdisk 的升级版&#xff0c;主要使用的是GPT分区类型&#xff0c;用来划分容量大于2T的硬盘&#xff0c;本文介绍使用方法。 简介 早期的磁盘使用 fdisk 工具分区…

C++ 多态 - 下

目录 1. 多态的原理 1.1. 虚函数表 1.2. 多态原理 1.3. 静态绑定和动态绑定 1.3.1. 运行时决议 1.3.2. 编译时决议 1.4. 为什么基类的对象调用虚函数不能构成多态 2. 单继承中的虚函数表 2.1. 同类型对象的虚表 2.2. 单继承的对象的虚表 2.2.1. 内存窗口查看 2.2.2…

[CISCN 2018]sm

目录 1.题目 3.解题 4.参考 1.题目 题目链接 from Crypto.Util.number import getPrime,long_to_bytes,bytes_to_long from Crypto.Cipher import AES import hashlib from random import randint def gen512num():order[]while len(order)!512:tmprandint(1,512)if tmp n…

【送书福利第六期】Java开发的150多个坑,你踩过几个?(文末送书)

文章目录 做Java开发别掉坑里还不知道 程序员为什么会掉到坑里却不自知&#xff1f;第一是意识不到坑的存在。第二是有些 bug 或问题只在特定情况下暴露。第三是变化不明显的性能问题。 《Java开发坑点解析&#xff1a;从根因分析到最佳实践》Java 开发完美避坑指南结语 &#…

2010-2022年ESA_ CCI-LC数据集下载

扫描文末二维码&#xff0c;关注微信公众号&#xff1a;ThsPool 后台回复 g009&#xff0c;领取 2010-2022年300m分辨率 ESA_ CCI-LC 数据集 哥白尼气候数据集&#xff1a;土地利用和土地覆盖研究的宝贵资源 &#x1f30d;&#x1f50d; 土地利用和土地覆盖变化是全球变化研究…

纯血鸿蒙APP实战开发——自定义安全键盘案例

介绍 金融类应用在密码输入时&#xff0c;一般会使用自定义安全键盘。本示例介绍如何使用TextInput组件实现自定义安全键盘场景&#xff0c;主要包括TextInput.customKeyboard绑定自定义键盘、自定义键盘布局和状态更新等知识点。 效果图预览 实现思路 1. 使用TextInput的cu…

docker安装nginx支持ssl 实现https访问(完整版)

全文目录,一步到位 1.前言简介1.1 专栏传送门1.1.1 本文简介 2. docker安装nginx支持ssl2.0 准备ssl证书(例: 阿里云)2.0.1 配置域名解析2.0.2 找到数字证书管理服务并签发ssl证书2.0.3 选择默认证书 填写域名 创建2.0.4 提交审核, 签发成功2.0.5 解压并上传到宿主机ssl路径下 …

异常处理/__LINE__ 与 __FILE__ 宏在调试和异常处理中的高级使用

文章目录 概述痛点分析_LINE_ 代码所在行号_LINE_ 直接转为字符串_LINE_ 作为整型数据使用_LINE_标记宏函数的调用位置 _FILE_ 代码所在文件名简单实验不期望 _FILE_ 宏代表全路径 assert 使用了 _FILE_ 和 _LINE_借助TLS技术小结 概述 _LINE_和_FILE_是C/C中的预定义宏&#…

HarmonyOS开发案例:【生活健康app之编写通用工具类】(5)

本节将介绍日志打印、时间换算等通用工具类的编写和使用&#xff0c;工具类可以简化应用代码编写和业务流程处理。 日志类 日志类Logger旨在提供一个全局的日志打印、日志管理的地方&#xff0c;既可以规范整个应用的日志打印&#xff0c;也方便日后对日志工具类进行修改&…

利用106短信群发平台能否提升沟通效率?

利用106短信群发平台确实能够显著提升沟通效率&#xff0c;具体体现在以下几个方面&#xff1a; 1.快速传递信息&#xff1a;106短信群发平台能够实现信息的快速传递。一旦设置好发送内容和接收群体&#xff0c;短信便能在瞬间发送至大量用户。这种即时性确保了信息的迅速传达…

正点原子[第二期]Linux之ARM(MX6U)裸机篇学习笔记-15.4讲 GPIO中断实验-IRQ中断服务函数详解

前言&#xff1a; 本文是根据哔哩哔哩网站上“正点原子[第二期]Linux之ARM&#xff08;MX6U&#xff09;裸机篇”视频的学习笔记&#xff0c;在这里会记录下正点原子 I.MX6ULL 开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了正点原子教学视频和链接中的内容。…

java对象互换工具类

1:将Object类型转成json字符串 /*** 将对象转为字符串* param obj* return*/public static String toString(Object obj) {if(obj null) {return null;}if ("".equals(obj.toString())) {return null;}if (obj instanceof String) {return obj.toString();}try {Ob…