大数据技术之Kafka:一篇文章带你学会Kafka

大数据技术之Kafka:一篇文章带你学会Kafka

第1章Kafka概述

1.1 消息队列

在这里插入图片描述
(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。
(2)发布/订阅模式(一对多,数据生产后,推送给所有订阅者)
发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

1.2 为什么需要消息队列

1)解耦:
  允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2)冗余:
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
3)扩展性:
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
4)灵活性 & 峰值处理能力:
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5)可恢复性:
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
6)顺序保证:
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka保证一个Partition内的消息的有序性)
7)缓冲:
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
8)异步通信:
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

1.3 什么是Kafka

在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。
1)Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。
2)Kafka最初是由LinkedIn公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
3)Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。
4)无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。

1.4 Kafka架构

在这里插入图片描述
在这里插入图片描述
1)Producer :消息生产者,就是向kafka broker发消息的客户端;
2)Consumer :消息消费者,向kafka broker取消息的客户端;
3)Topic :可以理解为一个队列;
4) Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
5)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;
6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;
7)Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

第2章 Kafka集群部署

2.1 环境准备
2.1.1 集群规划

hadoop102					hadoop103				hadoop104
zk							zk						zk
kafka						kafka					kafka

2.1.2 jar包下载
http://kafka.apache.org/downloads.html

在这里插入图片描述
2.2 Kafka集群部署
1)解压安装包

[atguigu@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

2)修改解压后的文件名称

[atguigu@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka

3)在/opt/module/kafka目录下创建logs文件夹

[atguigu@hadoop102 kafka]$ mkdir logs

4)修改配置文件

[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vi server.properties

输入以下内容:

#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径	
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

5)配置环境变量

[atguigu@hadoop102 module]$ sudo vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[atguigu@hadoop102 module]$ source /etc/profile

6)分发安装包

[atguigu@hadoop102 module]$ xsync kafka/

注意:分发之后记得配置其他机器的环境变量
7)分别在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
注:broker.id不得重复
8)启动集群

依次在hadoop102、hadoop103、hadoop104节点上启动kafka
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties &

9)关闭集群

[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh stop

2.3 Kafka命令行操作
1)查看当前服务器中的所有topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

2)创建topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--create --replication-factor 3 --partitions 1 --topic first

选项说明:
–topic 定义topic名
–replication-factor 定义副本数
–partitions 定义分区数

3)删除topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--delete --topic first

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
4)发送消息

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world
>atguigu  atguigu

5)消费消息

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic first

–from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。
6)查看某个Topic的详情

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--describe --topic first

标题文本样式列表图片链接目录代码片表格注脚注释自定义列表LaTeX 数学公式插入甘特图插入UML图插入Mermaid流程图插入Flowchart流程图插入类图快捷键
标题复制

第3章 Kafka工作流程分析

在这里插入图片描述
3.1 Kafka生产过程分析
3.1.1 写入方式
producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。
3.1.2 分区(Partition)
消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:
在这里插入图片描述
在这里插入图片描述
我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。
1)分区的原因
(1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以Partition为单位读写了。
2)分区的原则
(1)指定了patition,则直接使用;
(2)未指定patition但指定key,通过对key的value进行hash出一个patition;
(3)patition和key都未指定,使用轮询选出一个patition。
DefaultPartitioner类

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}

3.1.3 副本(Replication)
同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication之后,同一个partition可能会有多个replication,而这时需要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据。
3.1.4 写入流程
producer写入消息流程如下:
在这里插入图片描述
1)producer先从zookeeper的 "/brokers/…/state"节点找到该partition的leader
2)producer将消息发送给该leader
3)leader将消息写入本地log
4)followers从leader pull消息,写入本地log后向leader发送ACK
5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK
3.2 Broker 保存消息
3.2.1 存储方式
物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件),如下:

[atguigu@hadoop102 logs]$ ll
drwxrwxr-x. 2 atguigu atguigu  4096 86 14:37 first-0
drwxrwxr-x. 2 atguigu atguigu  4096 86 14:35 first-1
drwxrwxr-x. 2 atguigu atguigu  4096 86 14:37 first-2
[atguigu@hadoop102 logs]$ cd first-0
[atguigu@hadoop102 first-0]$ ll
-rw-rw-r--. 1 atguigu atguigu 10485760 86 14:33 00000000000000000000.index
-rw-rw-r--. 1 atguigu atguigu      219 86 15:07 00000000000000000000.log
-rw-rw-r--. 1 atguigu atguigu 10485756 86 14:33 00000000000000000000.timeindex
-rw-rw-r--. 1 atguigu atguigu        8 86 14:37 leader-epoch-checkpoint

