14-Kafka-Day03

第 5 章 Kafka 消费者

5.1 Kafka 消费方式

5.2 Kafka 消费者工作流程

5.2.1 消费者总体工作流程

 

  一个消费者组中的多个消费者,可以看作一个整体,一个组内的多个消费者是不可能去消费同一个分区的数据的,要不然就消费重复了。

5.2.2 消费者组原理

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 
• 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 
• 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者

消费者组初始化流程:

消费计划的制定:

 生产者把数据发送给各个分区,每个broker节点都有一个coordinator(协调器),消费者组对分区进行消费,到底哪个消费者消费哪个分区呢?首先groupId对50取模,看最后的结果是哪个分区节点,假如是1分区,那么1分区的协调器就是本次消费者组的老大,消费者纷纷向该协调器进行注册,协调器从中随机选择一个消费者作为本次消费的Leader,然后把本次消费的具体情况发送给Leader,让其制定一个消费计划(就是哪个消费者消费哪个分区),然后Leader发送给协调器,协调器再进行群发,将计划公布,各个消费者按照这个计划进行消费。

消费者组详细消费流程:

假如本次抓取的数量是500条,大小超过了50M,那以50M为主。

5.2.3 消费者重要参数

5.3 消费者 API

5.3.1 独立消费者案例(订阅主题)

1)需求:

创建一个独立消费者,消费 first 主题中数据。

注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组 id 会被自动填写随机的消费者组 id。

2)实现步骤

(1)创建包名:com.bigdata.kafka.consumer

(2)编写代码

package com.bigdata.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;/***  编写代码消费kafka中的数据*/
public class Customer01 {public static void main(String[] args) {// 其实就是mapProperties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");// 创建一个kafka消费者的对象KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?List<String> topics = new ArrayList<>();topics.add("first");// list总可以设置多个主题的名称kafkaConsumer.subscribe(topics);// 因为消费者是不停的消费,所以是while truewhile(true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record:records) {// 打印数据中的值System.out.println(record.value());// 打印一条数据System.out.println(record);}}}
}

测试:

(1)在 IDEA 中执行消费者程序

(2)在 Kafka 集群控制台,创建 Kafka 生产者,并输入数据。

bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic first

>hello

(3)在IDEA控制台查看。

5.3.2 独立消费者案例(订阅分区)

1)需求:创建一个独立消费者,消费 first 主题 0 号分区的数据。

2)实现步骤

(1)代码编写

package com.bigdata.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;/***  编写代码消费kafka中的数据  消费者只消费某个固定分区的数据*/
public class Customer02 {public static void main(String[] args) {// 其实就是mapProperties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  value/***  如何将自力传输到天安门看升国旗*   1、先将自己序列化   原子*   2、管道(网线)*   3、再进行反序列化 (自力的NDA)  活泼可爱的自力*   结论是:只要是一个对象,它想保存或者想传输,必须序列化*          传输过去之后,进行反序列化。*    比如:java  hadoop*/properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "abc");// 创建一个kafka消费者的对象KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?List<TopicPartition> partitions = new ArrayList<>();partitions.add(new TopicPartition("first",0));// 指定某个分区进行消费kafkaConsumer.assign(partitions);// 因为消费者是不停的消费,所以是while truewhile(true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record:records) {// 打印数据中的值System.out.println("目前消费的分区是"+record.partition()+",value的值为:"+record.value());// 打印一条数据System.out.println(record);}}}
}

5.3.3 消费者组案例

1)需求:测试同一个主题的分区数据,只能由一个消费者组中的消费者只能消费一个分区的数据,不能同时消费多个分区的数据。

2)案例实操

(1)运行CustomConsumer ,通过idea,将这个类运行三次

或者使用如下设置:

2023版本:

老版本:

配置完成,点击后面的运行,就可以同一个类,运行三次了。运行三次就是三个消费者。

(2)启动代码中的生产者发送50条数据,在 IDEA 控制台即可看到两个消费者在消费不同 分区的数据(如果只发生到一个分区,可以在发送时增加延迟代码 Thread.sleep(2);)。

5.4 生产经验——分区的分配以及再平衡

这个章节主要讲:本次消费任务的计划是如何制定的?

1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。2、Kafka有四种主流的分区分配策略: Range、RoundRobin(轮询)、Sticky(粘性)、CooperativeSticky(配合的粘性)。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。

 

参数名称

描述

