# Kafka_深入探秘者(8):kafka 高级应用

Kafka_深入探秘者(8):kafka 高级应用

一、kafka 消费组管理

1、kafka 命令行工具

参考官网: http://kafka.apache.org/22/documentation.html

2、kafka 消费组管理:查看消费组


# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者 
cd /usr/local/kafka/kafka_2.12-2.2.1/# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --list# 查看消费组命令
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

3、kafka 消费组管理:查看消费组详情


# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者 
cd /usr/local/kafka/kafka_2.12-2.2.1/# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --describe --group group.demobin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group group.demo# 查看消费组详情命令(group.demo 是你自己创建的需要查看的组名)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo

4、kafka 消费组管理:查看消费组当前状态


# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者 
cd /usr/local/kafka/kafka_2.12-2.2.1/# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --describe --group group.demo --statebin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group group.demo --state# 查看消费组当前状态
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo --state

在这里插入图片描述

5、kafka 消费组管理:查看消费组内成员信息


# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者 
cd /usr/local/kafka/kafka_2.12-2.2.1/# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --describe --group group.demo --membersbin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group group.demo --members# 查看消费组内成员信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo --members

6、kafka 消费组管理:删除消费组,如果有消费者在使用,则会删除失败。


# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者 
cd /usr/local/kafka/kafka_2.12-2.2.1/# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110 --delete --group group.demobin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --delete --group group.demo# 删除消费组,如果有消费者在使用,则会删除失败
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group group.demo

7、kafka 消费位移管理:重置消费位移,如果有消费者在使用,则会重置失败。

1)kafka 重置消费位移命令:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute

2)命令参数说明:
–all-topics :指定了所有主题。
–reset-offsets :重置消费。
–to-earliest :移到最后
–execute :开始执行命令


# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/
# 或者 
cd /usr/local/kafka/kafka_2.12-2.2.1/# 重置消费位移,如果有消费者在使用,则会重置失败
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128):
bin/kafka-consumer-groups.sh --bootstrap-server 172.18.30.110:9092 --group group.demo --all-topics --reset-offsets --to-earliest --executebin/kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --group group.demo --all-topics --reset-offsets --to-earliest --execute

在这里插入图片描述

二、kafka 数据管道 Connect 文件系统

1、kafka 数据管道 Connect 文件系统 概述

kafka数据管道Connect文件系统概述.png

1)Kafka 是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为何集成其他系统和解耦应用,经常使用 Producer 来发送消息到 Broker,并使用 Consumer 来消费 Broker 中的消息。Kafka Connect 是到 0.9 版本才提供的并极大的简化了其他系统与 Kafka 的集成。

2)Kafka Connect 运用用户快速定义并实现各种 Connector(File,dbc,Hdfs等),这些功能让大批量数据导入/导出 Kafka 很方便。

3)在 Kafka Connect 中还有两个重要的概念: Task 和 Worker。

4)Connect 中一些概念:

  • 连接器: 实现了 ConnectAPI,决定需要运行多少个任务,按照任务来进行数据复制,从 work 进程获取任务配置并将其传递下去。

  • 任务: 负责将数据移入或移出 Kafka。

  • work 进程: 相当与 connector 和任务的容器,用于负责管理连接器的配置、启动连接器和连接器任务,提供 RESTAPI。

  • 转换器: kafka connect 和其他存储系统直接发送或者接受数据之间转换数据。

2、kafka 独立模式-文件系统

场景
以下示例使用到了两个 Connector,将文件 source.txt 中的内容通过 Source 连接器写入 Kafka 主题中,然后将内容写入 srouce.sink.txt 中。

  • FileStreamSource: 从 source.txt 中读取并发布到 Broker 中。
  • FileStreamSink: 从 Broker 中读取数据并写入到 source.sink.txt 文件中。

3、步骤详情

3.1、首先我们来看下 Worker 进程用到的配置文件

