流量对于网站盈利/西安网络seo公司

流量对于网站盈利,西安网络seo公司,2008 .net 网站 目录 权限管理,做国外直播网站1、概述 最近感觉上班实在是太无聊,打算给大家分享一下Kafka的使用,本篇文章首先给大家分享三种方式搭建Kafka环境,接着给大家介绍kafka核心的基础概念以及Java API的使用,最后分享一个SpringBoot的集成案例,希望对大…

1、概述

最近感觉上班实在是太无聊,打算给大家分享一下Kafka的使用,本篇文章首先给大家分享三种方式搭建Kafka环境,接着给大家介绍kafka核心的基础概念以及Java API的使用,最后分享一个SpringBoot的集成案例,希望对大家有所帮助

2、环境搭建

2.1、安装包安装

关于环境搭建这块我们先来通过手动下载安装包的方式来完成,首先下载安装包,这是我使用的环境是CentOS7。

下载的地址 :

https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz

可以使用wget 工具下载 

下载完成后我们解压,这里我是用的路径是 /usr/local 解压后如下图所示:

来到bin路径下,我们需要先启动zookeeper 然后再启动kafka,相关命令如下

# 启动zookeeper
./zookeeper-server-start.sh ../config/zookeeper.properties &# 启动 kafka
./kafka-server-start.sh ../config/server.properties &
启动后 我们可以查看进程

kafka 已经启动成功了

2.2、Kraft 方式启动 kafka

上面我们使用zookeeper的方式运行了Kafka,kafka从2.8的版本就引入了KRaft模式,主要是用来取代zookeeper来管理元数据,下面我们来试试使用KRaft的方式来启动kafka

# 停掉kafka
./kafka-server-stop.sh ../config/server.properties# 停掉zookeeper
./zookeeper-server-stop.sh ../config/zookeeper.properties# 生成Cluster UUID
./kafka-storage.sh random-uuid

这里我们能需要记录下 生成的这个uuid值

# 格式化日志目录
./kafka-storage.sh format -t y__hXaR2QVKaehk5FiNLfQ -c ../config/kraft/server.properties

接着启动kafka

# 启动Kafka
./kafka-server-start.sh ../config/kraft/server.properties &

2.3、 Docker 安装

相信大家一定喜欢这种方式,直接使用 Docker 一把梭哈

# 拉取Kafka镜像
docker pull apache/kafka:3.7.0# 启动Kafka容器
docker run -p 9092:9092 apache/kafka:3.7.0# 复制出来一份配置文件
docker cp 7434ce960297:/etc/kafka/docker/server.properties /opt/docker/kafka

然后我们需要修改配置文件

修改完成后我们使用以下命令启动kafka

docker run -d  \
-v /opt/docker/kafka:/mnt/shared/config  \
-v /opt/data/kafka-data:/mnt/kafka-data  \
-p 9092:9092  \
--name kafka-container  \
apache/kafka:3.7.0

完成之后我们可以使用客户端工具连接试试 

 至此我们的环境搭建完成了。

3、基础概念

3.1、Topic & event 简述

在kafka中 消息(event) 被组织并持久地存储在Topic中。非常简单地说,Topic 类似于文件系统中的一个文件夹,而事件就是该文件夹中的文件。这既是官方给出的Topic和event的定义

https://kafka.apache.org/37/documentation.html#introduction

接下来我们来创建一个主题,首先我们来到 kafka 安装目录的 bin 目录下

# 创建一个名为 tianlongbabu的主题 
./kafka-topics.sh --create --topic tianlongbabu --bootstrap-server 192.168.200.100:9092# 列出所有的主题(在主机上操作可以使用 localhost)
./kafka-topics.sh --list --bootstrap-server localhost:9092

 创建好了之后 我们可以回到客户端工具查看

同样的 我们可以直接在这个客户端上删除这个 topic 

3.2、生产消息和消费消息

