Kafka:安装和配置

 producer:发布消息的对象,称为消息产生者 (Kafka topic producer)

topic:Kafka将消息分门别类,每一个消息称为一个主题(topic)

consumer:订阅消息并处理发布消息的对象称为消费者(consumer)

broker:已发布的消息保存在一组服务器中,称为kafka集群,集群中的每一个服务器都是一个代理(broker),消费者(consumer)可以订阅一个或者多个主题(topic),并从broker中拉取数据,从而消费这些已发布的信息。

1、Kafka对zookeeper是一个强依赖,保存Kafka相关的节点数据,所以安装kafka之前要先安装zookeeper

下载镜像

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

下载镜像

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

2、入门案例

①创建kafka-demo工程并引入依赖

        <!--kafka--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

②创建ProducerQuickStart生产者类并实现

package com.heima.kafkademo.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*生产者*/
public class ProducerQuickStart {public static void main(String[] args) {/*1、kafka配置信息*/Properties properties = new Properties();//kafka连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);//key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*2、生产对象*/KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);//封装发送消息的对象ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");/*3、发送消息*/producer.send(record);/*4、关闭通道,负责消息发送不成功*/producer.close();}
}

③创建ConsumerQuickStart消费者类并实现

package com.heima.kafkademo.sample;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {/*1、kafka配置信息*/Properties properties = new Properties();//kafka连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,5);//key和value的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");/*2、消费者对象*/KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);/*3、订阅主题*/consumer.subscribe(Collections.singletonList("itheima-topic"));//当前线程处于一直监听状态while (true){//4、获取消息ConsumerRecords<String,String> consumerRecords =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : consumerRecords) {System.out.println(record.key());System.out.println(record.value());}}}
}

④运行测试

        成功接收到消息

总结

  • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)

  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)

下一篇: springboot集成kafka收发消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/36091.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java课题笔记~ ServletContext

单个Servlet的配置对象 web.xml <servlet><servlet-name>FirstServlet</servlet-name><servlet-class>com.ambow.test.FirstServlet</servlet-class><init-param><param-name>charset</param-name><param-value>utf-8&…

centos自动同步北京时间

1、安装ntpdate服务 yum -y install ntpdate 2、加入自动任务计划 查找ntpdate的路径&#xff1a; which ntpdate 复制这个路径。 编辑自动任务计划并加入ntpdate&#xff1a; crontab -e # 每小时第30分钟同步AD域控时间 30 * * * * /usr/sbin/ntpdate -u 192.168.2.8 > …

用户端Web自动化测试-L1

目录&#xff1a; Web自动化测试价值与体系环境安装与使用自动化用例录制自动化测试用例结构分析web浏览器控制常见控件定位方法强制等待与隐式等待常见控件交互方法自动化测试定位策略搜索功能自动化测试用户端Web自动化测试 1.Web自动化测试价值与体系 功能测试场景: UI 自…

AUTOSAR规范与ECU软件开发(基础篇)2.5 AUTOSAR方法论

前言 AUTOSAR方法论(AUTOSAR Methodology) 中车用控制器软件的开发涉及系统级、 ECU级和软件组件级。 系统级主要考虑系统功能需求、 硬件资源、 系统约束, 然后建立系统架构; ECU级根据抽象后的信息对ECU进行配置; 系统级和ECU级设计的同时, 伴随着软件组件级的开发。 上…

Sql server还原失败(数据库正在使用,无法获得对数据库的独占访问权)

一.Sql server还原失败(数据库正在使用,无法获得对数据库的独占访问权) 本次测试使用数据库实例SqlServer2008r2版 错误详细&#xff1a; 标题: Microsoft SQL Server Management Studio ------------------------------ 还原数据库“Mvc_HNHZ”时失败。 (Microsoft.SqlServer.…

《甲午》观后感——GPT-3.5所写

《甲午》是一部令人深思的纪录片&#xff0c;通过生动的画面和真实的故事&#xff0c;向观众展示了中国历史上的一段重要时期。观看这部纪录片&#xff0c;我深受触动&#xff0c;对历史的认识也得到了深化。 首先&#xff0c;这部纪录片通过精心搜集的历史资料和珍贵的影像资料…

低成本搭建NAS,利用HFS进行内网穿透,实现公网访问

通过HFS低成本搭建NAS&#xff0c;并内网穿透实现公网访问 文章目录 通过HFS低成本搭建NAS&#xff0c;并内网穿透实现公网访问前言1.下载安装cpolar1.1 设置HFS访客1.2 虚拟文件系统 2. 使用cpolar建立一条内网穿透数据隧道2.1 保留隧道2.2 隧道名称2.3 成功使用cpolar创建二级…