${(KAFKA_HOME}/config/connect-standalone.properties

# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/// Kafka 集群连接的地址(把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110 或:192.168.19.128))
# bootstrap.servers=localhost:9092
bootstrap.servers=172.18.30.110:9092# 打开并编辑 config/connect-standalone.properties 文件
vim config/connect-standalone.properties# 修改以下几项:# 启动端口,注意不能冲突
rest.port=8084// 格式转化类
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter// json消息中是否包含schema
key.converter.schemas.enable=true
value.converter.schemas.enable.true// 保存偏移量的文件路径
offset.storage.file.filename=/tmp/connect.offsets// 设定提交偏移量的频率
offset.flush.interval.ms=10808
3.2 其中的 Source 使用到的配置文件是 $KAFKA_HOME}/config/connect-file-source.properties

# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/# 打开并编辑 config/connect-file-source.properties 文件
vim config/connect-file-source.properties# 修改以下几项://配置连接器的名称
name.local-file-source//连接器的全限定名称,设置类名称也是可以的
connector.class=FileStreamSource// task数量
tasks .max=1//数据源的文件路径(修改为你自己创建source.txt文件实际路径)
file=/tmp/source.txt// 主题名称(修改为你自己创建的主题,如:heima)
# topic=topic0703
topic=heima
3.3 其中的 Sink 使用到的配置文件是 $KAFKA_HOME}/config/connect-file-sink.properties
# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/# 打开并编辑 config/connect-file-sink.properties 文件
vim config/connect-file-sink.properties# 修改以下几项:name=local-file-sink
connectér.class=FileStreamsink
tasks.max=1# 修改为你自己创建sink.txt文件实际路径
#file=/tmp/source.sink.txt
file=/tmp/sink.txt# 修改为你自己创建的主题,如:heima
#topics=topice703
topics=heima
3.4 启动 source 连接器

# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
3.5 启动 slink 连接器

# 切换到 kafka 安装目录 
cd /usr/local/kafka/kafka_2.12-2.8.0/bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties 
3.6 source 写入文本信息

# 写入消息到 source.txt
echo "hello kafka,coming">> /tmp/source.txt # 查询是否写入成功
less /tmp/source.txt # 同时查询sink.txt 是否有内容传输过来
cat /tmp/sink.txt 

三、kafka springboot+kafka 001

1、打开 idea 创建 artifactId 名为 kafka_spring_learn 的 maven 工程。