heartbeat.interval.ms

Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于session.timeout.ms,也不应该高于 session.timeout.ms 的 1/3。

session.timeout.ms

Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超 过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms

消费者处理消息的最大时长,默认是 5 分钟。超过该值,该 消费者被移除,消费者组执行再平衡

partition.assignment.strategy

消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range +CooperativeSticky。Kafka 可以同时使用多个分区分配策略。

可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky

5.4.1 Range 以及再平衡

1)Range 分区策略原理

2Range 分区分配策略案例

(1)修改主题 first 为 7 个分区。

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 7

注意:分区数可以增加,但是不能减少。

(2)这样可以由三个消费者

CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”, 同时启动 3 个消费者。

(3)启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。

备注:只需要将以前的CustomProducerCallback,修改发送次数为500次即可。

package com.bigdata.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.235.128:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 500; i++) {// 添加回调kafkaProducer.send(new ProducerRecord<>("first","bigdata " + i), new Callback() {// 该方法在 Producer 收到 ack 时调用,为异步调用@Overridepublic void onCompletion(RecordMetadata metadata,Exception exception) {if (exception == null) {// 没有异常,输出信息到控制台System.out.println(" 主题: " +metadata.topic() + "->" + "分区:" + metadata.partition());} else {// 出现异常打印exception.printStackTrace();}}});// 延迟一会会看到数据发往不同分区Thread.sleep(20);}// 5. 关闭资源kafkaProducer.close();}
}
说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。
默认是Range,但是在经过一次升级之后,会自动变为CooperativeSticky。这个是官方给出的解释。
默认的分配器是[RangeAssignor, CooperativeStickyAssignor],默认情况下将使用RangeAssignor,但允许通过一次滚动反弹升级到CooperativeStickyAssignor,该滚动反弹会将RangeAssignor从列表中删除。

 (4)观看 3 个消费者分别消费哪些分区的数据。

假如消费情况和预想的不一样:
1、集群是否健康,比如某些kafka进程没启动
2、发送数据的时候7个分区没有使用完,因为它使用了粘性分区。如何让它发送给7个分区呢,代码中添加:// 延迟一会会看到数据发往不同分区Thread.sleep(20);

发现一个消费者消费了,5,6分区,一个消费了0,1,2分区,一个消费了3,4分区。

此时并没有修改分区策略,原因是默认是Range.

3Range 分区分配再平衡案例

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 3、4 号分区数据。

2 号消费者:消费到 5、6 号分区数据。

0号的数据,没人消费。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需

要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 0、1、2、3 号分区数据。

2 号消费者:消费到 4、5、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

5.4.2 RoundRobin(轮询) 以及再平衡

1)RoundRobin 分区策略原理

2RoundRobin 分区分配策略案例

(1)依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代 码中修改分区分配策略为 RoundRobin。

轮询的类的全路径是:
org.apache.kafka.clients.consumer.RoundRobinAssignorA list of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used. Available options are:org.apache.kafka.clients.consumer.RangeAssignor: Assigns partitions on a per-topic basis.
org.apache.kafka.clients.consumer.RoundRobinAssignor: Assigns partitions to consumers in a round-robin fashion.
org.apache.kafka.clients.consumer.StickyAssignor: Guarantees an assignment that is maximally balanced while preserving as many existing partition assignments as possible.
org.apache.kafka.clients.consumer.CooperativeStickyAssignor: Follows the same StickyAssignor logic, but allows for cooperative rebalancing.
package com.bigdata.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerWithFenPei {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");// 指定分区的分配方案properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者订阅主题,主题有数据就会拉取数据// 指定消费的主题ArrayList<String> topics = new ArrayList<>();topics.add("first");// 一个消费者可以订阅多个主题kafkaConsumer.subscribe(topics);while(true){//1 秒中向kafka拉取一批数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> record :records) {// 打印一条数据System.out.println(record);// 可以打印记录中的很多内容,比如 key  value  offset topic 等信息System.out.println(record.value());}}}
}修改一下消费者组为test2

(2)重启 3 个消费者,重复发送消息的步骤,观看分区结果

3RoundRobin 分区分配再平衡案例

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 2、5 号分区数据

2 号消费者:消费到 4、1 号分区数据

0 号消费者 以前对应的数据没有人消费

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 0、2、4、6 号分区数据

2 号消费者:消费到 1、3、5 号分区数据

说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

5.4.3 Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

