Kafka应用Demo:按主题订阅消费消息

安装环境

  Kafka安装可参考官方网站的指导(https://kafka.apache.org/quickstart), 按步骤解压压缩包,修改配置。然后再启动zookeeper和kafka-server即可。

  需要注意的一点:如果是在VMware虚拟机上启动的kafka, 需要修改一下server.properties配置文件,增加如下配置:
在这里插入图片描述
  advertised.listener指定访问kafka的IP和端口,IP设置为虚拟机暴露给外部访问的IP。通过本地代码连接kafka,需要使用该配置

生产者代码样例

public class KafkaProducerService {private static final String NEO_TOPIC = "elon-topic";private KafkaProducer<String, String> producer = null;public KafkaProducerService() {Properties props = new Properties();props.put("bootstrap.servers", "192.168.5.128:9092");props.put("acks", "0");props.put("group.id", "1111");props.put("retries", "2");//设置key和value序列化方式props.put("key.serializer", StringSerializer.class);props.put("value.serializer", StringSerializer.class);//生产者实例producer = new KafkaProducer<>(props);}/*** 外部调用的发消息接口*/public void sendMessage() {for (int i = 0; i < 10; ++i) {int p = i % 2;ProducerRecord<String, String> record = new ProducerRecord(NEO_TOPIC, p, "neo", JSON.toJSONString(i));producer.send(record);}}
}

 发送消息时,将10个数据分别发送到0分区和1分区。

消费者代码样例

public class KafkaConsumerService {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);private static final String NEO_TOPIC = "elon-topic";Properties properties = new Properties();private KafkaConsumer consumer = null;public KafkaConsumerService() {properties.put("bootstrap.servers","192.168.5.128:9092");  // 指定 Brokerproperties.put("group.id", "neo1");              // 指定消费组群 IDproperties.put("max.poll.records", "5");properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Collections.singletonList(NEO_TOPIC));  // 订阅主题 order-eventsnew Thread(this::receiveMessage).start();}public void receiveMessage() {try {while (true) {synchronized (this) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));LOGGER.info("Fetch record num:{}", records.count());for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",record.topic(), record.partition(), record.offset(), record.key(), record.value());LOGGER.info("Received:" + info);Thread.sleep(100);}consumer.commitSync();}}} catch (Exception e){} finally {consumer.close();}}

 消费者按主题订阅。从打印的结果可以看到,消费者循环从topic下取出各个分区的消息依次消费。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/7554.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32G0存储器和总线架构

文章目录 前言一、系统架构二、存储器构成三、存储器地址映射四、存储器边界地址五、外设寄存器边界地址 前言 此文章是STM32G0 MCU的学习记录&#xff0c;并非权威&#xff0c;请谨慎参考。 STM32G0主流微控制器基于工作频率可达64 MHz的高性能Arm Cortex-M0 32位RISC内核。该…

观测云 VS ELK:谁是日志监控的王者?

前言 作为 IT 信息系统运行状态感知和故障分析的重要手段&#xff0c;日志在行业兴起之初便为运维和开发环节所广泛应用。当应用和系统发生故障或出现问题时&#xff0c;日志数据成为了排查和诊断问题的重要依据。通过分析日志&#xff0c;开发人员和运维人员可以了解系统的运…

java JMH 学习

JMH 是什么&#xff1f; JMH&#xff08;Java Microbenchmark Harness&#xff09;是一款专用于代码微基准测试的工具集&#xff0c;其主要聚焦于方法层面的基准测试&#xff0c;精度可达纳秒级别。此工具由 Oracle 内部负责实现 JIT 的杰出人士编写&#xff0c;他们对 JIT 及…

PD芯片取电:电子设备的动力之源6020 6500

随着现代电子技术的迅猛发展&#xff0c;电源管理技术在各种电子设备中扮演着越来越重要的角色。特别是近年来&#xff0c;随着USB Power Delivery&#xff08;PD&#xff09;技术的普及&#xff0c;PD芯片取电技术因其高效、灵活和安全的特点&#xff0c;成为了电子设备充电和…

Vue + Element-plus 快速入门

1. 构建项目 npm init vuelatest # 可选项一路回车&#xff0c;使用默认NO,按提示执行3条命令 cd 项目名 npm install npm run dev 2. 下载element-plus npm install element-plus --save 3.替换main.js import { createApp } from vue import ElementPlus from element-plu…

相关性分析

目录 1.交叉功率谱 2. 相关系数 1.交叉功率谱 % 生成两个信号 t 0:0.001:100; x sin(2*pi*1*t)sin(2*pi*2*t); y sin(2*pi*t )sin(2*pi*2*t); % 计算交叉功率谱密度 [Pxy, F] cpsd(x, y, [], [], [], 1/(t(2)-t(1))); % 使用正确的采样频率 % 绘制交叉功率谱密度图 …

ISIS的基本配置

1.IS-IS协议的基本配置&#xff08;1&#xff09; 2.IS-IS协议的基本配置&#xff08;2&#xff09; 3.IS-IS协议的基本配置&#xff08;3&#xff09; 4.案例&#xff1a;IS-IS配置 R1的配置如下&#xff1a; [AR1czy]isis 1 [AR1czy-isis-1]is-level level-1 [AR1czy-isis-…

电磁兼容(EMC):静电放电(ESD)基本原理

目录 1. 静电学简史 2. 摩擦生电原理 3. 总结 静电放电是电磁兼容&#xff08;EMC&#xff09;系列里最让人头疼的问题之一。无论是现在还是未来&#xff0c;静电问题肯定是做产品设计需要重点考虑的问题。这里来聊聊关于静电放电的一些发展历程和基本原理。 1. 静电学简史…

市面上好用的AI工具有哪些?

市面上的AI工具数不胜数&#xff0c;选择合适自己的AI工具则需要考虑自己的需求&#xff0c;看是否能满足的使用需求。那么市面上又有哪些好用的AI工具呢&#xff1f; 泰迪智能科技拥有简单易用的大数据挖掘建模平台&#xff0c;能够让数据创造更大的价值。 功能板块&…

C++实现二叉搜索树(模型)

目录 1.二叉搜索树的概念 2.二叉搜索树的实现 2.1总体代码预览 2.2各个函数实现原理 链表结构体 二叉搜索树的成员变量 二叉搜索树的插入 二叉搜索树的查找 二叉搜索树的遍历 二叉搜索树的删除 1.二叉搜索树的概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#…

字符设备驱动流程

字符设备驱动&#xff1a; linux系统驱动程序分为三大类&#xff0c;字符设备驱动&#xff0c;块设备驱动和网络设备驱动。其中字符设备驱动是使用最多的一种&#xff0c;从点灯到llC&#xff0c;SPI&#xff0c;音频设备等的驱动都是字符设备驱动。块设备和网络设备驱动要比字…

5.12 VUE项目实现Google 第三方登录

VUE项目实现Google 第三方登录 目录一、Google开发者平台配置1. 新建项目2. 配置 OAuth 权限请求页面并选择范围3. 启动API 和 服务 二、 登录代码实现1. 参考Google官网文档2. Google官网代码生成器3. 项目中实装 目录 一、Google开发者平台配置 Google Cloud: https://conso…

盒模型,BFC以及行内块级元素

一.盒模型篇 css基础框盒模型介绍&#xff1a; 当对一个文档进行布局的时候&#xff0c;浏览器的渲染引擎会根据标准之一的css基础框盒模型&#xff0c;将所有元素表示为一个个矩形的盒子&#xff0c;每个盒子由四部分组成&#xff0c;分别是内容 内边距 边框 外边距&#xff…

如何快速搭建nginx虚拟主机

华子目录 实验1&#xff1a;基于IP地址的虚拟主机原理 实验2&#xff1a;基于端口号的虚拟主机原理 实验3&#xff1a;基于域名的虚拟主机原理 实验1&#xff1a;基于IP地址的虚拟主机 原理 如果一台服务器有多个IP地址&#xff0c;而且每个IP地址与服务器上部署的每个网站一一…

beacon-chain+ethereum打镜像及推送镜像

部署详情 1、编写Dockerfile镜像 beacon chain对应Dockerfile文件 # 使用 Ubuntu 20.04 作为基础镜像 FROM ubuntu:20.04# 安装必要的系统库和工具 RUN apt-get update && \apt-get install -y curl && \apt-get clean# 创建存储数据的目录 RUN mkdir -p /dat…

HarmonyOS开发案例:【计算器】

介绍 基于基础组件、容器组件&#xff0c;实现一个支持加减乘除混合运算的计算器。 说明&#xff1a; 由于数字都是双精度浮点数&#xff0c;在计算机中是二进制存储数据的&#xff0c;因此小数和非安全整数&#xff08;超过整数的安全范围[-Math.pow(2, 53)&#xff0c;Math.…

【稳定检索|投稿优惠】2024年新能源技术与环境工程国际会议(ICNTEE 2024)

2024 International Conference on New Energy Technology and Environmental Engineering 一、大会信息 会议名称&#xff1a;2024年新能源技术与环境工程国际会议会议简称&#xff1a;ICNTEE 2024收录检索&#xff1a;提交Ei Compendex,CPCI,CNKI,Google Scholar等会议官网&…

【运维】如何安装ubuntu-24.04? 如何分区?

如何安装ubuntu-24.04&#xff1f;如何分区 经过一系列折腾&#xff0c;我总结了这几点&#xff1a; &#xff08;1&#xff09;在BIOS启动设置里&#xff0c;如果是GPT的硬盘格式&#xff0c;那么对应的就是UEFI的启动方式&#xff1b;如果是MBR的硬盘格式&#xff0c;那么对…

森林消防的新利器:高扬程水泵的应用与优势/恒峰智慧科技

森林是地球上的绿色肺叶&#xff0c;保护森林安全对于维护生态平衡和人类生存环境至关重要。在森林消防领域&#xff0c;高效、快速的灭火设备是保障森林安全的重要武器。近年来&#xff0c;高扬程水泵作为一种新型的消防设备&#xff0c;在森林消防中发挥了重要作用。本文将详…

密室逃脱游戏-第12届蓝桥杯省赛Python真题精选

[导读]&#xff1a;超平老师的Scratch蓝桥杯真题解读系列在推出之后&#xff0c;受到了广大老师和家长的好评&#xff0c;非常感谢各位的认可和厚爱。作为回馈&#xff0c;超平老师计划推出《Python蓝桥杯真题解析100讲》&#xff0c;这是解读系列的第58讲。 密室逃脱游戏&…