[保研/考研机试] KY103 2的幂次方 上海交通大学复试上机题 C++实现

题目链接&#xff1a; KY103 2的幂次方 https://www.nowcoder.com/share/jump/437195121691999575955 描述 Every positive number can be presented by the exponential form.For example, 137 2^7 2^3 2^0。 Lets present a^b by the form a(b).Then 137 is present…

MySQL8安装和删除教程 保姆级(Windows)

下载 官网: mysql官网点击Downloads->MySQL Community(GPL) Downloads->MySQL Community Server(或者点击MySQL installer for Windows) Windows下有两种安装方式 在线安装 一般带有 web字样 这个需要联网离线安装 一般没有web字样 安装 下载好之后,版本号可以不一样&…

Postman中,既想传递文件,还想传递多个参数(后端)

需求:既想传文件又想传多个参数可以用以下方式实现

做了这件事,精准拿捏企业资产管理!

资产管理系统是一种为组织和个人提供管理各类资产的重要工具。无论是金融资产还是实物资产&#xff0c;这些都构成了一个实体或个人财务状况的重要组成部分。 无论是企业寻求优化其固定资产维护&#xff0c;还是个人希望更好地管理他们的投资组合&#xff0c;资产管理系统在现代…

NZ系列工具NZ02:VBA读取PDF使用说明

【分享成果&#xff0c;随喜正能量】时光绽放并蒂莲&#xff0c;更是一份殷殷嘱托&#xff0c;更是一份诚挚祝福&#xff0c;是一份时光馈赠&#xff0c;又是一份时光陪伴。。 我的教程一共九套及VBA汉英手册一部&#xff0c;分为初级、中级、高级三大部分。是对VBA的系统讲解…

uniapp+uview封装小程序请求

提要&#xff1a; uniapp项目引入uview库 此步骤不再阐述 1.创建环境文件 env.js&#xff1a; let BASE_URL;if (process.env.NODE_ENV development) {// 开发环境BASE_URL 请求地址; } else {// 生产环境BASE_URL 请求地址; }export default BASE_URL; 2.创建请求文件 该…

QLExpress动态脚本引擎解析工具

介绍 QLExpress脚本引擎 1、线程安全&#xff0c;引擎运算过程中的产生的临时变量都是threadlocal类型。 2、高效执行&#xff0c;比较耗时的脚本编译过程可以缓存在本地机器&#xff0c;运行时的临时变量创建采用了缓冲池的技术&#xff0c;和groovy性能相当。 3、弱类型脚本…

广西Geotrust单位多域名https证书推荐

Geotrust是国际知名CA认证机构&#xff0c;根证书是Digicert&#xff0c;还有RapidSSL、QuickSSL等子品牌&#xff0c;拥有多种类型的多域名https证书&#xff0c;比如OV企业型https证书和EV增强型多域名https证书。那么&#xff0c;哪种多域名https证书更适合企事业单位使用呢…

SpringBoot复习:(43)如何以war包的形式运行SpringBoot程序

一、.pom.xml配置packging为war <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven…

Android 内存泄漏

名词解释 内存泄漏:即memory leak。是指内存空间使用完毕后无法被释放的现象&#xff0c;虽然Java有垃圾回收机制&#xff08;GC&#xff09;&#xff0c;但是对于还保持着引用&#xff0c; 该内存不能再被分配使用&#xff0c;逻辑上却已经不会再用到的对象&#xff0c;垃圾回…

考公-判断推理-定义判断

第九节课 例题 例题 例题 例题 例题 例题 脚一滑&#xff0c;就是工伤&#xff0c;这难道不是操作不当吗 例题 不要较真&#xff0c;公务员&#xff0c;把没有全局观念的人排除在公务员队伍之外 例题 例题 下次看到不字&#xff0c;先给我画上 例题 例题 例题 例题…

力扣63.不同路径II(动态规划)

/*** author Limg* date 2022/08/09* 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。* 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish”&#xff09;。* 现在考虑网…

探讨uniapp的生命周期问题

在uniapp中,生命周期函数分为应用生命周期函数、页面生命周期函数和组件生命周期函数. 1应用声明周期 应用生命周期函数只能在 App.vue 中监听有效&#xff0c;在其他页监听无效。 onLaunch&#xff1a;当uni-app 初始化完成时触发&#xff08;全局只触发一次&#xff09;on…