--> idea --> File --> New --> Project --> Maven Project SDK: ( 1.8(java version "1.8.0_131" ) --> Next --> Groupld : ( djh.it )Artifactld : ( kafka_spring_learn )Version : 1.0-SNAPSHOT--> Name: ( kafka_spring_learn )Location: ( ...\kafka_spring_learn\ )	--> Finish

2、在 kafka_spring_learn 工程的 pom.xml 文件中导入依赖坐标。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>djh.it</groupId><artifactId>kafka_spring_learn</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.8.RELEASE</version>
<!--        <relativePath></relativePath>--></parent><properties><java.version>8</java.version><!--        <scala.version>2.11</scala.version>--><scala.version>2.12</scala.version><slf4j.version>1.7.21</slf4j.version><!--        <kafka.version>2.0.0</kafka.version>--><kafka.version>2.8.0</kafka.version><lombok.version>1.18.8</lombok.version><junit.version>4.11</junit.version><gson.version>2.2.4</gson.version><protobuff.version>1.5.4</protobuff.version><!--        <spark.version>2.3.1</spark.version>--><spark.version>2.4.8</spark.version></properties><dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.6.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_${scala.version}</artifactId><version>${kafka.version}</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>${gson.version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>${junit.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><scope>provided</scope></dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>${protobuff.version}</version></dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>${protobuff.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.9.4</version></dependency><dependency><groupId>com.fasterxml.jackson.module</groupId><artifactId>jackson-module-scala_2.11</artifactId><version>2.9.5</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala.version}</artifactId><version>${spark.version}</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId><version>${spark.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

3、在 kafka_spring_learn 工程,创建 application.properties 配置文件。


# kafka_spring_learn\src\main\resources\application.propertieslogging.level.root=INFO
# spring.kafka.producer.bootstrap-servers=127.0.0.1:9092# IP 地址换成你自己的虚拟机 IP 地址
#spring.kafka.producer.bootstrap-servers=192.168.19.128:9092
#spring.kafka.consumer.bootstrap-servers=192.168.19.128:9092spring.kafka.producer.bootstrap-servers=172.18.30.110:9092
spring.kafka.consumer.bootstrap-servers=172.18.30.110:9092

4、在 kafka_spring_learn 工程,创建 KafkaLearnApplication.java 启动类文件。


/***  kafka_spring_learn\src\main\java\djh\it\kafkalearn\KafkaLearnApplication.java**  2024-6-24 创建 KafkaLearnApplication.java 启动类文件*/
package djh.it.kafkalearn;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@SpringBootApplication
@RequestMapping
//避免Gson版本冲突快捷配置
@EnableAutoConfiguration(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
public class KafkaLearnApplication {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLearnApplication.class);public static void main(String[] args) {SpringApplication.run(KafkaLearnApplication.class, args);}@RequestMapping("/index")public String index(){return "hello,kafka spring comming";}@Autowiredprivate KafkaTemplate template;private static final String topic = "heima";// 消息生产者@GetMapping("/send/{input}")public String sendToKafka(@PathVariable String input){this.template.send(topic, input);return "Send success!  " + input;}//消息的接收@KafkaListener(id ="", topics = topic, groupId = "group.demo")public void listener(String input){LOGGER.info("message input value:{}", input);}
}

5、启动 KafkaLearnApplication.java 启动类,进行测试。

1)浏览器地址栏输入: localhost:8080/index
输出: hello,kafka spring comming

2)浏览器地址栏输入: localhost:8080/send/kafka
输出: Send success! kafka

在这里插入图片描述

四、kafka springboot+kafka 事务 001

1、在 kafka_spring_learn 工程中,修改 application.properties 配置文件,添加事务。


# kafka_spring_learn\src\main\resources\application.propertieslogging.level.root=INFO
# spring.kafka.producer.bootstrap-servers=127.0.0.1:9092# IP 地址换成你自己的虚拟机 IP 地址
#spring.kafka.producer.bootstrap-servers=192.168.19.128:9092
#spring.kafka.consumer.bootstrap-servers=192.168.19.128:9092spring.kafka.producer.bootstrap-servers=172.18.30.110:9092
spring.kafka.consumer.bootstrap-servers=172.18.30.110:9092# 事务的支持
spring.kafka.producer.transaction-id-prefix=kafka_tx.

2、在 kafka_spring_learn 工程中,修改 KafkaLearnApplication.java 启动类,添加事务。


/***  kafka_spring_learn\src\main\java\djh\it\kafkalearn\KafkaLearnApplication.java**  2024-6-24 创建 KafkaLearnApplication.java 启动类文件*/
package djh.it.kafkalearn;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@SpringBootApplication
@RequestMapping
//避免Gson版本冲突快捷配置
@EnableAutoConfiguration(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
@Transactional
public class KafkaLearnApplication {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLearnApplication.class);public static void main(String[] args) {SpringApplication.run(KafkaLearnApplication.class, args);}@RequestMapping("/index")public String index(){return "hello,kafka spring comming";}@Autowiredprivate KafkaTemplate template;private static final String topic = "heima";// 消息生产者@GetMapping("/send/{input}")public String sendToKafka(@PathVariable String input){this.template.send(topic, input);//事务的支持template.executeInTransaction(t ->{t.send(topic, input);if("error".equals(input)){throw new RuntimeException("input is error");}t.send(topic, input+" anthor");return true;});return "Send success!  " + input;}//消息的接收@KafkaListener(id ="", topics = topic, groupId = "group.demo")public void listener(String input){LOGGER.info("message input value:{}", input);}
}

3、启动 KafkaLearnApplication.java 启动类,进行测试。

1)浏览器地址栏输入: localhost:8080/send/kafka
输出: Send success! kafka

2)浏览器地址栏输入: localhost:8080/send/error
会抛出异常。

在这里插入图片描述

五、kafka springboot+kafka 事务 002

1、在 kafka_spring_learn 工程中,修改 application.properties 配置文件,添加事务。

# kafka_spring_learn\src\main\resources\application.propertieslogging.level.root=INFO
# spring.kafka.producer.bootstrap-servers=127.0.0.1:9092# IP 地址换成你自己的虚拟机 IP 地址
#spring.kafka.producer.bootstrap-servers=192.168.19.128:9092
#spring.kafka.consumer.bootstrap-servers=192.168.19.128:9092spring.kafka.producer.bootstrap-servers=172.18.30.110:9092
spring.kafka.consumer.bootstrap-servers=172.18.30.110:9092# 事务的支持
spring.kafka.producer.transaction-id-prefix=kafka_tx.

