Kafka之【生产消息】

消息(Record)

在kafka中传递的数据我们称之为消息(message)或记录(record),所以Kafka发送数据前,需要将待发送的数据封装为指定的数据模型:
在这里插入图片描述

在这里插入图片描述

相关属性必须在构建数据模型时指定,其中主题和value的值是必须要传递的。如果配置中开启了自动创建主题,那么Topic主题可以不存在。value就是我们需要真正传递的数据了,而在未指定分区器或者未指定分区得情况下,Key可以用于数据的分区定位


根据前面提供的配置信息创建生产者对象,通过这个生产者对象向Kafka服务器节点发送数据,而具体的发送是由生产者对象创建时,内部构建的多个组件实现的,多个组件的关系有点类似于生产者消费者模式。

生产者(Producer)是一个关键组件,负责将消息发送到Kafka集群。Kafka生产者主要由三个核心部分组成:

  • KafkaProducer
  • RecordAccumulator
  • Sender

在这里插入图片描述


数据生产者(KafkaProducer)

作用:

KafkaProducer是生产者客户端的核心接口,为生产者对象,用于对我们的数据进行必要的转换和处理,将处理后的数据放入到数据收集器中,类似于生产者消费者模式下的生产者,负责提供向Kafka集群发布消息的功能。

组成:

KafkaProducer由以下关键部分组成:

  • 配置(ProducerConfig):用于初始化和配置生产者客户端的参数。
  • 拦截器(Interceptor):用于在消息发送前后进行自定义处理。如果配置拦截器栈(interceptor.classes),那么将数据进行拦截处理。某一个拦截器出现异常并不会影响后续的拦截器处理。
  • 序列化器(Serializer):因为发送的数据为KV数据,所以需要根据配置信息中的序列化对象对数据中Key和Value分别进行序列化处理。
  • 分区器(Partitioner):计算数据所发送的分区位置。

工作流程:

  • 初始化:根据配置参数初始化客户端,包括序列化器、分区器等。
  • 消息发送:消息经过拦截器处理、序列化、分区选择后,放入RecordAccumulator中。
  • 事务支持:处理事务消息的发送和事务边界(如果配置了事务,如幂等)。

数据收集器(RecordAccumulator)

作用:

RecordAccumulator用于收集,转换我们产生的数据,类似于生产者消费者模式下的缓冲区。为了优化数据的传输,Kafka并不是生产一条数据就向Broker发送一条数据,而是通过合并单条消息,进行批量(批次)发送,提高吞吐量,减少带宽消耗。

组成:

RecordAccumulator由以下部分组成:

  • 内存缓冲区(BufferPool):管理消息缓冲区的内存分配。
  • 消息队列(Deque):每个分区对应一个消息队列,用于存储批次消息。
  • 批次(ProducerBatch):用于存储一批待发送的消息。

内部工作:

  • 默认情况下,一个发送批次的数据容量为16K,这个可以通过参数batch.size进行改善。
  • 批次是和分区进行绑定的。也就是说发往同一个分区的数据会进行合并,形成一个批次。
  • 将消息追加到对应分区的批次中,如果当前批次已满达到时间限制,创建新的批次。
  • 这个队列使用的是Java的双端队列Deque。旧的批次关闭不再接收新的数据,等待发送

重要参数:

  • batch.size:每个批次的大小,默认16K。
  • linger.ms:发送前的等待时间限制,默认0s。
  • buffer.memory:内存缓冲区的总大小默认32M。

数据发送器(Sender)

作用:

Sender是一个后台线程,负责从RecordAccumulator中取出消息批次,向服务节点发送。类似于生产者消费者模式下的消费者。因为是线程对象,所以启动后会不断轮询获取数据收集器中已经关闭的批次数据。对批次进行整合后再发送到Broker节点中

组成:

Sender由以下部分组成:

  • 网络客户端(NetworkClient):负责与Kafka Broker进行网络通信。
  • 元数据管理(Metadata):获取和更新Kafka集群的元数据信息。
  • 请求管理(ClientRequest/ClientResponse):管理发送的请求和接收的响应。