3.2.2 存储策略
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

3.2.3 Zookeeper存储结构
在这里插入图片描述
注意:producer不在zk中注册,消费者在zk中注册。
3.3 Kafka消费过程分析
kafka提供了两套consumer API:高级Consumer API和低级Consumer API。
3.3.1 高级API
1)高级API优点
高级API 写起来简单
不需要自行去管理offset,系统通过zookeeper自行管理。
不需要管理分区,副本等情况,.系统自动管理。
消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset)
可以使用group来区分对同一个topic 的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)
2)高级API缺点
不能自行控制offset(对于某些特殊需求来说)
不能细化控制如分区、副本、zk等
3.3.2 低级API
1)低级 API 优点
能够让开发者自己控制offset,想从哪里读取就从哪里读取。
自行控制连接分区,对分区自定义进行负载均衡
对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中)
2)低级API缺点
太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。
3.3.3 消费者组
在这里插入图片描述
消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。
3.3.4 消费方式
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)。
3.3.5 消费者组案例
1)需求:测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费。
2)案例实操
(1)在hadoop102、hadoop103上修改/opt/module/kafka/config/consumer.properties配置文件中的group.id属性为任意组名。

[atguigu@hadoop103 config]$ vi consumer.properties
group.id=atguigu
	(2)在hadoop102、hadoop103上分别启动消费者
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
(3)在hadoop104上启动生产者
[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world
(4)查看hadoop102和hadoop103的接收者。同一时刻只有一个消费者接收到消息。

第4章 Kafka API实战

4.1 环境准备
1)启动zk和kafka集群,在kafka集群中打开一个消费者

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first

2)导入pom依赖

<dependencies><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>0.11.0.0</version></dependency>
</dependencies>

4.2 Kafka生产者Java API
4.2.1 创建生产者(过时的API)

package com.atguigu.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;public class OldProducer {@SuppressWarnings("deprecation")public static void main(String[] args) {Properties properties = new Properties();properties.put("metadata.broker.list", "hadoop102:9092");properties.put("request.required.acks", "1");properties.put("serializer.class", "kafka.serializer.StringEncoder");Producer<Integer, String> producer = new Producer<Integer,String>(new ProducerConfig(properties));KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");producer.send(message );}
}

4.2.2 创建生产者(新API)

package com.atguigu.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;public class NewProducer {public static void main(String[] args) {Properties props = new Properties();// Kafka服务端的主机名和端口号props.put("bootstrap.servers", "hadoop103:9092");// 等待所有副本节点的应答props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);// 请求延时props.put("linger.ms", 1);// 发送缓存区内存大小props.put("buffer.memory", 33554432);// key序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 50; i++) {producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));}producer.close();}
}

4.2.3 创建生产者带回调函数(新API)

package com.atguigu.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class CallBackProducer {public static void main(String[] args) {Properties props = new Properties();// Kafka服务端的主机名和端口号props.put("bootstrap.servers", "hadoop103:9092");// 等待所有副本节点的应答props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);// 增加服务端请求延时props.put("linger.ms", 1);
// 发送缓存区内存大小props.put("buffer.memory", 33554432);// key序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);for (int i = 0; i < 50; i++) {kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (metadata != null) {System.err.println(metadata.partition() + "---" + metadata.offset());}}});}kafkaProducer.close();}
}

4.2.4 自定义分区生产者
0)需求:将所有数据存储到topic的第0号分区上
1)定义一个类实现Partitioner接口,重写里面的方法(过时API)

package com.atguigu.kafka;
import java.util.Map;
import kafka.producer.Partitioner;public class CustomPartitioner implements Partitioner {public CustomPartitioner() {super();}@Overridepublic int partition(Object key, int numPartitions) {// 控制分区return 0;}
}
2)自定义分区(新APIpackage com.atguigu.kafka;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;public class CustomPartitioner implements Partitioner {@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 控制分区return 0;}@Overridepublic void close() {}
}

3)在代码中调用