2、在 kafka_spring_learn 工程中,修改 KafkaLearnApplication.java 启动类,添加事务。

/***  kafka_spring_learn\src\main\java\djh\it\kafkalearn\KafkaLearnApplication.java**  2024-6-24 创建 KafkaLearnApplication.java 启动类文件*/
package djh.it.kafkalearn;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@SpringBootApplication
@RequestMapping
//避免Gson版本冲突快捷配置
@EnableAutoConfiguration(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
@Transactional
public class KafkaLearnApplication {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLearnApplication.class);public static void main(String[] args) {SpringApplication.run(KafkaLearnApplication.class, args);}@RequestMapping("/index")public String index(){return "hello,kafka spring comming";}@Autowiredprivate KafkaTemplate template;private static final String topic = "heima";// 消息生产者@GetMapping("/send/{input}")public String sendToKafka(@PathVariable String input){this.template.send(topic, input);//事务的支持template.executeInTransaction(t ->{t.send(topic, input);if("error".equals(input)){throw new RuntimeException("input is error");}t.send(topic, input+" anthor");return true;});return "Send success!  " + input;}// 消息生产者2: 演示事务@GetMapping("/send2/{input}")@Transactional(rollbackFor = RuntimeException.class)public String sendToKafkaTransaction(@PathVariable String input){//事务的支持template.send(topic, input);if("error".equals(input)){throw new RuntimeException("input is error2!");}template.send(topic, input + " anthor2");return "Send success2!  " + input;}//消息的接收@KafkaListener(id ="", topics = topic, groupId = "group.demo")public void listener(String input){LOGGER.info("message input value:{}", input);}
}

3、启动 KafkaLearnApplication.java 启动类,进行测试。

1)浏览器地址栏输入: localhost:8080/send2/kafka
输出: Send success2! kafka

2)浏览器地址栏输入: localhost:8080/send2/error
会抛出异常。服务器上重新打开一个客户消费,也收不到消息。

在这里插入图片描述

在这里插入图片描述

4、消息中间件选型对比:

4.1 资料文档

Kafka: 中。有 kafka 作者自己写的书,网上资料也有一些,
rabbitmq: 多。有一些不错的书,网上资料多。
zeromq: 少。没有专门写 zeromg 的书,网上的资料多是一些代码的实现和简单介绍。
rocketmq: 少。没有专门写rocketmq的书,网上的资料良莠不齐,官方文档很简洁,但是对技术细节没有过多的描述。
activemq: 多。没有专门写activemq的书,网上资料多

4.2 开发语言

Kafka: Scala
rabbitmg: Erlang
zeromg:c
rocketmg: java
activemg: java

4.3 支持的协议

Kafka: 自己定义的一套.【基于TCP)
rabbitmg: AMQPI
zeromq: TCP、UDP
rocketmq: 自己定义的一套.
activemq: OpenWire、STOMP、REST、XMPP、AMQP

4.4 消息存储

Kafka: 内存、磁盘、数据库。支持大量堆积。
rocketmq: 磁盘。支持大量堆积。
rabbitmq: 内存、磁盘、支持少量规程。
activemq: 内存、磁盘、数据库。支持少量堆积。
zeromq: 消息发送端的内存或者磁盘中。不支持持久化。

4.5 消息事务

Kafka: 支持
rabbitmq: 支持。客户端将信道设置为事务模式只有当消息被 rabbitMq 接收,事务才能提交成功,否则在捕获异常后进行回滚。使用事务会使得性能有所下降
zeromq: 不支持
rocketmq: 支持
activemq: 支持。

4.6 负载均衡

Kafka: 支持负载均衡。
rabbitmq: 对负载均衡支持不好。
zeromq: 去中心化,不支持负载均衡。本身只是一个多线程网络库。
rocketmq: 支持负载均衡。
activemq: 支持负载均衡。可以基于 zookeeper 实现负载均衡。

4.7 集群方式:

Kafka: 天然的 Leader-Slave 无状态集群。
rabbitmq: 支持简单集群,复制模式,对高级集群模式支持不好。
zeromq: 去中心化,不支持集群。
rocketmq: 常用,多对 Master-Slave 模式。开源版本需手动切换 Slave 模式变成 Master 模式。
activemq: 。支持简单集群模式,比如 主-备,对高级集群模式支持不好。