我们接下来看看 怎么往主题中写入事件(消息)

# 在主机上 指定topic  连接到kafka服务端
./kafka-console-producer.sh --topic tianlongbabu  \
--bootstrap-server 192.168.200.100:9092

连接上之后 就可以发送事件了

同样的我们新开一个终端  使用 kafka-console-consumer.sh 这个脚本 就可以消费topic中的数据了

## 主机上操作  可以使用localhost 
## --from-beginning 表示从kafka最早的消息开始消费./kafka-console-consumer.sh --topic tianlongbabu  \
--from-beginning --bootstrap-server localhost:9092

我们连接上之后 就会打印出刚刚发送的事件(消息)了。

3.3、关于事件的组成

看到这里 相信大家对事件有了一定的了解,关于事件这里给出一段官方文档上的原文

我们 从这段话中至少可以知道

1、event 在文档中也被称为记录(record)或消息(message) 

2、 当你向Kafka读写数据时,采用的是事件形式

3、概念上,一个事件包含键(key)、值(value)、时间戳(timestamp),以及可选的元数据(metadata)。

所以一个事件可以设置一个key, 那么你可能会问key 是干什么用的呢,作用是什么呢?这个会在后面的编码的章节中给出解释,大家先知道有这个概念就行了

3.4、关于 Partition (分区)

先来给出一张图,出自官方文档的主要术语解释的章节

从这张图中我们可以看到,topic 中存在多个 partition(分区),也就是说事件其实都是被保存在topic中的分区里,并且一个topic 可以创建多个分区。

消息在分区中以追加的方式存储,每条消息都有一个唯一的偏移量(offset),表示它在分区中的位置。

那么分区的作用是什么呢,我自己根据文档上描述主要总结了4点

1、分区允许多个消费者并行消费同一主题中的消息,不同的消费者可以消费不同的分区,从而提高系统的吞吐量

2、Kafka会根据配置将消息均匀地分布到各个分区,确保负载在多个消费者之间均匀分配

3、在同一分区内,消息的顺序是有保障的。这意味着对于同一键的所有消息都会被发送到同一分区,从而保持顺序

4、通过增加分区数,可以扩展主题的并发能力和存储能力。Kafka支持动态增加分区,但请注意,这可能影响到现有数据的顺序。

这几点大家先了解即可。

4、入门程序开发 

4.1、新建项目引入依赖

新建项目,添加 kafka 客户端依赖

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.7.0</version></dependency>

4.2、编写生产者

我们先来创建消息生产者的代码

public class KafkaProducerTest {public static void main(String[] args) {// Kafka配置Properties props = new Properties();props.put("bootstrap.servers", "192.168.200.100:9092"); // Kafka服务器地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);String topic = "tianlongbabu"; // 主题名称String key = "gaibang"; // 消息键String value = "乔峰"; // 消息值// 发送消息ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);producer.send(record, (RecordMetadata metadata, Exception e) -> {if (e != null) {e.printStackTrace();} else {System.out.println("Sent message with key: " + key + ", value: " + value + ", to partition: " + metadata.partition() + ", offset: " + metadata.offset());}});// 关闭生产者producer.close();}
}