比如分区有   0   1    2   3   4    5   6   
消费者有     c1     c2      c3
c1 消费 3个     c2 消费2个    c3 消费2个分区
跟以前不一样的是,c1 消费的3个分区是随机的,不是按照  0   1    2 这样的顺序来的。 

 

1)需求

设置主题为 first,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察

消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

2)步骤

(1)修改分区分配策略为粘性。

注意:3 个消费者都应该注释掉,之后重启 3 个消费者,如果出现报错,全部停止等

会再重启,或者修改为全新的消费者组。

// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

(2)使用同样的生产者发送 500 条消息。

可以看到会尽量保持分区的个数近似划分分区。

3Sticky 分区分配再平衡案例

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 2、5、3 号分区数据。

2 号消费者:消费到 4、6 号分区数据。

0 号消费者的任务没人顶替它消费

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需

要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 2、3、5 号分区数据。

2 号消费者:消费到 0、1、4、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。

5.4.4 CooperativeSticky 的解释【新的kafka中刚添加的策略】

在消费过程中,会根据消费的偏移量情况进行重新再平衡,也就是粘性分区,运行过程中还会根据消费的实际情况重新分配消费者,直到平衡为止。
好处是:负载均衡,不好的地方是:多次平衡浪费性能。

5.5 offset 位移[偏移量](重要)

记录消费到了哪里的这个值,就是偏移量。

5.5.1 offset 的默认维护位置 

从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets  【topic 其实就是数据,就是位置 topic -log --segment- 一个个文件】
Kafka0.9版本之前,consumer默认将offset 保存在Zookeeper中。
kafka0.11 版本 高于 kafka 0.9,咱们用的kafka是 3.0版本。
假如公司中想重置kafka。 删除每一个kafka  logs 以及 datas,zk中的kafka 文件夹删除掉。

为什么要把消费者的偏移量从zk中挪到 kafka中呢?原因是避免Conusmer频发跟zk进行通信。

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是group.id+topic+ 分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行 compact (压缩),也就是每个 group.id+topic+分区号就只保留最新数据。

1)消费 offset 案例

(0)思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。

(1)在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,

默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

如果不修改是无法查看offset的值的,因为这些都是加密数据。

修改完,记得同步给其他的节点
重新启动zk和kafka.
zk.sh start
kf.sh start

 (2)采用命令行方式,创建一个新的 topic。

bin/kafka-topics.sh --bootstrap-server hadoop11:9092 --create --topic bigdata --partitions 2 --replication-factor 2

(3)启动生产者往 bigdata 生产数据。

bin/kafka-console-producer.sh --topic  bigdata --bootstrap-server hadoop11:9092

(4)启动消费者消费 bigdata 数据。

bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic five --group suibian
注意:指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)
假如出现消费不到数据的情况,将分区去掉或者组名称修改一下,起个别的名字

(5)查看消费者消费主题__consumer_offsets。

bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server bigdata01:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

5.5.2 自动提交 offset 

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。 
自动提交offset的相关参数: 
enable.auto.commit:是否开启自动提交offset功能,默认是true 
auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

 

代码演示:

1)消费者自动提交 offset

package com.bigdata.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerAutoOffset {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 是否自动提交 offset  通过这个字段设置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 提交 offset 的时间周期 1000ms,默认 5sproperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者订阅主题,主题有数据就会拉取数据// 指定消费的主题ArrayList<String> topics = new ArrayList<>();topics.add("first");// 一个消费者可以订阅多个主题kafkaConsumer.subscribe(topics);while(true){//1 秒中向kafka拉取一批数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> record :records) {// 打印一条数据System.out.println(record);// 可以打印记录中的很多内容,比如 key  value  offset topic 等信息System.out.println(record.value());}}}
}

5.5.3 手动提交 offset 

        虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。 手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。 
• commitSync  (同步提交):必须等待offset提交完毕,再去消费下一批数据。 
• commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

1)同步提交 offset

由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交 offset 的示例。

package com.bigdata.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 是否自动提交 offset  通过这个字段设置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者订阅主题,主题有数据就会拉取数据// 指定消费的主题ArrayList<String> topics = new ArrayList<>();topics.add("first");// 一个消费者可以订阅多个主题kafkaConsumer.subscribe(topics);while(true){//1 秒中向kafka拉取一批数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> record :records) {// 打印一条数据System.out.println(record);// 可以打印记录中的很多内容,比如 key  value  offset topic 等信息System.out.println(record.value());}// 同步提交 offsetkafkaConsumer.commitSync();}}
}