4.8 管理界面

Kafka: 一般
rabbitmq: 好
zeromq: 无
rocketmq: 无
activemq: 一般

4.9 可用性

Kafka: 非常高(分布式)
rabbitmg: 高(主从)。
zeromg: 高。
rocketmq: 非常高(分布式)
activemq: 高(主从)

4.10 消息重复

Kafka:支持at least once、at most once
rabbitmg:支持at least once、at most once
zeromq:只有重传机制,但是没有持久化,消息丢了重传也没有用。既不是atleastonce、也不是at most
once、更不是exactly only once
rocketmq:支持atleast once
activemq:支持atleast once

4.11 吞吐量 TPS

Kafka: 极大
Kafka 按批次发送消,息和消费消,息。发送端将多个小消,息合并,批量发向 Broker,消费端每次取出一个批次的消,息批量外理。

六、kafka spark+kafka

1、流式处理 Spark

  • Spark 最初诞生于美国加州大学伯克利分校(UCBerkeley)的AMP实验室,是一个可应用于大规模数据处理的快速、通用引擎。

  • 2013年,Spark 加入 Apache 孵化器项目后,开始获得迅猛的发展,如今已成为 Apache 软件基金会最重要的三大分布式计算系统开源项目之一(即 Hadoop、Spark、Storm)。

  • Spark 最初的设计目标是使数据分析更快–不仅运行速度快,也要能快速、容易地编写程序。

  • 为了使程序运行更快,Spark 提供了内存计算,减少了迭代计算时的I0开销;

  • 而为了使编写程序更为容易,Spark 使用简练、优雅的 Scala 语言编写,基于 Scala 提供了交互式的编程体验。

  • 虽然,Hadoop 已成为大数据的事实标准,但其 MapReduce 分布式计算模型仍存在诸多缺陷,

  • 而 Spark 不仅具备 Hadoop MapReduce 所具有的优点,且解决了 Hadoop MapReduce 的缺陷。

  • Spark 正以其结构一体化、功能多元化的优势逐渐成为当今大数据领域最热门的大数据计算平台。

流式处理Spark.png

2、Spark 安装与应用

Spark 官网下载:

http://spark.apache.org/downloads.html
https://spark.apache.org/downloads.html
https://archive.apache.org/dist/spark/

3、下载完成安装包,解压即安装


# 创建目录并切换目录:
mkdir /usr/local/spark/
cd /usr/local/spark/# 上传安装包到服务器
sftp> put # 解压即安装
tar -zxvf apark-2.4.4-bin-hadoop2.7# 配置 JDK 
vim sbin/spark-config.sh # 你的 jdk 安装路径(java 默认安装路径:/usr/lib/jvm/java-8-openjdk-amd64/ )
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64# 切换到 spark 的安装目录
cd /usr/local/spark/apark-2.4.4-bin-hadoop2.7/# 启动 spark 
sbin/start-all.sh starting# 验证
jps -l2819 kafka.Kafka

4、浏览器地址栏输入: http://localhost:8080

http://127.0.0.1:8080
http://172.18.30.110:8080

5、Spark 和 Kafka 整合:

5.1 在 kafka_spring_learn 工程中,创建 Spark 和 Kafka 整合 类 SparkStreamingFromKafka.java
/***  kafka_spring_learn\src\main\java\djh\it\kafkalearn\spark\SparkStreamingFromKafka.java**  2024-6-26 创建 Spark 和 Kafka 整合 类 SparkStreamingFromKafka.java*/
package djh.it.kafkalearn.spark;import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;import scala.Tuple2;import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;public class SparkStreamingFromKafka {@SneakyThrowspublic static void main( String[] args ) {SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(("SparkStreamingFromKafka"));JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "172.18.30.110:9092");kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("group.id", "sparkStreaming");Collection<String> topics = Arrays.asList("heima");JavaInputDStream<ConsumerRecord<String, String>> javaInputDStream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams));JavaPairDStream<String, String> javaPairDStream = javaInputDStream.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, String> call( ConsumerRecord<String, String> consumerRecord ) throws Exception {return new Tuple2<>(consumerRecord.key(), consumerRecord.value());}});javaPairDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {@Overridepublic void call( JavaPairRDD<String, String> javaPairRDD ) throws Exception {javaPairRDD.foreach(new VoidFunction<Tuple2<String, String>>() {@Overridepublic void call( Tuple2<String, String> tuple2 ) throws Exception {System.out.println(tuple2._2);}});}});streamingContext.start();streamingContext.awaitTermination();}
}
5.2、在服务器端打开一个生产消息发送服务,进行消息发送测试。

