Kafka的配置和使用

目录

1.服务器用docker安装kafka

2.springboot集成kafka实现生产者和消费者


1.服务器用docker安装kafka

        ①、安装docker(docker类似于linux的软件商店,下载所有应用都能从docker去下载)

                a、自动安装 

curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun

                b、启动docker

sudo systemctl start docker

                c、 通过运行hello-world镜像来验证是否正确安装了Docker Engine-Community。

// 拉取镜像

sudo docker pull hello-world

// 执行

hello-world sudo docker run hello-world

                 d、安装成功

         ②、zookeeper

                a、docker search zookeeper

                b、docker pull zookeeper

        ③、安装kafka

                a、docker search kafka

                b、docker pull wurstmeister/kafka

        ④、运行zookeeper

                a、docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper

        ⑤、运行kafka

                a、 docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=42.194.238.131:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://42.194.238.131:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

                b、参数说明

参数说明:

-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=172.21.10.10:2181/kafka 配置zookeeper管理kafka的路径172.21.10.10:2181/kafka

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.10.10:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

        ⑥、检验kafka是否可以使用

docker exec -it kafka bash

cd /opt/kafka_2.13-2.8.1/

cd bin

                a、运行kafka生产者并发送消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

                b、在开一个页面,运行kafka消费者发送消息

 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

        ⑦、结果是这个样子的

 

         ⑧、每条消息都有一个主题,消费者指定监听哪个主题的消息,如果进来消息队列的是我们指定监听的主题,就消费,否则不消费(topic这里指定的生产和消费的主题)

        ⑨、消费者宕掉了,生产者接着发,消息不会丢,消费者重启之后会重新接收到宕机之后发的所有消息

2.springboot集成kafka实现生产者和消费者

        ①、在pom中创建依赖

<dependency>

        <groupId>org.springframework.kafka</groupId>

        <artifactId>spring-kafka</artifactId>

         <version>2.7.8</version>

</dependency>

        ②、配置kafka

                a、在 application.yml 文件中添加以下配置:(注:yml中两个相同名字的会报错,比如两个spring)

spring:

         kafka:

                #自己的kafka所在的ip地址和端口号

                 bootstrap-servers: localhost:9092

                 consumer:

                 #一个group-id代表一个消费组,一个消息可以被几个消费组消费

                    group-id: my-group

                    auto-offset-reset: earliest

                producer: #序列化

                    value-serializer: org.apache.kafka.common.serialization.StringSerializer

                    key-serializer: org.apache.kafka.common.serialization.StringSerializer

        b、创建一个生产者

@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}

sendMessage 方法,用于发送消息到 Kafka。

@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/send")public void sendMessage(@RequestBody String message) {kafkaTemplate.send("my-topic", message);}}

        c、 创建一个消费者

@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}}

@KafkaListener 注解声明了一个消费者方法,用于接收从 

my-topic 主题中读取的消息

@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group-id")public void consume(String message) {System.out.println("Received message: " + message);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/25170.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Visual Studio配置PCL库

Visual Studio配置PCL库 Debug和Release配置新建项目配置属性表测试参考 Debug和Release Debug和Release的配置过程一模一样&#xff0c;唯一区别就在于最后一步插入的附加依赖项不同&#xff0c;因此下面以debug为例。 配置新建项目 1、新建一个C空项目&#xff0c;模式设置…

Linux文本三剑客之awk

目录 前言 awk 1.认识awk 2.使用awk 2.1语法 2.2常用命令选项 2.3awk变量 2.3.1内置变量 2.3.2自定义变量 2.4printf命令 awk例题 前言 awk、grep、sed是linux操作文本的三大利器&#xff0c;合称文本三剑客&#xff0c;也是必须掌握的linux命令之一。三者的功能都是…

3维空间下按平面和圆柱面上排版设计