 上述代码的写法  在org.apache.kafka.clients.producer.KafkaProducer 类的注释中有详细的说明

这个简单的入门案例中用到了前面给大家介绍的key(事件键)了,看了这个案例大家应该能够明白是什么意思了吧。这个时候 我们运行main方法 就能把测试数据写入到 topic 中了 

4.3、编写消费者

相关代码如下:

public static void main(String[] args) {// Kafka配置Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.100:9092"); // Kafka服务器地址props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消费者组IDprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 创建Kafka消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);String topic = "tianlongbabu"; // 主题名称// 订阅主题consumer.subscribe(Collections.singletonList(topic));// 不断消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Consumed message with key: %s, value: %s, from partition: %d, offset: %d%n",record.key(), record.value(), record.partition(), record.offset());}}// 关闭消费者(通常不会到达这里)// consumer.close();}

消费者的代码里我们需要指定一个消费组 id ,当存在多个消费者服务的时候 需要为他们各自的消费组id。

我们运行main方法即可收到刚才发送的消息了。

到这里一个简单的生产-消费的案例已经结束了,相信你对Kafka也有了一个初步的认知。

4.4、关于消费组ID

上述入门程序的消费者程序中有一个消费组id 。在 Kafka 中,消费组 ID(Consumer Group ID)是一个重要的概念,用于标识一组协同工作的消费者。

简单来说就是:

1、对于一个特定的主题,如果多个消费者属于同一个消费组,那么主题中的每条消息只会被该消费组内的一个消费者消费。
2、如果消费者属于不同的消费组,则它们可以独立消费同一主题的消息,并且不会互相影响

5、SpringBoot集成

5.1、相关依赖

SpringBoot版本:

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.9</version><relativePath/> <!-- lookup parent from repository --></parent>

项目相关依赖: 

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Spring Boot Starter Web (optional, if you want to create a REST API) --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Add other dependencies as needed -->
</dependencies>

5.2、配置详解

我们在applicatio.properties  添加以下配置

spring.kafka.bootstrap-servers=192.168.200.100:9092
## 消费组
spring.kafka.consumer.group-id=my-group
## 从最早的消息开始消费
spring.kafka.consumer.auto-offset-reset=earliest

5.3、编写生产者

@Service
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}
}

5.4、编写消费者

@Service
public class KafkaConsumer {@KafkaListener(topics = "tianlongbabu", groupId = "my-group")public void listen(String message) {System.out.println("Received Message: " + message);}
}

5.5、测试

@RestController
public class KafkaController {@Autowiredprivate KafkaProducer kafkaProducer;@PostMapping("/send")public void sendMessage(@RequestBody String message) {kafkaProducer.sendMessage("tianlongbabu", message);}
}

启动类

@SpringBootApplication
public class BootKafkaApplication {public static void main(String[] args) {SpringApplication.run(BootKafkaApplication.class, args);}}

启动服务,由于我们配置的是earliest所以会打印出 topic中之前的消息

我们可以继续使用postman发送消息 ,然后观察控制台的输出

 

好了一个简单的消息生产和消费的流程就是这样了

6、Kafka 的应用

今天这篇文章主要给大家介绍了Kafka的一些基础知识,并且实现了SpringBoot快速集成的案例,大家后续可以使用上述案例作为模板添加自己的业务代码。后续我也会分享一些 Kafka工程化落地的实战案例在我的微信公众号(代码洁癖症患者)上,感兴趣的小伙伴可以去查阅

最后我们聊聊Kafka的应用场景

首先 Kafka 可以作为高吞吐量的消息队列,支持异步通信。生产者将消息发送到主题,消费者从主题中读取消息,适合实现解耦的微服务架构。

其次 Kafka 可以收集各个服务或应用的日志信息,并将其集中存储,以便后续的分析和监控。

在大数据领域 使用 Kafka 结合流处理框架(如 Apache Flink、Apache Spark Streaming),可以实时处理和分析数据流,适用于实时监控、欺诈检测等场景。

在数据集成领域 Kafka 可以作为不同数据源之间的数据桥梁,实现数据的实时传输和整合,常用于 ETL(提取、转换、加载)流程。

这里仅仅列出几个比较常用的场景,还有很多领域都可以见到Kafka的身影。

好了,纸上得来终觉浅,大家赶紧动手试试吧

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/54942.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu的基本用法与指令(为后面学习ROS打基础)

目录 0.声明&#xff1a;此博客的部分内容来自B站up主 机器人工匠阿杰&#xff0c;欢迎大家前往up主视频区学习&#xff08;本人正在跟随此up主的视频学习无人机的部分相关知识&#xff09; 1.win空格&#xff08;切换中英文&#xff09; 2.终端指令 1.ls&#xff1a;显示主…