内部工作:

  • 因为数据真正发送的地方是Broker节点,不是分区。所以需要将从数据收集器中收集到的批次数据按照可用Broker节点重新组合成List集合。
  • 将组合后的<节点,List<批次>>的数据封装成客户端请求发送到网络客户端对象的缓冲区,由网络客户端对象通过网络发送给Broker节点。
  • Broker节点获取客户端请求,并根据请求键进行后续的数据处理:向分区中增加数据。

重要参数:

  • retries:重试次数。
  • retry.backoff.ms:重试间隔时间。
  • request.timeout.ms:请求超时时间。

协作机制

  1. 消息发送流程

    • 用户通过KafkaProducer.send()方法发送消息。
    • 消息经过序列化、分区选择和拦截器处理后,进入RecordAccumulator
    • RecordAccumulator将消息存入对应分区的消息队列中,形成批次。
  2. 消息传输流程

    • Sender后台线程不断从RecordAccumulator中取出已准备好的消息批次。
    • Sender通过NetworkClient将消息批次发送到Kafka Broker。
    • 如果发送成功,Sender接收响应并通知KafkaProducer;如果发送失败,根据重试策略进行重试。

通过这种协作机制,Kafka生产者实现了高效、可靠的消息发送。KafkaProducer负责接口和配置管理,RecordAccumulator负责消息缓存和批量处理,Sender负责消息的实际传输和重试逻辑。


生产者代码

// TODO 配置属性集合
Map<String, Object> configMap = new HashMap<>();
// TODO 配置属性:Kafka服务器集群地址
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// TODO 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// TODO 创建Kafka生产者对象,建立Kafka连接
//      构造对象时,需要传递配置参数
KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
// TODO 准备数据,定义泛型
//      构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key1", "value1"
);
// TODO 生产(发送)数据
producer.send(record);
// TODO 关闭生产者连接
producer.close();

拦截器

生产者API在数据准备好发送给Kafka服务器之前,允许我们对生产的数据进行统一的处理,比如校验,整合数据等等。这些处理我们是可以通过Kafka提供的拦截器完成。因为拦截器不是生产者必须配置的功能,所以可以根据实际的情况自行选择使用。

但是要注意,这里的拦截器是可以配置多个的。执行时,会按照声明顺序执行完一个后,再执行下一个。并且某一个拦截器如果出现异常,只会跳出当前拦截器逻辑,并不会影响后续拦截器的处理。所以开发时,需要将拦截器的这种处理方法考虑进去。

在这里插入图片描述

自定义拦截器

要想自定义拦截器,只需要创建一个类,然后实现Kafka提供的分区类接口ProducerInterceptor,接下来重写方法。这里我们只关注onSend方法即可。


import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;/*** TODO 自定义数据拦截器*      1. 实现Kafka提供的生产者接口ProducerInterceptor*      2. 定义数据泛型 <K, V>*      3. 重写方法*         onSend*         onAcknowledgement*         close*         configure*/
public class KafkaInterceptorMock implements ProducerInterceptor<String, String> {@Override// 数据发送前,会执行此方法,进行数据发送前的预处理public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Override	// 数据发送后,获取应答时,会执行此方法public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Override// 生产者关闭时,会执行此方法,完成一些资源回收和释放的操作public void close() {}@Override// 创建生产者对象的时候,会执行此方法,可以根据场景对生产者对象的配置进行统一修改或转换。public void configure(Map<String, ?> configs) {}
}

使用拦截器

// 仅需在配置properties的时候,指定自定义拦截器器即可
configMap.put( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaInterceptorMock.class.getName());

同步发送和异步发送

1. 异步发送

如果Kafka通过主线程代码将一条数据放入到缓冲区后,无需等待数据的后续发送过程,就直接发送一下条数据的场合,我们就称之为异步发送。
在这里插入图片描述

import org.apache.kafka.clients.producer.*;import java.util.HashMap;
import java.util.Map;public class KafkaProducerASynTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 循环生产数据for ( int i = 0; i < 10; i++ ) {// TODO 创建数据ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后,会回调此方法System.out.println("数据发送成功:" + recordMetadata.timestamp());}});// TODO 发送当前数据System.out.println("发送数据");}producer.close();}
}

2. 同步发送