2)异步提交 offset

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

以下为异步提交 offset 的示例:

记得将自动提交给关了
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 同步提交 offset//kafkaConsumer.commitSync();// 异步提交kafkaConsumer.commitAsync();

5.5.4 指定 Offset 消费 【重要】

auto.offset.reset = earliest | latest | none 默认是 latest。

当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

(1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning

(2)latest(默认值):自动将偏移量重置为最新偏移量。

(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 设置为latest

这个参数的力度太大了。不是从头,就是从尾。有没有一种方法能我们自己选择消费的位置呢?
有。
kafka提供了seek方法,可以让我们从分区的固定位置开始消费。
入参为seek (TopicPartition topicPartition,offset offset)。前面我们讲过TopicPartition这个对象里有2个成员变量。一个是Topic,一个是partition。再结合offset,完全就可以定位到某个主题、某个分区的某个leader副本的active日志文件的某个位置。
offset是指分区的消息偏移量

 

package com.bigdata.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;public class CustomConsumerSeek {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 2 订阅一个主题ArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);// 执行计划// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size() == 0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环assignment = kafkaConsumer.assignment();}for (TopicPartition tp:assignment) {kafkaConsumer.seek(tp,10);}while(true){//1 秒中向kafka拉取一批数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String,String> record :records) {// 打印一条数据System.out.println(record);// 可以打印记录中的很多内容,比如 key  value  offset topic 等信息System.out.println(record.value());}}}
}

注意:每次执行完,需要修改消费者组名;

5.5.5 指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。

例如要求按照时间消费前一天的数据,怎么处理?

操作步骤:

package com.bigdata.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;/***  从某个特定的时间开始进行消费*/
public class Customer05 {public static void main(String[] args) {// 其实就是mapProperties properties = new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testf");// 指定分区的分配方案  为轮询策略//properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");ArrayList<String> startegys = new ArrayList<>();startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);// 创建一个kafka消费者的对象KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?List<String> topics = new ArrayList<>();topics.add("five");// list总可以设置多个主题的名称kafkaConsumer.subscribe(topics);// 因为消费者是不停的消费,所以是while true// 指定了获取分区数据的起始位置。// 这样写会报错的,因为前期消费需要指定计划,指定计划需要时间// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size() == 0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环assignment = kafkaConsumer.assignment();}Map<TopicPartition, Long> hashMap = new HashMap<>();for (TopicPartition partition:assignment) {hashMap.put(partition,System.currentTimeMillis()- 60*60*1000);}Map<TopicPartition, OffsetAndTimestamp> map = kafkaConsumer.offsetsForTimes(hashMap);for (TopicPartition partition:assignment) {OffsetAndTimestamp offsetAndTimestamp = map.get(partition);kafkaConsumer.seek(partition,offsetAndTimestamp.offset());}while(true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record:records) {// 打印数据中的值System.out.println(record.value());System.out.println(record.offset());// 打印一条数据System.out.println(record);}}}
}

5.5.6 漏消费和重复消费

重复消费:已经消费了数据,但是 offset 没提交。

漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

思考:怎么能做到既不漏消费也不重复消费呢?详看消费者事务。

5.6 生产经验——消费者事务

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset 过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比 如MySQL)。这部分知识会在后续项目部分涉及。
事务的四大特征:ACID
转账:张三 --> 李四

 

5.7 生产经验——数据积压(消费者如何提高吞吐量)

负利率:钱存银行,银行收你钱。
目前:只能存一年,1.9,1万存一年190元。

 

bit  --> byte --> kb -->mb -->gb --> tb --> pb --> eb -> zb -->yb

 

第 6 章 Kafka-Eagle 监控

Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。

在生产过程中,想创建topic、查看所有topic、想查看某个topic 想查看分区等,都需要写命令,能不能有一个图形化的界面,让我们操作呢?

6.1 MySQL 环境准备

Kafka-Eagle 的安装依赖于 MySQL,MySQL 主要用来存储可视化展示的数据。如果集

群中之前安装过 MySQL 可以跨过该步。

6.2 Kafka 环境准备

1)关闭 Kafka 集群

kf.sh stop

2)修改/opt/installs/kafka3/bin/kafka-server-start.sh 命令中

vi bin/kafka-server-start.sh

修改如下参数值:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"export JMX_PORT="9999"#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