package com.atguigu.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;public class PartitionerProducer {public static void main(String[] args) {Properties props = new Properties();// Kafka服务端的主机名和端口号props.put("bootstrap.servers", "hadoop103:9092");// 等待所有副本节点的应答props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);// 增加服务端请求延时props.put("linger.ms", 1);// 发送缓存区内存大小props.put("buffer.memory", 33554432);// key序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 自定义分区props.put("partitioner.class", "com.atguigu.kafka.CustomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);producer.send(new ProducerRecord<String, String>("first", "1", "atguigu"));producer.close();}
}

4)测试
(1)在hadoop102上监控/opt/module/kafka/logs/目录下first主题3个分区的log日志动态变化情况

[atguigu@hadoop102 first-0]$ tail -f 00000000000000000000.log
[atguigu@hadoop102 first-1]$ tail -f 00000000000000000000.log
[atguigu@hadoop102 first-2]$ tail -f 00000000000000000000.log
(2)发现数据都存储到指定的分区了。

4.3 Kafka消费者Java API
4.3.1 高级API
0)在控制台创建发送者

[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world

1)创建消费者(过时API)

package com.atguigu.kafka.consume;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;public class CustomConsumer {@SuppressWarnings("deprecation")public static void main(String[] args) {Properties properties = new Properties();properties.put("zookeeper.connect", "hadoop102:2181");properties.put("group.id", "g1");properties.put("zookeeper.session.timeout.ms", "500");properties.put("zookeeper.sync.time.ms", "250");properties.put("auto.commit.interval.ms", "1000");// 创建消费者连接器ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));HashMap<String, Integer> topicCount = new HashMap<>();topicCount.put("first", 1);Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);ConsumerIterator<byte[], byte[]> it = stream.iterator();while (it.hasNext()) {System.out.println(new String(it.next().message()));}}
}

2)官方提供案例(自动维护消费情况)(新API)

package com.atguigu.kafka.consume;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;public class CustomNewConsumer {public static void main(String[] args) {Properties props = new Properties();// 定义kakfa 服务的地址,不需要将所有broker指定上 props.put("bootstrap.servers", "hadoop102:9092");// 制定consumer group props.put("group.id", "test");// 是否自动确认offset props.put("enable.auto.commit", "true");// 自动确认offset的时间间隔 props.put("auto.commit.interval.ms", "1000");// key的序列化类props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// value的序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 定义consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 消费者订阅的topic, 可同时订阅多个 consumer.subscribe(Arrays.asList("first", "second","third"));while (true) {// 读取数据,读取超时时间为100ms ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}
}

4.3.2 低级API
实现使用低级API读取指定topic,指定partition,指定offset的数据。
1)消费者使用低级API 的主要步骤:
在这里插入图片描述
2)方法描述:
在这里插入图片描述
3)代码:

