Kafka生产消费流程

Kafka生产消费流程

1.Kafka一条消息发送和消费的流程图(非集群)

image.png

2.三种发送方式

  • 准备工作

创建maven工程,引入依赖

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.1</version></dependency>
  • 消费者
/*** 类说明:消费者入门*/
public class HelloKafkaConsumer {public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","192.168.140.103:9092");// 设置String的反序列化properties.put("key.deserializer", StringDeserializer.class);properties.put("value.deserializer", StringDeserializer.class);// 设置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"llp_consumerA");// 构建kafka消费者对象KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);try {//指定消费者订阅的主题consumer.subscribe(Collections.singletonList("llp-topic"));// 调用消费者拉取消息while(true){// 设置1秒的超时时间ConsumerRecords<String, String> records= consumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String, String> record:records){String key = record.key();String value = record.value();//分区int partition = record.partition();System.out.println("接收到消息: partition="+partition+", key = " + key + ", value = " + value);}}} finally {// 释放连接consumer.close();}}
}

1.1 发送并忘记

忽略send方法的返回值,不做任何处理。大多数情况下,消息会正常到达,而且生产者会自动重试,但有时会丢失消息。

消费者

/*** 类说明:kafak生产者*/
public class HelloKafkaProducer {public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址,9092kafka默认端口properties.put("bootstrap.servers","192.168.140.103:9092");//补充一下: 配置多台的服务  用,分割, 其中一个宕机,生产者 依然可以连上(集群)// 设置String的序列化 (对象-》二进制字节数组 : 能够在网络上传输 )properties.put("key.serializer", StringSerializer.class);properties.put("value.serializer", StringSerializer.class);// 构建kafka生产者对象KafkaProducer<String,String> producer  = new KafkaProducer<String, String>(properties);try {ProducerRecord<String,String> record;try {// 构建消息record = new ProducerRecord<String,String>("llp-topic", "nickname","llp0");// 发送消息producer.send(record);System.out.println("消息发送成功");} catch (Exception e) {e.printStackTrace();}} finally {// 释放连接producer.close();}}}

测试结果

image-20240114161541282

image-20240114161554133

1.2同步发送

/*** 类说明:发送消息--同步模式*/
public class SynProducer {public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","192.168.140.103:9092");// 设置String的序列化properties.put("key.serializer", StringSerializer.class);properties.put("value.serializer", StringSerializer.class);// 构建kafka生产者对象KafkaProducer<String,String> producer  = new KafkaProducer<String, String>(properties);try {ProducerRecord<String,String> record;try {// 构建消息record = new ProducerRecord<String,String>("llp-topic", "llp","同步发送");// 发送消息Future<RecordMetadata> future =producer.send(record);RecordMetadata recordMetadata = future.get();if(null!=recordMetadata){System.out.println("偏移量offset:"+recordMetadata.offset()+","+"分区partition:"+recordMetadata.partition());}System.out.println("消息发送成功");} catch (Exception e) {e.printStackTrace();}} finally {// 释放连接producer.close();}}}

测试结果

image-20240114162213572

image-20240114162239690

1.3 异步发送

/*** 类说明:发送消息--异步模式*/
public class AsynProducer {public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","192.168.140.103:9092");// 设置String的序列化properties.put("key.serializer", StringSerializer.class);properties.put("value.serializer", StringSerializer.class);// 构建kafka生产者对象KafkaProducer<String,String> producer  = new KafkaProducer<String, String>(properties);try {ProducerRecord<String,String> record;try {// 构建消息record = new ProducerRecord<String,String>("llp-topic", "nickname","孙悟空");// 发送消息producer.send(record, new Callback() {public void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null){// 没有异常,输出信息到控制台System.out.println("偏移量offset:"+recordMetadata.offset()+"," +"分区partition:"+recordMetadata.partition());} else {// 出现异常打印e.printStackTrace();}}});} catch (Exception e) {e.printStackTrace();}} finally {// 释放连接producer.close();}}
}

测试结果

image-20240114162541172

image-20240114162600616

3. 消费者群组

Kafka里消费者从属于消费者群组,一个群组里的消费者订阅的都是同一个主题,每个消费者接收主题一部分分区的消息。

image-20240114170048830


image.png

如上图,主题T有4个分区,群组中只有一个消费者,则该消费者将收到主题T1全部4个分区的消息。


image.png

如上图,在群组中增加一个消费者2,那么每个消费者将分别从两个分区接收消息,上图中就表现为消费者1接收分区1和分区3的消息,消费者2接收分区2和分区4的消息。


image.png

如上图,在群组中有4个消费者,那么每个消费者将分别从1个分区接收消息。


image.png

但是,当我们增加更多的消费者,超过了主题的分区数量,就会有一部分的消费者被闲置,不会接收到任何消息。

往消费者群组里增加消费者是进行横向伸缩能力的主要方式。所以我们有必要为主题设定合适规模的分区,在负载均衡的时候可以加入更多的消费者。但是要记住,一个群组里消费者数量超过了主题的分区数量,多出来的消费者是没有用处的。

4.序列化

创建生产者对象必须指定序列化器,默认的序列化器并不能满足我们所有的场景。我们完全可以自定义序列化器。只要实现org.apache.kafka.common.serialization.Serializer接口即可。