AR空间中将若干平面窗口排列在指定平面或圆柱体面上 平面排版思路 指定平面方向向量layout_centre ,平面上的一点作为排版版面的中心layout_position float3 layout_position = float3(0,0,-10); float3 layout_centre = float3(0,0,1

FreeRTOS源码分析-9 互斥信号量

目录 1 优先级翻转问题 2 互斥信号量概念及其应用 2.2FreeRTOS互斥信号量介绍 2.3FreeRTOS互斥信号量工作原理 3 互斥信号量函数应用 3.1功能分析 3.2API详解 3.3功能实现 4 递归互斥信号量函数应用 4.1死锁现象 ​编辑 4.2API详解 4.3解决死锁 5 互斥信号量实现原…

C++数据结构之平衡二叉搜索树(一)——AVL的实现(zig与zag/左右双旋/3+4重构)

本文目录 00.BBST——平衡二叉搜索树01.AVL树02.AVL的插入2.1单旋——zig 与 zag2.2插入节点后的单旋实例2.3手玩小样例2.4双旋实例2.5小结 03.AVL的删除3.1单旋删除3.2双旋删除3.3小结 04.34重构05.综合评价AVL5.1优点5.2缺点 06.代码注意插入算法删除算法完整代码&#xff1a…

通过有名管道实现AB进程对话

一、要求实现AB进程对话 A进程先发送一句话给B进程&#xff0c;B进程接收后打印B进程再回复一句话给A进程&#xff0c;A进程接收后打印重复1.2步骤&#xff0c;当收到quit后&#xff0c;要结束AB进程 A进程 #include <stdio.h> #include <sys/types.h> #include…

Oracle EBS OM客制化调用API创建销售订单非常慢(FND_FLEX_HASH死锁)

业务场景 由于Oracle EBS标准功的公司间关联交易操作涉及业务节点环节多,需要多个业务部门参考操作完成,浪费人力和花费时间。随着国内集团公司通过业务整合优化,大幅度减少间中很多环节的人为操作,如国内公司间贸易通过类似于客制化出货单申请方式,跨国公司间贸易通过类似…

关于接口测试用例设计的一些思考

接口测试发现的典型问题 传入参数处理不当&#xff0c;引起程序错误类型溢出&#xff0c;导致数据读取和写入不一致对象权限校验出错&#xff0c;可获取其他角色信息状态出错&#xff0c;导致逻辑处理出现问题逻辑校验不完善定时任务执行出错 接口测试用例设计 接口测试用例…

redis入门3-在java中操作redis

Redis的java客户端 Jedis、Lettuce、Redisson、以及spring提供的spring data redis Jedis操作redis //添加依赖 <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.8.0</version> </dep…

JJWT快速入门

本篇介绍使用 JJWT&#xff08;Java JWT&#xff09;库来生成 JWT Token&#xff0c;步骤如下&#xff1a; 添加依赖&#xff1a; 在项目中添加 JJWT 依赖项。对于 Maven 项目&#xff0c;可以在 pom.xml 文件中添加以下依赖项&#xff1a; <dependency><groupId>…

python解析帆软cpt及frm文件(xml)获取源数据表及下游依赖表

#!/user/bin/evn python import os,re,openpyxl 输入&#xff1a;帆软脚本文件路径输出&#xff1a;帆软文件检查结果Excel#获取来源表 def table_scan(sql_str):# remove the /* */ commentsq re.sub(r"/\*[^*]*\*(?:[^*/][^*]*\*)*/", "", sql_str)# r…

ssh免密rsa登录

1. 本地要执行ssh的机器 ssh-keygen -t rsa # 下面为对应密钥生成地址 公钥&#xff08;id_rsa.pub&#xff09;和私钥&#xff08;id_rsa&#xff09;。 # windows: C:\Users\<your_username>\.ssh # linux: /home/service/.ssh 2. 讲密钥写入到对应文…

基于Hyperledger Fabric+CP-ABE加密的溯源类应用系统

加密机制中有ABE属性加密&#xff0c;属性基加密一般又分为两类&#xff0c;即密钥策略属性基加密&#xff08;KP-ABE&#xff09;和密文策略属性基加密&#xff08;CP-ABE&#xff09;。 今天我们重点说一下cp-abe及如何与fabric区块链技术相结合来建设相关溯源、确权、认证等…

秋招算法备战第39天 | 62.不同路径、63. 不同路径 II

62. 不同路径 - 力扣&#xff08;LeetCode&#xff09; 按照动态规划五部曲走&#xff0c;非常清晰 class Solution:def uniquePaths(self, m: int, n: int) -> int:dp [[0 for _ in range(n)] for _ in range(m)]for i in range(m):dp[i][0] 1for j in range(n):dp[0][…

c++学习(特殊类设计)[30]

只能在堆上创建对象的类 如果你想要确保对象只能在堆上创建&#xff0c;可以通过将析构函数声明为私有&#xff0c;并提供一个静态成员函数来创建对象。这样&#xff0c;类的实例化只能通过调用静态成员函数来完成&#xff0c;而无法直接在栈上创建对象。 以下是一个示例&…

【开源项目--稻草】Day03

【开源项目--稻草】Day03 1. 续Spring-Security1.1 自定义登录界面 2. 用户注册2.1 将注册页面显示2.2 编写控制器进行测试2.3 编写注册业务逻辑2.4 注册功能的收尾 3. VUE3.1 VUE的基本使用3.1.1 什么是VUE 3.2 使用VUEAjax完善稻草问答的注册功能 1. 续Spring-Security 1.1 …

append()函数

go 内置appned函数 append()函数函数说明代码例子 append()函数 函数说明 append函数是Go语言内置的一个非常常用的函数&#xff0c;用于在切片&#xff08;slice&#xff09;末尾追加元素。它的原理比较复杂&#xff0c;涉及到Go语言中切片的底层实现和内存管理&#xff0c;…

Kubespray-offline v2.21.0-1 下载 Kubespray v2.22.1 离线部署 kubernetes v1.25.6

文章目录 1. 目标2. 预备条件3. vcenter 创建虚拟机4. 系统初始化4.1 配置网卡4.2 配置主机名4.3 内核参数 5. 打快照6. 安装 git7. 配置科学8. 安装 docker9. 下载介质9.1 下载安装 docker 介质9.2 下载 kubespray-offline-ansible 介质9.3 下载 kubernetes 介质 10. 搬运介质…

微信小程序 选择年和月以及回显 使用picker-view组件

<!--选择年月--><view bindtap"pickCalendar">{{year}}年{{month}}月</view><picker-view wx:if"{{open}}" class"fixed-select" indicator-style"height: 50px;" style"width: 100%; height: 300px;"…

6.物联网操作系统信号量

一。信号量的概念与应用 信号量定义 FreeRTOS信号量介绍 FreeRTOS信号量工作原理 1.信号量的定义 多任务环境下使用&#xff0c;用来协调多个任务正确合理使用临界资源。 2.FreeRTOS信号量介绍 Semaphore包括Binary&#xff0c;Count&#xff0c;Mutex&#xff1b; Mutex包…