package com.atguigu;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;public class SimpleExample {private List<String> m_replicaBrokers = new ArrayList<>();public SimpleExample() {m_replicaBrokers = new ArrayList<>();}public static void main(String args[]) {SimpleExample example = new SimpleExample();// 最大读取消息数量long maxReads = Long.parseLong("3");// 要订阅的topicString topic = "test1";// 要查找的分区int partition = Integer.parseInt("0");// broker节点的ipList<String> seeds = new ArrayList<>();seeds.add("192.168.9.102");seeds.add("192.168.9.103");seeds.add("192.168.9.104");// 端口int port = Integer.parseInt("9092");try {example.run(maxReads, topic, partition, seeds, port);} catch (Exception e) {System.out.println("Oops:" + e);e.printStackTrace();}}public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {// 获取指定Topic partition的元数据PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);if (metadata == null) {System.out.println("Can't find metadata for Topic and Partition. Exiting");return;}if (metadata.leader() == null) {System.out.println("Can't find Leader for Topic and Partition. Exiting");return;}String leadBroker = metadata.leader().host();String clientName = "Client_" + a_topic + "_" + a_partition;SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);int numErrors = 0;while (a_maxReads > 0) {if (consumer == null) {consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);}FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();FetchResponse fetchResponse = consumer.fetch(req);if (fetchResponse.hasError()) {numErrors++;// Something went wrong!short code = fetchResponse.errorCode(a_topic, a_partition);System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);if (numErrors > 5)break;if (code == ErrorMapping.OffsetOutOfRangeCode()) {// We asked for an invalid offset. For simple case ask for// the last element to resetreadOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);continue;}consumer.close();consumer = null;leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);continue;}numErrors = 0;long numRead = 0;for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {long currentOffset = messageAndOffset.offset();if (currentOffset < readOffset) {System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);continue;}readOffset = messageAndOffset.nextOffset();ByteBuffer payload = messageAndOffset.message().payload();byte[] bytes = new byte[payload.limit()];payload.get(bytes);System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));numRead++;a_maxReads--;}if (numRead == 0) {try {Thread.sleep(1000);} catch (InterruptedException ie) {}}}if (consumer != null)consumer.close();}public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);OffsetResponse response = consumer.getOffsetsBefore(request);if (response.hasError()) {System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));return 0;}long[] offsets = response.offsets(topic, partition);return offsets[0];}private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {for (int i = 0; i < 3; i++) {boolean goToSleep = false;PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);if (metadata == null) {goToSleep = true;} else if (metadata.leader() == null) {goToSleep = true;} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {// first time through if the leader hasn't changed give// ZooKeeper a second to recover// second time, assume the broker did recover before failover,// or it was a non-Broker issue//goToSleep = true;} else {return metadata.leader().host();}if (goToSleep) {Thread.sleep(1000);}}System.out.println("Unable to find new leader after Broker failure. Exiting");throw new Exception("Unable to find new leader after Broker failure. Exiting");}private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {PartitionMetadata returnMetaData = null;loop:for (String seed : a_seedBrokers) {SimpleConsumer consumer = null;try {consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");List<String> topics = Collections.singletonList(a_topic);TopicMetadataRequest req = new TopicMetadataRequest(topics);kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);List<TopicMetadata> metaData = resp.topicsMetadata();for (TopicMetadata item : metaData) {for (PartitionMetadata part : item.partitionsMetadata()) {if (part.partitionId() == a_partition) {returnMetaData = part;break loop;}}}} catch (Exception e) {System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);} finally {if (consumer != null)consumer.close();}}if (returnMetaData != null) {m_replicaBrokers.clear();for (BrokerEndPoint replica : returnMetaData.replicas()) {m_replicaBrokers.add(replica.host());}}return returnMetaData;}
}

第5章 Kafka producer拦截器(interceptor)

5.1 拦截器原理
Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
(4)close:
关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。
5.2 拦截器案例
1)需求:
实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。
在这里插入图片描述
2)案例实操
(1)增加时间戳拦截器

package com.atguigu.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class TimeInterceptor implements ProducerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 创建一个新的record,把时间戳写入消息体的最前部return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),System.currentTimeMillis() + "," + record.value().toString());}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {}
}

(2)统计发送消息成功和发送失败消息数,并在producer关闭时打印这两个计数器

package com.atguigu.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class CounterInterceptor implements ProducerInterceptor<String, String>{private int errorCounter = 0;private int successCounter = 0;@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 统计成功和失败的次数if (exception == null) {successCounter++;} else {errorCounter++;}}@Overridepublic void close() {// 保存结果System.out.println("Successful sent: " + successCounter);System.out.println("Failed sent: " + errorCounter);}
}

(3)producer主程序

package com.atguigu.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;public class InterceptorProducer {public static void main(String[] args) throws Exception {// 1 设置配置信息Properties props = new Properties();props.put("bootstrap.servers", "hadoop102:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2 构建拦截链List<String> interceptors = new ArrayList<>();interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor"); 	interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);String topic = "first";Producer<String, String> producer = new KafkaProducer<>(props);// 3 发送消息for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);producer.send(record);}// 4 一定要关闭producer,这样才会调用interceptor的close方法producer.close();}
}

3)测试
(1)在kafka上启动消费者,然后运行客户端java程序。