如果Kafka通过主线程代码将一条数据放入到缓冲区后,需等待数据的后续发送操作的应答状态,才能发送一下条数据的场合,我们就称之为同步发送。所以这里的所谓同步,就是生产数据的线程需要等待发送线程的应答(响应)结果。

在这里插入图片描述

import org.apache.kafka.clients.producer.*;import java.util.HashMap;
import java.util.Map;public class KafkaProducerASynTest {public static void main(String[] args) throws Exception {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 循环生产数据for ( int i = 0; i < 10; i++ ) {// TODO 创建数据ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);// TODO 发送数据producer.send(record, new Callback() {// TODO 回调对象public void onCompletion(RecordMetadata recordMetadata, Exception e) {// TODO 当数据发送成功后,会回调此方法System.out.println("数据发送成功:" + recordMetadata.timestamp());}}).get();// TODO 发送当前数据System.out.println("发送数据");}producer.close();}
}

分区器

  1. 如果指定了分区,直接使用
  2. 如果指定了自己的分区器,通过分区器计算分区编号,如果有效,直接使用
  3. 如果指定了数据Key,且使用Key选择分区的场合,采用murmur2非加密散列算法(类似于hash)计算数据Key序列化后的值的散列值,然后对主题分区数量模运算取余,最后的结果就是分区编号。hash(key)%numPartitions = 分区号
  4. 如果未指定数据Key,或不使用Key选择分区,那么Kafka会自动分区

自定义分区器

只需要创建一个类,然后实现Kafka提供的分区类接口Partitioner,接下来重写方法。这里我们只关注partition方法即可,因为此方法的返回结果就是需要的分区编号。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** TODO 自定义分区器实现步骤:*      1. 实现Partitioner接口*      2. 重写方法*         partition : 返回分区编号,从0开始*         close*         configure*/
public class KafkaPartitionerMock implements Partitioner {/*** 分区算法 - 根据业务自行定义即可* @param topic The topic name* @param key The key to partition on (or null if no key)* @param keyBytes The serialized key to partition on( or null if no key)* @param value The value to partition on or null* @param valueBytes The serialized value to partition on or null* @param cluster The current cluster metadata* @return 分区编号,从0开始*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

使用分区器

// 仅需在配置properties的时候,指定自定义分区器即可
configMap.put( ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaPartitionerMock.class.getName());

消息可靠性(Acknowledgement)

ACK = 0

当生产数据时,生产者对象将数据通过网络客户端将数据发送到网络数据流中的时候,Kafka就对当前的数据请求进行了响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。

ACK = 1

当生产数据时,Kafka Leader副本将数据接收到并写入到了日志文件后,就会对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。

ACK = -1/all (默认) 最可靠

当生产数据时,Kafka Leader副本和Follower副本都已经将数据接收到并写入到了日志文件后,再对当前的数据请求进行响应(确认应答),如果是同步发送数据,此时就可以发送下一条数据了。如果是异步发送数据,回调方法就会被触发。


数据重试导致的数据重复

由于网络或服务节点的故障,Kafka在传输数据时,可能会导致数据丢失,所以我们才会设置ACK应答机制,尽可能提高数据的可靠性。

  1. 在某些场景中,数据并不是真正地丢失,比如将ACK应答设置为1,Leader副本将数据写入文件后,Kafka就可以对请求进行响应。
  2. 此时,假设网络故障的原因,Kafka并没有成功将ACK应答信息发送给Producer,那么此时对于Producer来讲,以为kafka没有收到数据,所以就会一直等待响应,一旦超过某个时间阈值,就会发生超时错误,也就是说在Kafka Producer眼里,数据已经丢了
  3. 所以在这种情况下,kafka Producer会尝试对超时的请求数据进行重试(retry)操作。通过重试操作尝试将数据再次发送给Kafka。
  4. 如果此时发送成功,Kafka就又收到了数据,两条数据一样,也就是说数据的重复。

数据乱序

数据重试(retry)功能除了可能会导致数据重复以外,还可能会导致数据乱序。

  1. 假设需要将编号为1,2,3的三条连续数据发送给Kafka。
  2. 如果在发送过程中,1因为网络原因发送失败,2、3发生成功
  3. 则此时在Broker的缓存中,为消息2、3
  4. 生产者重发消息1
  5. 则此时在Broker的缓存中,为消息2、3、1

这就产生了数据的乱序


幂等

为了解决Kafka传输数据时,所产生的数据重复和乱序问题,Kafka引入了幂等性操作,所谓的幂等性,就是Producer同样的一条数据,无论向Kafka发送多少次,kafka都只会存储一条。注意,这里的同样的一条数据,指的不是内容一致的数据,而是指的不断重试的数据。

// 幂等需要手动开启
enable.idempotence配置为true
  1. 开启幂等性后,为了保证数据不会重复,那么就需要给每一个请求批次的数据增加唯一性标识,kafka中,这个标识采用的是连续的序列号数字sequencenum,但是不同的生产者Producer可能序列号是一样的,所以仅仅靠seqnum还无法唯一标记数据,所以还需要同时对生产者进行区分,所以Kafka采用申请生产者ID(producerid)的方式对生产者进行区分。这样,在发送数据前,我们就需要提前申请producerid以及序列号sequencenum

  2. Broker中会给每一个分区记录生产者的生产状态:采用队列的方式缓存最近的5个批次数据。队列中的数据按照seqnum进行升序排列。这里的数字5是经过压力测试,均衡空间效率和时间效率所得到的值,所以为固定值,无法配置且不能修改。

  3. 如果Borker当前新的请求批次数据在缓存的5个旧的批次中存在相同的,如果有相同的,那么说明有重复,当前批次数据不做任何处理。
    在这里插入图片描述

  4. 如果Broker当前的请求批次数据在缓存中没有相同的,那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号加1,如果是,说明是连续的,顺序没乱。那么继续,如果不是,那么说明数据已经乱了,发生异常。
    在这里插入图片描述

  5. Broker根据异常返回响应,通知Producer进行重试。Producer重试前,需要在缓冲区中将数据重新排序,保证正确的顺序后。再进行重试即可。
    在这里插入图片描述

  6. 如果请求批次不重复,且有序,那么更新缓冲区中的批次数据。将当前的批次放置再队列的结尾,将队列的第一个移除,保证队列中缓冲的数据最多5个。

从上面的流程可以看出,Kafka的幂等性是通过消耗时间和性能的方式提升了数据传输的有序和去重,在一些对数据敏感的业务中是十分重要的。但是通过原理,咱们也能明白,这种幂等性还是有缺陷的

  • 幂等性的producer仅做到单分区上的幂等性,即单分区消息有序不重复,多分区无法保证幂等性。
  • 只能保持生产者单个会话的幂等性,无法实现跨会话的幂等性(也就是说如果一个producer挂掉再重启,那么重启前和重启后的producer对象会被当成两个独立的生产者,从而获取两个不同的独立的生产者ID,导致broker端无法获取之前的状态信息,所以无法实现跨会话的幂等)
  • 要想解决这个问题,就需要采用事务功能。

什么是生产者事务

在Kafka中,生产者事务(Producer Transactions)允许生产者以原子方式向一个或多个主题写入消息。事务可以确保消息要么全部成功写入,要么全部失败,从而保证数据的一致性。下面详细解释Kafka中生产者事务的原理:

事务组件

在Kafka中,事务涉及以下几个组件:

  • Transactional Producer(事务生产者):负责在事务中写入消息。
  • Transaction Coordinator(事务协调器):管理事务的生命周期,包括开始事务、提交事务和中止事务。
  • Broker(代理):Kafka集群中的服务器,存储消息并协助协调事务。

事务处理的详细步骤

  1. 初始化事务:事务生产者向事务协调器注册,事务协调器为该生产者分配一个唯一的Transactional IDProducer Epoch
  2. 开始事务:生产者调用beginTransaction时,事务协调器记录事务开始状态。
  3. 发送消息:生产者发送消息时,消息会标记为“待处理”状态并包含事务ID和epoch。
  4. 提交事务:生产者调用commitTransaction时,事务协调器将事务状态更新为“提交中”,然后通知所有相关分区代理提交消息。
  5. 完成提交:所有分区代理确认消息已写入日志后,事务协调器更新事务状态为“已提交”,通知生产者事务完成。
  6. 中止事务:在任何步骤发生错误时,生产者可以调用abortTransaction,事务协调器将事务状态更新为“中止中”,通知相关分区代理丢弃消息,最后更新事务状态为“已中止”。

实现事务的要点

  • 幂等性:确保消息的幂等性,以避免重复写入。Kafka通过Producer IDSequence Number实现幂等性。
  • 协调和日志:事务协调器负责管理事务状态,事务日志记录事务的所有状态变化。
  • 原子提交:确保事务提交的原子性,通过两阶段提交协议(2PC)实现。
    1. 第一阶段:预提交

      • 生产者发送消息时,代理记录Producer ID和Sequence Number。
      • 消息标记为“待提交”。
    2. 第二阶段:提交或中止

      • 当生产者提交事务时,事务协调器通知代理将消息标记为“已提交”。
      • 如果事务中止,代理丢弃消息,不进行处理。
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;public class ProducerTransactionTest {public static void main(String[] args) {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// TODO 配置幂等性configMap.put( ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// TODO 配置事务IDconfigMap.put( ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");// TODO 配置事务超时时间configMap.put( ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5);// TODO 创建生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 初始化事务producer.initTransactions();try {// TODO 启动事务producer.beginTransaction();// TODO 生产数据for ( int i = 0; i < 10; i++ ) {ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key" + i, "value" + i);final Future<RecordMetadata> send = producer.send(record);}// TODO 提交事务producer.commitTransaction();} catch ( Exception e ) {e.printStackTrace();// TODO 终止事务producer.abortTransaction();}// TODO 关闭生产者对象producer.close();}
}

如何确保跨会话中的幂等(生产者崩溃后,事务恢复生产者,即为跨会话)

确保幂等性的过程主要依赖于Kafka的Producer ID、Producer Epoch和Sequence Number机制。以下是详细的过程描述:

1. 初始生产者启动和消息发送

  1. 生产者初始化

    • 生产者在初始化时,Kafka为其分配一个唯一的Producer ID (PID)。
    • Producer ID标识当前生产者实例。
  2. 开始发送消息

    • 生产者发送消息时,每个消息附带一个Sequence Number。
    • Sequence Number在每个分区内是递增的,用于标识消息的顺序。

2. 记录Producer ID和Sequence Number

  • Kafka代理(Broker)会记录每个分区的Producer ID和最新的Sequence Number。
  • 每条消息发送时,代理会检查当前Producer ID和Sequence Number,确保消息的顺序和唯一性。

3. 生产者崩溃和重启

  1. 生产者崩溃

    • 假设生产者在事务过程中崩溃。
  2. 生产者重启

    • 生产者重启后,使用相同的Transactional ID重新初始化。
    • Kafka为重启的生产者分配一个新的Producer ID。
    • 同时,Producer Epoch递增,标识这是生产者的一个新的会话。

4. 恢复未完成的事务

  • 事务协调器:负责管理事务状态,恢复未完成的事务。
  • 当生产者重启并调用initTransactions时,事务协调器会:
    • 检查与Transactional ID相关的未完成事务。
    • 根据事务日志记录,决定是提交还是中止这些事务。

5. 幂等性保障机制

  1. 新的Producer ID和递增的Producer Epoch

    • Kafka代理识别新的Producer ID和递增的Producer Epoch,确保重启后的生产者实例与之前的实例区分开。
  2. 更新记录

    • 代理更新记录新的Producer ID和Sequence Number。
    • 新的Producer ID和递增的Sequence Number确保消息不重复处理。
  3. 消息去重

    • 代理通过检查Producer ID和Sequence Number,确保同一个Producer ID的消息不会被处理两次。
    • 即使生产者在重启后重新发送消息,代理能够识别并忽略重复的消息。

6. 事务性消息处理

  1. 第一阶段:预提交

    • 生产者发送消息时,代理记录Producer ID和Sequence Number。
    • 消息标记为“待提交”。
  2. 第二阶段:提交或中止

    • 当生产者提交事务时,事务协调器通知代理将消息标记为“已提交”。
    • 如果事务中止,代理丢弃消息,不进行处理。

总结

通过以上机制,Kafka确保生产者在不同会话中的幂等性:

  • Producer ID和Producer Epoch:标识生产者实例和会话阶段。
  • Sequence Number:确保每个分区中的消息顺序和唯一性。
  • 事务协调器:管理事务状态,确保事务的一致性和原子性。

即使生产者崩溃并重启,通过这些机制,Kafka能够保证消息的幂等性和事务一致性,避免重复处理消息,确保数据可靠性和一致性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/14567.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaEE技术之分布式事务(理论、解决方案、Seata解决分布式事务问题、Seata之原理简介、断点查看数据库表数据变化)

文章目录 JavaEE技术之分布式事务准备:1. 本地事务回顾1.1 什么是事务1.2 事务的作用1.3 事务ACID四大特性1.4 事务的并发问题1.5 MySQL事务隔离级别1.6 事务相关命令(了解)1.7 事务传播行为&#xff08;propagation behavior&#xff09;1.8 伪代码练习1.9 回滚策略1.10 超时事…

144.栈和队列:有效的括号(力扣)

题目描述 代码解决 class Solution { public:bool isValid(string s) {// 如果字符串长度为奇数&#xff0c;不可能是有效的括号字符串if(s.size() % 2 ! 0) return false;// 使用栈来存放括号stack<char> st;// 遍历字符串中的每一个字符for(int i 0; i < s.size();…

Error:(6, 43) java: 程序包org.springframework.data.redis.core不存在

目录 一、在做SpringBoot整合Redis的项目时&#xff0c;报错&#xff1a; 二、尝试 三、解决办法 一、在做SpringBoot整合Redis的项目时&#xff0c;报错&#xff1a; 二、尝试 给依赖加版本号&#xff0c;并且把版本换了个遍&#xff0c;也不行&#xff0c;也去update过ma…

Parasoft C++Test软件静态分析操作指南_软件质量度量

系列文章目录 Parasoft CTest软件安装指南 Parasoft CTest软件静态分析操作指南_编码规范/标准检查 Parasoft CTest软件静态分析操作指南_软件质量度量 Parasoft CTest软件静态分析_自动提取静态分析数据生成文档 Parasoft CTest软件单元测试_操作指南 Parasoft CTest软件单元…

C语言章节学习归纳--数据类型、运算符与表达式

3.1 C语言的数据类型&#xff08;理解&#xff09; 首先&#xff0c;对变量的定义可以包括三个方面&#xff1a; 数据类型 存储类型 作用域 所谓数据类型是按被定义变量的性质&#xff0c;表示形式&#xff0c;占据存储空间的多少&#xff0c;构造特点来划分的。在C语言中&…

2461. 长度为 K 子数组中的最大和(c++)

给你一个整数数组 nums 和一个整数 k 。请你从 nums 中满足下述条件的全部子数组中找出最大子数组和&#xff1a; 子数组的长度是 k&#xff0c;且子数组中的所有元素 各不相同 。 返回满足题面要求的最大子数组和。如果不存在子数组满足这些条件&#xff0c;返回 0 。 子数…

设计模式6——单例模式

写文章的初心主要是用来帮助自己快速的回忆这个模式该怎么用&#xff0c;主要是下面的UML图可以起到大作用&#xff0c;在你学习过一遍以后可能会遗忘&#xff0c;忘记了不要紧&#xff0c;只要看一眼UML图就能想起来了。同时也请大家多多指教。 单例模式&#xff08;Singleto…

完成商品属性分组和商品属性关联维护

文章目录 1.前端页面搭建1.复制attrgroup-attr-relation.vue到src/views/modules/commodity下2.加入超链接和引入组件 src/views/modules/commodity/attrgroup.vue1.加入超链接2.引入组件 3.数据池加入变量4.使用组件1.引用组件2.添加方法3.测试&#xff0c;点击关联&#xff0…

建站平台布局结构

建站平台布局结构对于网站的成功至关重要。一个良好的布局结构能够有效地吸引用户&#xff0c;提升用户体验&#xff0c;并且有助于网站的搜索引擎优化&#xff08;SEO&#xff09;。在设计网站布局结构时&#xff0c;需要考虑到用户导航、信息层次结构、页面加载速度等方面&am…

Android JetPack快速上手

学习地址 【Android Jetpack组件从入门到入坟&#xff0c;全家桶全面学习教程精讲&#xff0c;通俗易懂】 review 研究生期间接触过一部分android开发&#xff0c;近期有个小项目需要进行开发&#xff0c;临时恶补了一下Android相关知识点&#xff0c;突然发现Android新增了…

VBA语言専攻每周通知20240524

通知20240524 各位学员∶本周MF系列VBA技术资料增加611-615讲&#xff0c;T3学员看到通知后请免费领取,领取时间5月24日晚上18:00-5月26日晚上18:00。本次增加内容&#xff1a; MF611:用InputBox录入日期 MF612:信息提示10秒后关自动关闭 MF613:只是信息提示10秒 MF614:显…

如何解决Nginx反向代理不生效?

目录 背景 过程 日志 检查配置文件 重启服务 检查容器内的配置文件 容器和宿主机 其他 背景 用了两年的nginx新加的反向代理不生效 Docker挂载的配置文件启动的Nginx&#xff0c;配置一切正常&#xff0c;但是反向代理不生效&#xff0c;???先自查一波 过程 日志 …

RDDM论文阅读笔记

CVPR2024的残差去噪模型。把diffusion 模型的加噪过程分解为残差diffusion和noise diffusion&#xff0c;其中残差diffusion模拟从target image到degraded image的过程&#xff0c;而noise diffusion则是原来的diffusion过程&#xff0c;即从图片到高斯噪声的加噪过程。前者可以…

如何让社区版IDEA变得好用

如何让社区版IDEA变得好用 背景 收费版的idea功能非常强大&#xff0c;但是费用高。社区版的免费&#xff0c;但是功能被阉割了。如何才能让社区版Idea变得好用&#xff0c;就需要各种插件支持了。经过全局配置编码&#xff0c;maven&#xff0c;jdk版本&#xff0c;在加上各…

架构二。。

1、CAP 只能3选2 1&#xff09;一致性&#xff08;Consistency&#xff09; 客户每次读都是返回最新的写操作结果 2&#xff09;可用性&#xff08;Availability&#xff09; 非故障节点在合理的时间内返回合理的响应 3&#xff09;分区容忍性&#xff08;Partition Tolerance…

Ribbon负载均衡(自己总结的)

文章目录 Ribbon负载均衡负载均衡解决的问题不要把Ribbon负载均衡和Eureka-Server服务器集群搞混了Ribbon负载均衡代码怎么写ribbon负载均衡依赖是怎么引入的&#xff1f; Ribbon负载均衡 负载均衡解决的问题 首先Ribbon负载均衡配合Eureka注册中心一块使用。 在SpringCloud…

计算机网络之应用层知识点总结

6.1 网络应用模型 &#xff08;1&#xff09;应用层概述 &#xff08;2&#xff09;网络应用模型的介绍 客户/服务器&#xff08;C/S&#xff09;模型 P2P模型 6.2 域名解析系统DNS &#xff08;1&#xff09;DNS系统介绍 &#xff08;2&#xff09;域名 &#xff08;3&#…

学习平台|基于Springboot+vue的学习平台系统的设计与实现(源码+数据库+文档)

学习平台系统 目录 基于Springboot&#xff0b;vue的学习平台系统的设计与实现 一、前言 二、系统设计 三、系统功能设计 1系统功能模块 2管理员功能模块 3学生功能模块 4教师功能模块 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八…

不小心丢失mfc140u.dll文件怎么办?mfc140u.dll丢失的解决办法

当您发现mfc140u.dll文件不见了或者受损&#xff0c;别担心&#xff0c;我们可以一起解决这个问题&#xff01;首先&#xff0c;您可能会注意到一个小提示&#xff0c;当您尝试打开某些程序时&#xff0c;屏幕上会跳出一个消息说“找不到mfc140u.dll”或者“mfc140u.dll文件缺失…

解决docker中container运行闪退终止的问题

在运行bindmount-test时&#xff0c;点击完运行按钮后闪退结束运行。 第一步查看log日志&#xff1a; 2024-05-18 23:46:18 Error: Cannot find module /app/nodemon 2024-05-18 23:46:18 at Function.Module._resolveFilename (internal/modules/cjs/loader.js:668:15) …