# 切换到 kafka 安装目录下:
cd /usr/local/kafka/kafka_2.12-2.8.0# 打开生产消息服务:
bin/kafka-console-producer.sh --broker-list 172.18.30.110:9092 --topic heima# 开始发送消息:
>hello nihao
>to^H^H
>producer broker kafka
># 在 idea 启动类控制台这边就会收到 消息,这就是 Spark 流式处理。

Spark流式处理.png

上一节关联链接请点击
# Kafka_深入探秘者(7):kafka 稳定性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/36338.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

leetCode.91. 解码方法

leetCode.91. 解码方法 题目思路 题解 class Solution { public:int numDecodings(string s) {int n s.size();// dp 中f[0]一般不做使用&#xff0c;只是存一个初值1&#xff0c;表示默认由一种方案s s;vector<int> f( n 1 );f[0] 1;for ( int i 1; i < n;…

【数学】100332. 包含所有 1 的最小矩形面积 II

本文涉及知识点 数学 LeetCode100332. 包含所有 1 的最小矩形面积 II 给你一个二维 二进制 数组 grid。你需要找到 3 个 不重叠、面积 非零 、边在水平方向和竖直方向上的矩形&#xff0c;并且满足 grid 中所有的 1 都在这些矩形的内部。 返回这些矩形面积之和的 最小 可能值…

vant4的组件气泡弹出框van-popover,在列表中遍历后点击一个全部/显示隐藏,解决办法

环境&#xff1a;vue3 vant-ui4 <div v-for"(info, index) in item.infoListVOs" :key"index"><van-popoverv-model:show"showPopover":actions"actions"overlayplacement"bottom-end"select"onSelect(info…

软件工程全套学习培训资料,实际优质项目编制及各类建设方案,信息安全,运维资料

目的&#xff1a;规范系统开发流程&#xff0c;提高系统开发效率。 立项申请需求分析方案设计方案评审开发调整测试阶段系统培训试运行测试验收投入使用 所有文档过去进主页获取。 获取方式&#xff1a;本文末个人名片直接获取。 软件资料清单列表部分文档清单&#xff1a;工作…

AGI 远不止 ChatGPT!一文入门 AGI 通识及应用开发_通向agi之路网站使用什么开发的网站

AI 大语言模型进入爆发阶段 2022 年 12 月 ChatGPT 突然爆火&#xff0c;原因是其表现出来的智能化已经远远突破了我们的常规认知。虽然其呈现在使用者面前仅仅只是一个简单的对话问答形式&#xff0c;但是它的内容化水平非常强大&#xff0c;甚至在某些方面已经超过人类了&am…

程序的调试技术,设置断点

断点&#xff08;break point&#xff09;是指在代码中指定位置&#xff0c;当程序运行到此位置时变中断下来&#xff0c;并让开发者可查看此时各变量的值。因断点中断的程序并没有结束&#xff0c;可以选择继续执行。 在程序的调试过程中&#xff0c;设置断点是一个很有用的分…

可用的搜索引擎

presearchhttps://presearch.com/yandexhttps://ya.ru

书归正传,说说颍川士族

我的非遗项目是《颍川士族传说》&#xff0c;此前做的视频只是触及了边缘&#xff0c;属于气氛的营造&#xff0c;今后就正式转入主题了。 首先说说什么是士族&#xff0c;它有两个同义词&#xff1a;世族和势族。“世”是一代又一代的意思&#xff0c;“势”是权势&#xff0…

python中类的继承详解

面向对象编程 (OOP) 语言的一个主要功能就是“继承”。继承是指这样一种能力&#xff1a;它可以使用现有类的所有功能&#xff0c;并在无需重新编写原来的类的情况下对这些功能进行扩展 &#xff08;1&#xff09;在类的继承中&#xff0c;存在父类跟子类&#xff0c;子类可以继…

【项目实训】数据库内容丰富

经团队讨论&#xff0c;对前端页面展示数据进行了增加&#xff0c;于是相应的修改数据库 经团队成员使用大模型对各公司面试经验中问题的总结优化&#xff0c;我们打算将大模型的回答存储到数据库中&#xff0c;以显示在前端页面 于是在数据库中存储大模型的回答&#xff1a;…

三种三相交流电动机正反转互锁电路的分析

PLC和固态继电器应用都很普及了&#xff0c;常规电磁继电器还有用武之地吗?答案是&#xff1a;有用武之地的。因为微处理器的应用使逻辑控制发生了变革&#xff0c;极大地发挥了开关功能的特性&#xff0c;但在应用中&#xff0c;它还是无法承受较大的负载&#xff0c;因此还要…

Charles网络抓包工具安装和web抓包(一)

目录 概述 抓包工具对比 安装 下载 web抓包配置 按键说明 前言-与正文无关 ​ 生活远不止眼前的苦劳与奔波&#xff0c;它还充满了无数值得我们去体验和珍惜的美好事物。在这个快节奏的世界中&#xff0c;我们往往容易陷入工作的漩涡&#xff0c;忘记了停下脚步&#…

Unity开发者转UE 新手必读

前言 本页面为熟悉Unity的用户概述了 虚幻引擎(UE)。如果你具备一些Unity知识&#xff0c;而且想学习如何运用自己所学的知识在虚幻引擎中工作&#xff0c;下面各小节将帮助你入门。 下面的截图并排显示了Unity和虚幻编辑器。各个区域采用相同的颜色来表示相同的功能。每个区…

用友 U8+ 控制金额、单价等字段权限设置

进入路径 系统服务——权限——数据权限控制设置 本功能是数据权限设置的前提&#xff0c;用户可以根据需要先在数据权限控制设置中选择需要进行权限控制的对象。 数据权限的控制分为记录级和字段级两个层次&#xff0c;对应系统中的两个页签"记录级"和"字段…

LabVIEW在光学与光子学实验室中的应用

光学与光子学实验室致力于光学和光子学前沿领域的研究&#xff0c;涉及超快光学、非线性光学、光纤通信、光子晶体等多个方向。实验室需要高精度的实验控制和数据采集系统&#xff0c;以进行复杂的光学实验&#xff0c;并对实验数据进行实时处理和分析。 项目需求 实时控制与监…

Vue + SpringBoot 实现文件的断点上传、秒传,存储到Minio

一、前端 1. 计算文件的md5值 前端页面使用的elment-plus的el-upload组件。 <el-upload action"#" :multiple"true" :auto-upload"false" :on-change"handleChange" :show-file-list"false"><FileButton content&…

telnet_h3c_ap

import telnetlib import time # 定义交换机参数 HOSTS [10.61.168.x, ]for HOST in HOSTS:tn telnetlib.Telnet(HOST,timeout22)tn.read_until(b"Password: ")tn.write("h3capadmin".encode(ascii) b"\n") ##h3c默认密码time.sleep(0.2)tn.…

JS对象、数组、字符串超详细方法

JavaScript 对象方法 对象创建的方式 对象字面量 var dog1 {name: "大黄",age: 2,speak: function () {console.log("汪汪");}, };使用Object构造函数 var dog2 new Object(); dog2.name "大黄"; dog2.age 2; dog2.speak function () …

BIO、NIO编程深入理解与直接内存、零拷贝

网路编程基本常识 一. Socket 什么是Socket Socket是对网络中不同主机上的应用进程之间进行双向通信的端点的抽象。它提供了应用层进程利用网络协议交换数据的机制&#xff0c;是应用程序与网络协议栈进行交互的接口。 说白了&#xff0c;Socket就是把TCP/IP协议族进行封装…

iOS之如何创建.a静态库

番外&#xff1a;想要查看如何创建.framework静态库可前往看我​​​​​​​iOS之如何创建.framework静态库-CSDN博客这篇文章。 一、创建静态库项目 ①、打开 Xcode 并创建一个新的 Xcode 项目。 ②、选择 "macOS" -> "Framework & Library" -&…