注意:修改之后在启动 Kafka 之前要分发之其他节点

xsync.sh kafka-server-start.sh

 

6.3 Kafka-Eagle 安装

0)官网:https://www.kafka-eagle.org

1)上传压缩包 kafka-eagle-bin-2.0.8.tar.gz 到集群/opt/modules 目录

2)解压到本地

tar -zxvf kafka-eagle-bin-2.0.8.tar.gz

3)将 efak-web-2.0.8-bin.tar.gz 解压至/opt/installs

cd kafka-eagle-bin-2.0.8
tar -zxvf efak-web-2.0.8-bin.tar.gz -C /opt/installs/ 

4)修改名称

mv efak-web-2.0.8/ efak

5)修改配置文件 /opt/installs/efak/conf/system-config.properties

vi system-config.properties
修改如下:
# offset 保存在 kafka 
cluster1.efak.offset.storage=kafkaefak.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka
cluster2.zk.list=bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka# 修改数据库连接:&serverTimezone=GMT  时区一定要写,否则报405错误!
# 127.0.0.1 = localhost   hosts文件中定义的
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT
efak.username=root
efak.password=123456

7)添加环境变量

# kafkaEFAK
export KE_HOME=/opt/installs/efak
export PATH=$PATH:$KE_HOME/bin

注意:source /etc/profile

启动数据库:

[root@hadoop11 conf]# systemctl start mysqld
[root@hadoop11 conf]# systemctl status mysqld

新建一个数据库,叫做ke

8)启动

(1)注意:启动之前需要先启动 ZK 以及 KAFKA。

zk.sh start
kf.sh start
xcall.sh zkServer.sh start
kf.sh start

(2)启动 efak

bin/ke.sh start

说明:如果停止 efak,执行命令

bin/ke.sh stop

假如启动无法访问,怎么办?查看日志,必定有答案!!

使用的时候,一定要在配置文件中编写正确的路径,否则kafka集群没办法连接:

cluster1.zk.list=hadoop11:2181,hadoop12:2181,hadoop13:2181/kafka
cluster2.zk.list=hadoop11:2181,hadoop12:2181,hadoop13:2181/kafka

查看可视化大屏的时候:

1、同步一下时间   systemctl restart chronyd
2、要开启消费者
3、要开发生产者
4、关闭flume (选项)
5、如果都没效果,可以添加一句话  在zkServer.sh 中ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
全部重启一下,这些服务

要想看到大屏数据,此处的JMX必须是上线状态:

6.4 Kafka-Eagle 页面操作

1)登录页面查看监控数据

http://ip地址:8048/

或者

http://ip地址:8048/ke

切记:假如访问不了,查看efak中的log日志,里面的错误特别的详细,绝对可以解决你的问题。

eagle 可以通过这个图形化界面管理Topic,查看kafka的集群的消息发送和消费情况,还可以操作zk.

6.5-Kafka-UI的安装

上传安装包 到 /opt/modules下面

tar -zxvf kafka-ui-lite-1.2.11-bin.tar.gz -C /opt/installs/
mv kafka-ui-lite-1.2.11/ kafka-ui
进入bin路径下:./kafkaUI.sh -d start

进入界面:http://bigdata01:8889/

七、第七章--与其他软件整合

Flume和kafka的整合

1、Kafka作为Source 【数据进入到kafka中,抽取出来】

在flume的conf文件夹下,有一个flumeconf 文件夹:

创建一个flume脚本文件: kafka-memory-logger.conf

Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了

a1.sources = r1
a1.sinks = k1
a1.channels = c1# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = five
a1.sources.r1.kafka.consumer.group.id = qiaodaohu# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 128

 接着创建一个topic ,名字叫做 kafka-flume,或者直接使用以前的five 主题

kafka-topics.sh --create --topic kafka-flume --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1

测试:

启动一个消息生产者,向topic中发送消息,启动flume,接收消息

kafka-console-producer.sh --topic kafka-flume --bootstrap-server bigdata01:9092

 启动flume,查看log日志:

flume-ng agent -n a1 -c ../conf -f ./kafka-memory-logger.conf -Dflume.root.logger=INFO,console

编写一个脚本:flume-kafka-sink.conf

##a1就是flume agent的名称
## source r1
## channel c1
## sink k1
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 44444# 修改sink为kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092
a1.sinks.k1.kafka.topic = five
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
netcat  --> memory -->kafka

 创建topic (flume-kafka):