[atguigu kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic first1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9

(2)观察java平台控制台输出数据如下:

Successful sent: 10
Failed sent: 0

第6章 Kafka Streams

6.1 概述
6.1.1 Kafka Streams
Kafka Streams。Apache Kafka开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在Kafka上构建高可分布式、拓展性,容错的应用程序。
6.1.2 Kafka Streams特点
1)功能强大
高扩展性,弹性,容错
2)轻量级
无需专门的集群
一个库,而不是框架
3)完全集成
100%的Kafka 0.10.0版本兼容
易于集成到现有的应用程序
4)实时性
毫秒级延迟
并非微批处理
窗口允许乱序数据
允许迟到数据
6.1.3 为什么要有Kafka Stream
当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有Spark Streaming和Apache Storm。Apache Storm发展多年,应用广泛,提供记录级别的处理能力,当前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便与图计算,SQL处理等集成,功能强大,对于熟悉其它Spark应用开发的用户而言使用门槛低。另外,目前主流的Hadoop发行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。
既然Apache Spark与Apache Storm拥用如此多的优势,那为何还需要Kafka Stream呢?主要有如下原因。
第一,Spark和Storm都是流式处理框架,而Kafka Stream提供的是一个基于Kafka的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而Kafka Stream作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。
在这里插入图片描述
第二,虽然Cloudera与Hortonworks方便了Storm和Spark的部署,但是这些框架的部署仍然相对复杂。而Kafka Stream作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。
第三,就流式处理系统而言,基本都支持Kafka作为数据源。例如Storm具有专门的kafka-spout,而Spark也提供专门的spark-streaming-kafka模块。事实上,Kafka基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了Kafka,此时使用Kafka Stream的成本非常低。
第四,使用Storm或Spark Streaming时,需要为框架本身的进程预留资源,如Storm的supervisor和Spark on YARN的node manager。即使对于应用实例而言,框架本身也会占用部分资源,如Spark Streaming需要为shuffle和storage预留内存。但是Kafka作为类库不占用系统资源。
第五,由于Kafka本身提供数据持久化,因此Kafka Stream提供滚动部署和滚动升级以及重新计算的能力。
第六,由于Kafka Consumer Rebalance机制,Kafka Stream可以在线动态调整并行度。
6.2 Kafka Stream数据清洗案例
0)需求:
实时处理单词带有”>>>”前缀的内容。例如输入”atguigu>>>ximenqing”,最终处理成“ximenqing”
1)需求分析:
在这里插入图片描述
2)案例实操
(1)创建一个工程,并添加jar包
(2)创建主类

package com.atguigu.kafka.stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;public class Application {public static void main(String[] args) {// 定义输入的topicString from = "first";// 定义输出的topicString to = "second";// 设置参数Properties settings = new Properties();settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");StreamsConfig config = new StreamsConfig(settings);// 构建拓扑TopologyBuilder builder = new TopologyBuilder();builder.addSource("SOURCE", from).addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {@Overridepublic Processor<byte[], byte[]> get() {// 具体分析处理return new LogProcessor();}}, "SOURCE").addSink("SINK", to, "PROCESS");// 创建kafka streamKafkaStreams streams = new KafkaStreams(builder, config);streams.start();}
}

(3)具体业务处理

package com.atguigu.kafka.stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;public class LogProcessor implements Processor<byte[], byte[]> {private ProcessorContext context;@Overridepublic void init(ProcessorContext context) {this.context = context;}@Overridepublic void process(byte[] key, byte[] value) {String input = new String(value);// 如果包含“>>>”则只保留该标记后面的内容if (input.contains(">>>")) {input = input.split(">>>")[1].trim();// 输出到下一个topiccontext.forward("logProcessor".getBytes(), input.getBytes());}else{context.forward("logProcessor".getBytes(), input.getBytes());}}@Overridepublic void punctuate(long timestamp) {}@Overridepublic void close() {}
}

(4)运行程序
(5)在hadoop104上启动生产者

[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first>hello>>>world
>h>>>atguigu
>hahaha

(6)在hadoop103上启动消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic secondworld
atguigu
hahaha

第7章 扩展

7.1 Kafka与Flume比较
在企业中必须要清楚流式数据采集框架flume和kafka的定位是什么:
flume:cloudera公司研发:
适合多个生产者;
适合下游数据消费者不多的情况;
适合数据安全性要求不高的操作;
适合与Hadoop生态圈对接的操作。
kafka:linkedin公司研发:
适合数据下游消费众多的情况;
适合数据安全性要求较高的操作,支持replication。
因此我们常用的一种模型是:
线上数据 --> flume --> kafka --> flume(根据情景增删该流程) --> HDFS
7.2 Flume与kafka集成
1)配置flume(flume-kafka.conf)

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
a1.sources.r1.shell = /bin/bash -c# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) 启动kafkaIDEA消费者
3) 进入flume根目录下,启动flume

$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

4) 向 /opt/module/datas/flume.log里追加数据,查看kafka消费者消费情况