数据结构-3.4.队列的基本概念

一.队列的定义&#xff1a; 1.图解&#xff1a; 2.重要术语&#xff1a; 空队列&#xff1a;队列中不含任何元素。 二.队列的基本操作&#xff1a; 三.总结&#xff1a;

神经网络(一):神经网络入门

文章目录 一、神经网络1.1神经元结构1.2单层神经网络&#xff1a;单层感知机1.3两层神经网络&#xff1a;多层感知机1.4多层神经网络 二、全连接神经网络2.1基本结构2.2激活函数、前向传播、反向传播、损失函数2.2.1激活函数的意义2.2.2前向传播2.2.3损失函数、反向传播2.2.4梯…

NLP 文本分类任务核心梳理

解决思路 分解为多个独立二分类任务将多标签分类转化为多分类问题更换 loss 直接由模型进行多标签分类 数据稀疏问题 标注更多数据&#xff0c;核心解决方案&#xff1a; 自己构造训练样本 数据增强&#xff0c;如使用 chatGPT 来构造数据更换模型 减少数据需求增加规则弥补…

LaTex符号不好记忆?

总结在Matlab中常用的LaTeX符号如下&#xff1a; 1. **希腊字母**&#xff1a; - \alpha 表示 α - \beta 表示 β - \gamma 表示 γ - \delta 表示 δ - \epsilon 表示 ε - \zeta 表示 ζ - \eta 表示 η - \theta 表示 θ - \iota 表示 ι -…

安全的价值:构建现代企业的基础

物理安全对于组织来说并不是事后才考虑的问题&#xff1a;它是关键的基础设施。零售商、医疗保健提供商、市政当局、学校和所有其他类型的组织都依赖安全系统来保障其人员和场所的安全。 随着安全技术能力的不断发展&#xff0c;许多组织正在以更广泛的视角看待他们的投资&am…

Codeforces Round 975 (Div. 2)

传送门&#xff1a;https://codeforces.com/contest/2019 B. All Pairs Segments 题意&#xff1a; 首先样例解释一下&#xff1a; 一共有&#xff1a;[1,2],[1,3],[1,5],[1,6],[1,7],[2,3],[2,5],[2,6],[2,7],[3,5],[3,6],[3,7],[5,6],[5,7],[6,7] 点 1&#xff0c;7 在5个…

Mac电脑上最简单安装Python的方式

背景 最近换了一台新的 MacBook Air 电脑&#xff0c;所有的开发软件都没有了&#xff0c;需要重新配环境&#xff0c;而我现在最常用的开发程序就是Python。这篇文章记录一下我新Mac电脑安装Python的全过程&#xff0c;也给大家一些思路上的提醒。 以下是我新电脑的配置&…

Django项目配置日志

需求 在Django项目中实现控制台输出到日志文件&#xff0c;并且设置固定的大小以及当超过指定大小后覆盖最早的信息。 系统日志 使用Django自带的配置&#xff0c;可以自动记录Django的系统日志。 可以使用logging模块来配置。下面是一个完整的示例代码&#xff0c;展示了如…

xxl-job 适配达梦数据库

前言 在数字化转型的浪潮中&#xff0c;任务调度成为了后端服务不可或缺的一部分。XXL-JOB 是一个轻量级、分布式的任务调度框架&#xff0c;广泛应用于各种业务场景。达梦数据库&#xff08;DM&#xff09;&#xff0c;作为一款国内领先的数据库产品&#xff0c;已经被越来越…

Redis篇(应用案例 - 商户查询缓存)

目录 一、什么是缓存? 二、为什么要使用缓存 三、如何使用缓存 四、添加商户缓存 1. 缓存模型和思路 2. 代码如下 五、缓存更新策略 1. 内存淘汰 2. 超时剔除 3. 主动更新 六、数据库缓存不一致解决方案 1. 数据库缓存不一致解决方案 2. 数据库和缓存不一致采用什…