kafka-topics.sh --create --topic flume-kafka --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1

测试:

启动:

flume-ng agent -n a1 -c conf -f $FLUME_HOME/job/flume-kafka-sink.conf -Dflume.root.logger=INFO,console

使用telnet命令,向端口发送消息:

yum -y install telnettelnet bigdata01 44444

在窗口不断的发送文本数据,数据被抽取到了kafka中,如何获取kafka数据呢?使用消费者:

kafka-console-consumer.sh --topic flume-kafka --bootstrap-server bigdata01:9092 --from-beginning

这样数据就被消费了。

假定一个场景:

fluem可以抽取不断产生的日志,抽取到的日志数据,发送给kafka,kafka经过处理,展示在页面上,或者进行汇总统计。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/31320.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WIC 图像处理初体验——读取像素的值

先放上运行结果&#xff1a; 可以发现红绿蓝是从后往前的。 必须以C方式编译代码&#xff01; #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <wincodec.h>int main(void) {CoInitialize(nullptr);IWICImagingFactory* fac;CoCreateInstance(CLS…

勒索病毒猖狂,请提前做好安全防护,德迅卫士保护你的安全

随着互联网的飞速发展&#xff0c;网络安全问题日益凸显。其中&#xff0c;勒索病毒作为一种极具危害性的网络安全威胁&#xff0c;已经引起了广泛关注。为了帮助大家更好地预防和应对勒索病毒攻击&#xff0c;我们特地为您精心准备了这份超实用的勒索病毒自救预防指南。让我们…

数据中心技术:大数据时代的机遇与挑战

在大数据时代&#xff0c;数据中心网络对于存储和处理大量信息至关重要。随着云计算的出现&#xff0c;数据中心已成为现代技术的支柱&#xff0c;支持社交媒体、金融服务等众多行业。然而&#xff0c;生成和处理的大量数据带来了一些挑战&#xff0c;需要创新的解决方案。在这…

Android系统 抓trace方法(手机及车机)

1、先说说什么是trace trace是一种以perfetto.trace结尾的文件。一般用来分析卡顿、启动时间慢等问题&#xff0c;还可以用来分析方法耗时&#xff0c;android系统的性能、功耗等等问题。所需要使用到的网站是&#xff1a; Perfetto UI 他的前身是Systrace&#xff0c;不过Pe…

分布式事务的八种方案解析

分布式事务的八种方案解析 针对不同的分布式场景业界常见的解决方案有2PC、TCC、可靠消息最终一致性、最大努力通知等方案&#xff0c;以下总结8 种常见的解决方案&#xff0c;帮助大家在实际的分布式系统中更好地运用事务。 1.2PC 二阶段提交协议&#xff08;Two-phase commit…

2024年旅游与经济发展国际会议(ICTED 2024)

2024年旅游与经济发展国际会议&#xff08;ICTED 2024&#xff09; 2024 International Conference on Tourism and Economic Development 【重要信息】 大会地点&#xff1a;青岛 大会官网&#xff1a;http://www.icicted.com 投稿邮箱&#xff1a;icictedsub-conf.com 【注意…

第一个Java程序

编写第一个Java程序通常从经典的"Hello,World!"程序开始。下面是一个简单的Java程序示例&#xff0c;它将打印出"Hello, World!"到控制台&#xff1a; 1.编写代码&#xff1a; 打开文本编辑器&#xff08;如记事本、Notepad、Visual StudioCode等&#x…

【HarmonyOS4学习笔记】《HarmonyOS4+NEXT星河版入门到企业级实战教程》课程学习笔记(十五)

课程地址&#xff1a; 黑马程序员HarmonyOS4NEXT星河版入门到企业级实战教程&#xff0c;一套精通鸿蒙应用开发 &#xff08;本篇笔记对应课程第 23 - 24 节&#xff09; P23《22.Stage模型-基本概念》 一个应用可以有很多的能力&#xff0c;每个能力可以成为一个 Ability Mod…

领域驱动设计(DDD)微服务架构模式总结

part1. Domain Driven Design(Strategic Design,Tactical Design) Top Down focus on business or activityy domain Ubiquitous Language:统一语言 Tactical Design Tools&#xff1a;战术性设计工具 Implementing Domain Driven Design(Event storming,DDD in code) DDD总结…

阿里巴巴橙点同学达摩院认证证书