  • 举个例子,比如我现在需要传输一个user类的实体对象

user实体类

/*** 类说明:实体类*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {private int id;private String name;public User(int id) {this.id = id;}
}

自定义序列化器,在生产者端进行指定

/*** 类说明:自定义序列化器*/
public class UserSerializer implements Serializer<User> {public void configure(Map<String, ?> configs, boolean isKey) {//do nothing}public byte[] serialize(String topic, User data) {try {byte[] name;int nameSize;if(data==null){return null;}if(data.getName()!=null){name = data.getName().getBytes("UTF-8");//字符串的长度nameSize = name.length;}else{name = new byte[0];nameSize = 0;}/*id的长度4个字节,字符串的长度描述4个字节,字符串本身的长度nameSize个字节*/ByteBuffer buffer = ByteBuffer.allocate(4+4+nameSize);buffer.putInt(data.getId());//4buffer.putInt(nameSize);//4buffer.put(name);//nameSizereturn buffer.array();} catch (Exception e) {throw new SerializationException("Error serialize User:"+e);}}public void close() {//do nothing}
}

反序列化器,在消费者端指定

/*** 类说明:自定义反序列化器*/
public class UserDeserializer implements Deserializer<User> {public void configure(Map<String, ?> configs, boolean isKey) {//do nothing}public User deserialize(String topic, byte[] data) {try {if(data==null){return null;}if(data.length<8){throw new SerializationException("Error data size.");}ByteBuffer buffer = ByteBuffer.wrap(data);int id;String name;int nameSize;id = buffer.getInt();nameSize = buffer.getInt();byte[] nameByte = new byte[nameSize];buffer.get(nameByte);name = new String(nameByte,"UTF-8");return new User(id,name);} catch (Exception e) {throw new SerializationException("Error Deserializer DemoUser."+e);}}public void close() {//do nothing}
}

生产者

/*** 类说明:发送消息--value使用自定义序列化*/
public class ProducerUser {public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","127.0.0.1:9092");// 设置String的序列化properties.put("key.serializer", StringSerializer.class);// 设置value的自定义序列化properties.put("value.serializer", UserSerializer.class);// 构建kafka生产者对象KafkaProducer<String,User> producer  = new KafkaProducer<String, User>(properties);try {ProducerRecord<String,User> record;try {// 构建消息record = new ProducerRecord<String,User>("llp-user", "teacher",new User(1,"孙悟空"));// 发送消息producer.send(record);System.out.println("message is sent.");} catch (Exception e) {e.printStackTrace();}} finally {// 释放连接producer.close();}}
}

消费者

/*** 类说明:*/
public class ConsumerUser {public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","127.0.0.1:9092");// 设置String的反序列化properties.put("key.deserializer", StringDeserializer.class);// 设置自定义的反序列化properties.put("value.deserializer", UserDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG,"ConsumerOffsets");// 构建kafka消费者对象KafkaConsumer<String,User> consumer = new KafkaConsumer<String, User>(properties);try {consumer.subscribe(Collections.singletonList("llp-user"));// 调用消费者拉取消息while(true){// 每隔1秒拉取一次消息ConsumerRecords<String, User> records= consumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String, User> record:records){String key = record.key();User user = record.value();System.out.println("接收到消息: key = " + key + ", value = " + user.toString());}}} finally {// 释放连接consumer.close();}}
}

测试结果

image-20240114172048655

image-20240114172058088

自定义序列化容易导致程序的脆弱性。举例,在我们上面的实现里,我们有多种类型的消费者,每个消费者对实体字段都有各自的需求,比如,有的将字段变更为long型,有的会增加字段,这样会出现新旧消息的兼容性问题。特别是在系统升级的时候,经常会出现一部分系统升级,其余系统被迫跟着升级的情况。

解决这个问题,可以考虑使用自带格式描述以及语言无关的序列化框架。比如Protobuf,Kafka官方推荐的Apache Avro

5.分区

因为在Kafka中一个topic可以有多个partition,所以当一个生产发送消息,这条消息应该发送到哪个partition,这个过程就叫做分区。

当然,我们在新建消息的时候,我们可以指定partition,只要指定partition,那么分区器的策略则失效。

image-20240114172357181

5.1 Kafka3种分区分配策略

在我们的代码中可以看到,生产者参数中是可以选择分区器的。

注意,在测试之前我修改了server.properties中每个主题默认的分区数

# 配置每个主题默认的分区数
num.partitions=4
1.DefaultPartitioner 默认分区策略

全路径类名:org.apache.kafka.clients.producer.internals.DefaultPartitioner