$ echo hello > /opt/module/datas/flume.log

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/188188.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

视频怎么去水印?如何下载保存无水印视频?

你是否曾经在观看鬼畜素材视频时&#xff0c;被烦人的水印挡住了视线&#xff0c;让你感到十分郁闷&#xff1f;不要担心&#xff0c;今天我将为你介绍几种经典的方法&#xff0c;让你轻松下载无水印视频&#xff0c;让观看体验更加清爽不留痕迹。让我们一起来试试吧&#xff0…

【Linux】TCP套接字编程

目录 前言 UDP服务器的完善 线程的封装 结构定义 接口实现 环形队列 结构定义 接口实现 加锁 信号量的申请与释放 入队与出队 整体组装 初始化与析构 信息接收线程 消息发送线程 TCP套接字 创建套接字 listen accept 收发操作 客户端的编写 进一步完善 …

每日一题:LeetCode-1089. 复写零

每日一题系列&#xff08;day 09&#xff09; 前言&#xff1a; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f308; &#x1f50e…

浅学指针(4)函数指针数组和qsort的使用

系列文章目录 文章目录 系列文章目录前言1.函数指针数组的⽤途作用&#xff1a;可以让代码更简洁&#xff0c;逻辑更清晰 2. 回调函数回调函数就是⼀个通过函数指针调⽤的函数 3 . qsort函数qsort函数可以排序所有数据类型解释如图&#xff1a;![在这里插入图片描述](https://i…

Google Chrome 下载 (离线版)

1 访问网址 Google Chrome 网络浏览器 2 点击 下载Chrome 3 直接运行 ChromeStandaloneSetup64.exe 其他&#xff1a; ####################### 谷歌浏览器 (Google Chrome) 最新版离线安装包下载 https://www.iplaysoft.com/tools/chrome/#google_vignette Google Chrome …

socks5代理如何工作?socks5代理可以用来做什么?

socks5代理是一种网络代理服务器&#xff0c;它通常用于改变网络请求的传输方式和地址&#xff0c;从而使得网络请求能够通过代理服务器进行访问。本文将介绍socks5代理的工作原理、优势、使用场景以及如何选择合适的socks5代理。 一、socks5代理的工作原理 socks5代理是一种协…

一文读懂设备巡检的主要内容

在现代企业和组织中&#xff0c;设备的正常运行是业务持续发展的关键&#xff0c;尤其是制造业&#xff0c;由于其发展趋势不断机械化、自动化、大型化、高速化和复杂化&#xff0c;对设备巡检的要求越来越高。然而&#xff0c;在信息化时代&#xff0c;很多企业目前仍采用纸笔…

GPT实战系列-大模型训练和预测,如何加速、降低显存

GPT实战系列-大模型训练和预测&#xff0c;如何加速、降低显存 不做特别处理&#xff0c;深度学习默认参数精度为浮点32位精度&#xff08;FP32&#xff09;。大模型参数庞大&#xff0c;10-1000B级别&#xff0c;如果不注意优化&#xff0c;既耗费大量的显卡资源&#xff0c;…

Python应用:利用matplotlib画学生成绩分布饼图

1. 题目 给定一组学生成绩&#xff1a;[85, 92, 78, 65, 95, 88, 72, 60, 98, 45, 100, 46, 23, 88, 67, 89, 67, 88, 99]&#xff0c;现在评分等级为优&#xff08;90-100&#xff09;、良&#xff08;70-89&#xff09;、及格&#xff08;60-69&#xff09;、不及格&#xff…

玩转大数据4:大数据的崛起与应用领域探索

图片来源网络 引言 在当今数字化时代&#xff0c;大数据正以前所未有的速度和规模崛起。大数据的出现不仅改变了企业和组织的经营模式&#xff0c;也对我们的社会生活带来了深刻的影响。Java作为一种广泛使用的编程语言&#xff0c;在大数据领域发挥着重要的作用。本文将重点…