网址&#xff1a;https://orange-class.com/ 为竞争激烈的职业做好充分的准备&#xff0c;无需相关经验立即开始学习。 阿里达摩院组织背书认证。 内容包括八个职业方向&#xff0c;涉及AI、开发、营销、设计等不同岗位&#xff1a; 其中&#xff0c;AI的高级认证情况如下&…

收藏丨世界地形图(超高清)

原文链接https://mp.weixin.qq.com/s?__bizMzUyNzczMTI4Mg&mid2247668146&idx2&snbebd2f4921994ab05ed47efdebe8e706&chksmfa770e8fcd008799bf1d1cabd62edca7f60a5c7a1ef9c0bacf3bdc102bccf0f12d54d6c07c49&token1405091246&langzh_CN&scene21#we…

Ubuntu 22.04.4 LTS openresty(Nginx) 通过Lua+Redis 实现动态封禁IP

1 系统环境 testiZbp1g7fmjea77vsqc5hmmZ:~$ cat /etc/os-release PRETTY_NAME"Ubuntu 22.04.4 LTS" NAME"Ubuntu" VERSION_ID"22.04" VERSION"22.04.4 LTS (Jammy Jellyfish)" VERSION_CODENAMEjammy IDubuntu ID_LIKEdebian HOME…

星谷案例入选新华社国家高端智库报告,彰显国际影响力!

中关村科学城星谷(创新园)以其在商业航天领域的卓越创新能力和显著产业影响力,被新华社国家高端智库专题组认可,并入选《更好赋能中国繁荣世界——新质生产力的理论贡献和实践价值》智库报告,成为新质生产力的典型代表案例。 这是中关村科学城星谷(创新园)继登上《新闻联播》后…

卓越的 App UI 风格引领潮流

卓越的 App UI 风格引领潮流

RAG实操教程langchain+Milvus向量数据库创建你的本地知识库 二

Miluvs 向量数据库 关于 Milvui 可以参考我的前两篇文章 • 一篇文章带你学会向量数据库Milvus&#xff08;一&#xff09;[1]• 一篇文章带你学会向量数据库Milvus&#xff08;二&#xff09;[2] 下面我们安装 pymilvus 库 pip install --upgrade --quiet pymilvus如果你…

如何开启Claude 3的Artifacts功能以及如何注册Claude3

就很突然&#xff0c;Claude 3.5&#xff0c;它来了&#xff01; Anthropic发布3.5系列第一个版本Claude 3.5 Sonnet。在多个关键指标中&#xff0c;GPT-4o几乎被吊打&#xff01; 另外Claude 3.5 Sonnet是免费的&#xff0c;提供了跟gpt-4o一样的次数。更高的速度和次数&…

python循环写入新样本到csv文件,并解决中文乱码的问题

新样本循环写入csv中 def write_sample(self):# 创建一个包含所有字段的列表&#xff0c;它将作为CSV的一行fields [基地, 拉线, 正/负极车间, 罐体编号, 样本ID, 工步序号, 检测结果]row_data [self.base_id, self.line_id, self.workshop_id, self.device, self.filename[:…

微信公众号多域名回调系统V1.5 源码

这是一款基于ThinkPHP6.0开发的微信公众号多域名回调系统。本系统有如下功能&#xff1a; 微信公众号多域名回调功能&#xff1a;微信公众号后台默认只能授权2个网页域名&#xff0c;用本系统突破这个限制&#xff0c;用同一个公众号对接无限多个网站。网站后台支持回调域名白…

王者荣耀图鉴皮肤怎么来的

王者荣耀图鉴皮肤怎么来的 最近一个王者荣耀图鉴开源很火 这个项目里面有很多的图片和音效资源&#xff0c;最简单的方法就是利用爬虫技术爬取这些图片资源。 第一步环境准备 Pyhton3.12macos系统 第二步查看王者荣耀官网 这些图片资源最简单的来源就是王者荣耀官网网站…

【FPGA + Nvidia/算能GPU+AI】自动驾驶多核异构实现 16路车载摄像头实时AI分析解决方案

基于 Xilinx 公司ZYNQ Ultrascale MPSoC系列 FPGA 芯片设计&#xff0c;应用于无人驾驶、慢速特种车及数据采集车、车载仿真测试系统等自动驾驶领域 自动驾驶&#xff1a;16通道车载摄像头PCIE采集卡方案。 16 通道摄像头 最多支持 16 通道 GMSL1/2 摄像头输入 8MP 摄像头 最…