  • 如果消息中指定了分区,则使用它
  • 如果未指定分区但存在key,则根据序列化key使用murmur2哈希算法对分区数取模。
  • 如果不存在分区或key,则会使用粘性分区策略

采用默认分区的方式,键的主要用途有两个:

一,用来决定消息被写往主题的哪个分区,拥有相同键的消息将被写往同一个分区。

二,还可以作为消息的附加消息。

2.RoundRobinPartitioner 分区策略

全路径类名:org.apache.kafka.clients.producer.internals.RoundRobinPartitioner

  • 如果消息中指定了分区,则使用它
  • 将消息平均的分配到每个分区中。

即key为null,那么这个时候一般也会采用RoundRobinPartitioner

image-20240114181401389

3.UniformStickyPartitioner 纯粹的粘性分区策略

全路径类名:org.apache.kafka.clients.producer.internals.UniformStickyPartitioner

他跟DefaultPartitioner 分区策略的唯一区别就是。

DefaultPartitionerd 如果有key的话,那么它是按照key来决定分区的,这个时候并不会使用粘性分区
UniformStickyPartitioner 是不管你有没有key, 统一都用粘性分区来分配

另外关于粘性分区策略

image-20240114174631916

在producer.properties中配置

# 配置生产者发送消息之前延迟多长时间在进行发送,默认0
#linger.ms=# 对发送到分区的多个记录进行批处理时的默认批处理大小(以字节为单位)默认16K
#batch.size=# 配置缓冲区的总大小,默认32M
#buffer.memory=

当linger.ms 和 batch.size有一个条件满足时,kafka客户端就会立即推送消息到kafka服务

https://bbs.huaweicloud.com/blogs/348729?utm_source=oschina&utm_medium=bbs-ex&utm_campaign=other&utm_content=content

5.2自定义分区器

在kafka中,我们可以通过实现Partitioner接口来自定义分区规则

/*** 类说明:自定义分区器,以value值进行分区*/
public class SelfPartitioner implements Partitioner {public int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);int num = partitionInfos.size();// 来自DefaultPartitioner的处理,在存在key值时是根据key值去计算分区的Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;// 我们定义一个分区器,根据value值去做计算消息所属分区int parId = Utils.toPositive(Utils.murmur2(valueBytes)) % num;return parId;}public void close() {//do nothing}public void configure(Map<String, ?> configs) {//do nothing}}

生产者,指定自定义分区器

/*** 类说明:使用value值的分区器*/
public class SelfPartitionProducer {private static KafkaProducer<String,String> producer = null;public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","127.0.0.1:9092");// 设置String的序列化properties.put("key.serializer", StringSerializer.class);properties.put("value.serializer", StringSerializer.class);// 设置自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SelfPartitioner.class);// 构建kafka生产者对象KafkaProducer<String,String> producer  = new KafkaProducer<String, String>(properties);try {ProducerRecord<String,String> record;try {for (int i =0;i<10;i++){// 构建消息record = new ProducerRecord<String,String>("llp-topic1", "teacher","孙悟空"+i);// 发送消息Future<RecordMetadata> future =producer.send(record);RecordMetadata recordMetadata = future.get();if(null!=recordMetadata){System.out.println(i+","+"offset:"+recordMetadata.offset()+","+"partition:"+recordMetadata.partition());}}} catch (Exception e) {e.printStackTrace();}} finally {// 释放连接producer.close();}}}

消费者

public class GroupAConusmer1 {public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","127.0.0.1:9092");// 设置String的反序列化properties.put("key.deserializer", StringDeserializer.class);properties.put("value.deserializer", StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG,"groupA");// 构建kafka消费者对象KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);try {consumer.subscribe(Collections.singletonList("llp-topic1"));// 调用消费者拉取消息while(true){// 每隔1秒拉取一次消息ConsumerRecords<String, String> records= consumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String, String> record:records){String key = record.key();String value = record.value();int partition =  record.partition();System.out.println("接收到消息:"+",partition ="+partition +", key = " + key + ", value = " + value);}}} finally {// 释放连接consumer.close();}}}

测试结果

image-20240114180846856

image-20240114180930552

6.生产缓冲机制

客户端发送消息给kafka服务器的时候、消息会先写入一个内存缓冲中,然后直到多条消息组成了一个Batch,才会一次网络通信把Batch发送过去。producer.properties主要有以下参数:

buffer.memory

设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果数据产生速度大于向broker发送的速度,导致生产者空间不足,producer会阻塞或者抛出异常。缺省33554432 (32M)

buffer.memory: 所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器。此时会忽略batch.size和linger.ms的限制。
buffer.memory的默认数值是32 MB,对于单个 Producer 来说,可以保证足够的性能。 需要注意的是,如果您在同一个JVM中启动多个 Producer,那么每个 Producer 都有可能占用 32 MB缓存空间,此时便有可能触发 OOM。

batch.size

当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送(linger.ms控制)。缺省16384(16k) ,如果一条消息超过了批次的大小,会写不进去。

linger.ms

指定了生产者在发送批次前等待更多消息加入批次的时间。它和batch.size以先到者为先。也就是说,一旦我们获得消息的数量够batch.size的数量了,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比batch.size设置要小的多,我们需要“linger”特定的时间以获取更多的消息。这个设置默认为0,即没有延迟。设定linger.ms=5,例如,将会减少请求数目,但是同时会增加5ms的延迟,但也会提升消息的吞吐量。

为何要设计缓冲机制

1、减少IO的开销(单个 ->批次)但是这种情况基本上也只是linger.ms配置>0的情况下才会有,因为默认inger.ms=0的,所以基本上有消息进来了就发送了,跟单条发送是差不多!!

2、减少Kafka中Java客户端的GC。

比如缓冲池大小是32MB。然后把32MB划分为N多个内存块,比如说一个内存块是16KB(batch.size),这样的话这个缓冲池里就会有很多的内存块。

你需要创建一个新的Batch,就从缓冲池里取一个16KB的内存块就可以了,然后这个Batch就不断的写入消息

下次别人再要构建一个Batch的时候,再次使用缓冲池里的内存块就好了。这样就可以利用有限的内存,对他不停的反复重复的利用。因为如果你的Batch使用完了以后是把内存块还回到缓冲池中去,那么就不涉及到垃圾回收了。

image.png

7.消费者偏移量提交

一般情况下,我们调用poll方法的时候,broker返回的是生产者写入Kafka同时kafka的消费者提交偏移量,这样可以确保消费者消息消费不丢失也不重复,所以一般情况下Kafka提供的原生的消费者是安全的,但是事情会这么完美吗?

自动提交

最简单的提交方式是让消费者自动提交偏移量。 如果enable.auto.commit被设为 true,消费者会自动把从poll()方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms控制,默认值是5s。

自动提交是在轮询里进行的,消费者每次在进行轮询时会检査是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

不过,在使用这种简便的方式之前,需要知道它将会带来怎样的结果。

假设我们仍然使用默认的5s提交时间间隔, 在最近一次提交之后的3s发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了3s,所以在这3s内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量, 减小可能出现重复消息的时间窗, 不过这种情况是无法完全避免的。

在使用自动提交时,每次调用轮询方法都会把上一次调用返回的最大偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(enable.auto.comnit被设为 true时,在调用 close()方法之前也会进行自动提交)。一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心。

/*** 类说明:消费者 手动提交:  自定义的方式*/
public class ConsumerSpecial {public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","127.0.0.1:9092");// 设置String的反序列化properties.put("key.deserializer", StringDeserializer.class);properties.put("value.deserializer", StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG,"ConsumerOffsets");/*取消自动提交*/properties.put("enable.auto.commit",false);// 构建kafka消费者对象KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);//当前偏移量Map<TopicPartition, OffsetAndMetadata> currOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();int count = 0;try {consumer.subscribe(Collections.singletonList("llp-topic1"));// 调用消费者拉取消息while(true){// 每隔1秒拉取一次消息ConsumerRecords<String, String> records= consumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String, String> record:records){String key = record.key();String value = record.value();System.out.println("接收到消息: key = " + key + ", value = " + value);currOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1,"no meta"));count++;if(count%10==0){//每10条提交一次(特定的偏移量),下一次10条数据会接着在上一次提交的偏移量后进行消费consumer.commitAsync(currOffsets,null);}}}}catch (CommitFailedException e) {System.out.println("Commit failed:");e.printStackTrace();}finally {try {consumer.commitSync(currOffsets);//同步提交(这里也可以)} finally {consumer.close();}}}}
消费者的配置参数

auto.offset.reset

earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

只要group.Id不变,不管auto.offset.reset 设置成什么值,都从上一次的消费结束的地方开始消费。

/*** 类说明:消费者入门:自动提交的消费模式*/
public class GroupBConusmer1 {public static void main(String[] args) {// 设置属性Properties properties = new Properties();// 指定连接的kafka服务器的地址properties.put("bootstrap.servers","127.0.0.1:9092");// 设置String的反序列化properties.put("key.deserializer", StringDeserializer.class);properties.put("value.deserializer", StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG,"groupB");properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");  //最早:从头开始消费//properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); // 已提交的offset开始消费// 构建kafka消费者对象KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);try {consumer.subscribe(Collections.singletonList("llp-topic1"));// 调用消费者拉取消息while(true){// 每隔1秒拉取一次消息ConsumerRecords<String, String> records= consumer.poll(Duration.ofSeconds(1));for(ConsumerRecord<String, String> record:records){String key = record.key();String value = record.value();int partition =  record.partition();System.out.println("接收到消息:"+",partition ="+partition +", key = " + key + ", value = " + value);}}} finally {// 释放连接consumer.close();}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/629807.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

笔试面试题——继承和多态

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、什么是多态&#xff1f;二、什么是重载、重写(覆盖)、重定义(隐藏)&#xff1f;三、 inli…

QT发布成exe不能运行解决方案

原因 qt发布成exe后不会把依赖的dll自动拷贝到文件夹中. 解决方案&#xff1a; 输入&#xff1a;windeployqt 拖拉 生产的exe到命令行. 会自动copy依赖到文件夹中&#xff1a; 然后就可以单击运行了&#xff01;

电路原理1-线性电阻

前言&#xff1a;整理笔记基于清华大学于歆杰老师的《电路原理》&#xff0c;电路原理是基于无源负载和电源组成电路的分析方法。 1.基础数学知识 算术&#xff1a;数字之间的运算 代数&#xff1a;用变量和函数来代替数字 微积分&#xff1a;描述函数的累积效应&#xff0…

贝叶斯方法家族

贝叶斯方法 机器学习框架贝叶斯方法贝叶斯和其他推断方法的区别朴素贝叶斯分类五个 NB 分类器 贝叶斯推断马尔科夫-蒙特卡洛方法变分推断隐马尔科夫模型 贝叶斯网络贝叶斯置信网络 贝叶斯深度学习贝叶斯神经网络贝叶斯卷积神经网络贝叶斯图神经网络贝叶斯优化方法 机器学习框架…

shell简单截取curl GET返回的body消息体

目录 需求背景&#xff1a; 示例&#xff1a; 解决方式&#xff1a; 需求背景&#xff1a; 用shell解析 curl命令GET到的消息体&#xff0c;获取body消息体里的某个字段的值,只是个简单的示例&#xff0c;可以在此基础上更改满足自己的需求 示例&#xff1a; curl一个API…

纳米软件射频测试系统助力放大器静态参数测试

放大器是可以把输入讯号的电压或功率放大&#xff0c;增加信号幅度的一种装置&#xff0c;可以提高信号质量、增加功率、提高灵敏度等&#xff0c;被应用于通讯、广播、雷达、电视、自动控制等领域。对放大器进行测试是为了确保放大器的性能和稳定性&#xff0c;以保证其可以正…

【Linux】nc 网络诊断 | 文件传输 命令详解

目录 一、命令简介 二、命令使用 2.1 测试服务器 2.2 端口连通性测试 2.2.1tcp端口连通性测试 2.2.2udp端口连通性测试 2.3 文件及目录的传输 2.3.1 文件传输(TCP端口) 2.3.2 文件传输(UDP端口) 相关文章&#xff1a; 【网络】抓包工具Wireshark下载安装和基本使用教…

SpringBoot参数校验@Validated、@Valid

SpringBoot参数校验Validated、Valid&#xff08;javax.validation&#xff09; 一、应用场景 在实际开发中&#xff0c;前端校验并不安全&#xff0c;任何人都可以通过接口来调用我们的服务&#xff0c;就算加了一层token的校验&#xff0c;有心人总会转空子&#xff0c;来传…

Linux下安装jdk、tomcat

linux下安装jdk、tomcat 一、linux下安装jdk1.1.下载Linux版本的JDK1.2.Linux安装JDk1.3.设置环境变量1.4.卸载JDK 二、linux下安装tomcat2.1.下载Linux版本的Tomcat2.2.在usr目录下新建tomcat目录2.3.进入到tomcat目录中解压下载的tomcat安装包2.4.配置环境变量-前提是已经安装…

助力工业园区作业违规行为检测预警,基于YOLOv3开发构建工业园区场景下作业人员违规行为检测识别系统

在很多工业园区生产作业场景下保障合规合法进行作业生产操作&#xff0c;对于保护工人生命安全降低安全隐患有着非常重要的作用&#xff0c;但是往往在实际的作业生产中&#xff0c;因为一个安全观念的淡薄或者是粗心大意&#xff0c;对于纪律约束等意思薄弱&#xff0c;导致在…

unity C#什么是线程锁,以及使用案例

文章目录 原理1. **互斥**&#xff08;Mutual Exclusion&#xff09;:2. **缓存一致性与内存屏障**&#xff1a;3. **操作系统的支持**&#xff1a;4. **编程语言级别的实现**&#xff1a;5. **避免死锁**&#xff1a;图示 实例1实例2 原理 线程锁的原理主要是为了在多线程环境…

AI对决:ChatGPT与文心一言的比较

. 个人主页&#xff1a;晓风飞 专栏&#xff1a;数据结构|Linux|C语言 路漫漫其修远兮&#xff0c;吾将上下而求索 文章目录 引言ChatGPT与文心一言的比较Chatgpt的看法文心一言的看法Copilot的观点chatgpt4.0的回答 模型的自我评价自我评价 ChatGPT的优势在这里插入图片描述 文…

【送书活动七期】CMeet系列 技术生态沙龙:技术人职业交流会·杭州场-转鸿蒙 对应用开发来说是否是职业发展新机会

CSDN致力于促进城市区域内尖端新兴技术开发者交流,提供开放自由的切磋平台。在近期热议的话题中,“华为鸿蒙系统不再兼容安卓应用”的消息成了程序员们广泛关注并引发思考的问题。 因此便有了我们此次的活动探讨议题! 目录 题外话开场简单介绍活动主办方介绍活动话题讨论升职加…

微信小程序-----全局配置与页面配置

目录 前言 全局配置文件 一、window 1. 小程序窗口的组成部分 2. window 节点常用的配置项 3. 设置导航栏的标题 4. 设置导航栏的背景色 5. 设置导航栏的标题颜色 6. 全局开启下拉刷新功能 7. 设置下拉刷新时窗口的背景色 8. 设置下拉刷新时 loading 的样式 9. 设置…

dolphinscheduler分布式集群部署指南(小白版)

1.Apache DolphinScheduler概述 官方文档地址&#xff1a;https://dolphinscheduler.apache.org/zh-cn/docs/3.1.9 1.1.DolphinScheduler简介 摘自官网&#xff1a;Apache DolphinScheduler 是一个分布式易扩展的可视化DAG工作流任务调度开源系统。适用于企业级场景&#xf…

Hcie datacom实验手册哪里下载!

一、官方下载 首先&#xff0c;最直接的方式就是从华为官方网站下载Hcie Datacom实验手册。作为华为认证体系的核心资料&#xff0c;官方下载的内容是最全面、最准确的。您只需要访问华为官方网站&#xff0c;在搜索框中输入“Hcie Datacom实验手册”&#xff0c;即可找到相应…

十、Qt 操作PDF文件

《一、QT的前世今生》 《二、QT下载、安装及问题解决(windows系统)》《三、Qt Creator使用》 ​​​ 《四、Qt 的第一个demo-CSDN博客》 《五、带登录窗体的demo》 《六、新建窗体时&#xff0c;几种窗体的区别》 《七、Qt 信号和槽》 《八、Qt C 毕业设计》 《九、Qt …

考研英语打卡

[爱心]长难句分享第三十一天解析 [玫瑰]【词汇】&#xff1a;• astonishing [əˈstɑːnɪʃɪŋ] adj. 令人惊讶的• purchase [ˈpɜːrtʃəs] n. 购买• upmarket [ˌʌpˈmɑːrkɪt] adj. 高级的• grocery [ˈɡroʊsəri] n. 食品杂货店• chain [tʃeɪn] n. 连锁店…

【Linux驱动】Linux的中断系统 | 中断的重要数据结构

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《Linux驱动》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; 目录 &#x1f3c0;Linux系统的中断⚽中断分类软中断和硬中断中断的上半部和下半部 ⚽tasklet⚽工…

普冉32位单片机 PY32C642,M0+内核,1.7 V ~ 5.5 V宽工作电压

PY32C642 单片机采用高性能的 32 位 ARM Cortex-M0内核&#xff0c;宽电压工作范围。嵌入 24Kbytes Flash 和 3 Kbytes SRAM 存储器&#xff0c;最高工作频率 24 MHz。包含多种不同封装类型产品。工作温度范围为-40C ~ 85C&#xff0c;工作电压范围 1.7 V ~ 5.5 V。1 路 12 位A…