工程师每日刷题 -4

文章目录 1、深度学习2、算法与数据结构2.1、暴力解法2.2、滑动窗口法 3、编程基础 1、深度学习 问题&#xff1a;CNN的本质和优势&#xff1f; CNN 本质上是一个多层感知机 (MLP)&#xff0c;其成功的原因关键在于它所采用的【稀疏连接】&#xff08;局部感受&#xff09;和…

【带头学C++】----- 九、类和对象 ---- 9.3 析构函数

9.3 析构函数 9.3.1 如何定义析构函数 函数名和类名称相同&#xff0c;在函数名前加 ~ &#xff0c;没有返回值类型&#xff0c;没有函数形参。 (不能被重载) 当对象生命周期结束的时候&#xff0c;系统自动调用析构函数&#xff08;析构函数会先清理对象占用内存空间存放的…

【openssl】Window系统如何编译openssl

本文主要记录如何编译出windows版本的openss的lib库 1.下载openssl&#xff0c;获得openssl-master.zip。 a.可以通过github&#xff08;网址在下方&#xff09;上下载最新的代码、今天是2023.12.1我用的master版本&#xff0c;下载之后恭喜大侠获得《openssl-master.zip》 …

快递物流模拟系统

快递物流模拟系统 文章目录 快递物流模拟系统一、目的二、技术实现&#xff1a;三、网页功能具体介绍 一、目的 调用百度地图 JavaScript API 创建的简单的基站物流GPS定位与监控系统的示例网页 二、技术实现&#xff1a; 使用百度地图 JavaScript API 版本 2.0。利用 BMap …

Webpack——Webpack简介

1、什么是Webpack&#xff1f; Webpack是一个开源的JavaScript模块打包工具&#xff0c;其最核心的功能是解决模块之间的依赖&#xff0c;把各个模块按照特定的规则和顺序组织在一起&#xff0c;最终合并为一个JS文件&#xff08;有时会有多个&#xff0c;这里讨论的只是最基本…

SQL Sever 基础知识 - 数据排序

SQL Sever 基础知识 - 二 、数据排序 二 、对数据进行排序第1节 ORDER BY 子句简介第2节 ORDER BY 子句示例2.1 按一列升序对结果集进行排序2.2 按一列降序对结果集进行排序2.3 按多列对结果集排序2.4 按多列对结果集不同排序2.5 按不在选择列表中的列对结果集进行排序2.6 按表…

人才缺口达150万!云计算凭什么这么火?

《中国互联网发展报告2022》指出&#xff0c;2021年&#xff0c;我国云计算市场规模达到3229亿元&#xff0c;增速为54.4%。未来5年内&#xff0c;我国云计算产业将面临高达近150万的人才缺口&#xff0c;预计未来市场仍将保持30%的增速。与此同时&#xff0c;随着大数据、人工…

【每日OJ —— KY11 二叉树遍历】

每日OJ —— KY11 二叉树遍历 1.题目&#xff1a;KY11 二叉树遍历2.解法2.1.算法讲解2.2.代码实现2.3.提交通过展示 1.题目&#xff1a;KY11 二叉树遍历 2.解法 2.1.算法讲解 1.首先需要创建二叉树结构。 2.其次&#xff0c;根据题目根据题目遍历的顺序要求来实现构建二叉树的…

代码demo-内部订单批量投料

为了简化用户操作&#xff0c;开发内部订单批量投料功能 用户可以批量上传&#xff0c;或者选择对应的物料&#xff0c;输入库位和内部订单号后进行过账操作 对用户选择的内部订单做校验&#xff0c;内部订单是否正确 内部订单的公司是否和工厂对应的公司一致等等 下面展示…

Sui与阿联酋科技孵化器Hub71合作支持生态项目建设,扩大全球影响力

近日&#xff0c;总部位于阿联酋&#xff08; United Arab Emirates &#xff0c;UAE&#xff09;的科技孵化器Hub71宣布与Mysten Labs合作&#xff0c;将支持Sui上的新项目。通过本次合作&#xff0c;孵化项目的开发者们不仅可以获得Mysten Labs的技术专业知识和支持&#xff…