AMD ROCm™ 安装指南

AMD ROCm™ installation — ROCm Blogs 注意: 本文之前是 AMD 实验笔记博客系列的一部分。 AMD ROCm™ 是第一个面向 HPC/超大规模级 GPU 计算的开源软件开发平台。AMD ROCm™ 将 UNIX 的选择权、极简主义和模块化软件开发哲学引入 GPU 计算领域。有关更多信息&#xff0c;请参…

搜索引擎简介

搜索引擎架构 整个搜索引擎分为三个系统 爬虫系统 索引系统 线上搜素服务 爬虫系统 爬虫分为两个阶段&#xff1a; 第一阶段&#xff1a;根据目标网站的列表页&#xff0c;爬对应的文档 URL 第二阶段&#xff1a;根据文档 URL&#xff0c;下载文档内容 触发器&#xff1…

【行业报告】AI大模型对我国劳动力市场潜在影响研究报告(2024),附PDF下载!!

前言 9月13日&#xff0c;北京大学国家发展研究院联合智联招聘在中国国际服务贸易交易会上发布的《AI大模型对我国劳动力市场潜在影响研究&#xff1a;2024》&#xff08;以下简称“报告”&#xff09;显示&#xff0c;2024年上半年&#xff0c;招聘职位数同比增速前五的人工智…

数据结构——二叉树的性质和存储结构

二叉树的抽象类型定义 基本操作&#xff1a; CreateBiTree(&T&#xff0c;definition) 初始条件&#xff1a;definition给出二叉树T的定义。 操作结果:按definition构造二叉树T。 PreOrderTraverse(T) 初始条件:二叉树T存在。 操作结果:先序遍历T&#xff0c;对每个结…

基于Hive和Hadoop的白酒分析系统

本项目是一个基于大数据技术的白酒分析系统&#xff0c;旨在为用户提供全面的白酒市场信息和深入的价格分析。系统采用 Hadoop 平台进行大规模数据存储和处理&#xff0c;利用 MapReduce 进行数据分析和处理&#xff0c;通过 Sqoop 实现数据的导入导出&#xff0c;以 Spark 为核…

Java五子棋

目录 一&#xff1a;案例要求&#xff1a; 二&#xff1a;代码&#xff1a; 三&#xff1a;结果&#xff1a; 一&#xff1a;案例要求&#xff1a; 实现一个控制台下五子棋的程序。用一个二维数组模拟一个15*15路的五子棋棋盘&#xff0c;把每个元素赋值位“┼”可以画出棋…

通过OpenScada在ARMxy边缘计算网关上实现数字化转型

随着工业4.0概念的普及&#xff0c;数字化转型已成为制造业升级的关键路径之一。在此背景下&#xff0c;边缘计算技术因其能够有效处理大量数据、减少延迟并提高系统响应速度而受到广泛关注。ARMxy边缘计算网关&#xff0c;特别是BL340系列&#xff0c;凭借其强大的性能和灵活的…

SQL高可用优化-优化SQL中distinct和Where条件对索引字段进行非空检查语句

最近做一个需求&#xff0c;关于SQL高可用优化&#xff0c;需要优化项目中的SQL&#xff0c;提升查询效率。 SQL高可用优化 一、优化SQL包含distinct场景二、优化SQL中Where条件中索引字段是否为NULL三、代码验证1. NodeMapper2. NodeService3. NodeController4.数据库数据5.项…

SPI总结

1.前言 1.1 SPI简介 SPI全称Serial Peripheral Interface&#xff0c;串行外设接口&#xff0c;是一种用于连接外设的全双工通信总线。主机和从机支持一对一或一对多通讯连接。 图1 SPI物理层通讯连接 表1 Signal description 1.2 SPI特征 串行&#xff0c;每个时钟周期只传…