😊引言
🎖️本篇博文约8000字,阅读大约30分钟,亲爱的读者,如果本博文对您有帮助,欢迎点赞关注!😊😊😊
🖥️什么是分布式系统的一致性 ?
- ✅分布式系统的一致性
- ✅线性一致性 & 顺序一致性
- ✅顺序一致性 & 最终一致性
✅分布式系统的一致性
所谓一致性,是指数据在多个副本之间是否能够保持一致的特性,再聊一致性的时间,其实要搞清楚一致性模型。
分布式系统中的一致性模型是一组管理分布式系统行为的规则。它决定了在分布式系统中如何访问和更新数据,以及如何将这些更新提供给客户端。面对网络延迟和局部故障等分布式计算难题,分布式系统的一致性模型对保证系统的一致性和可靠性起着关键作用。在分布式系统中有多种一致性模型可用,每个模型都有其优点和缺点,选择模型取决于系统的具体要求。
大的分类上面,主要有三种,分别是强一致性、弱一致性和最终一致性。
-
强一致性(Strong Consistency):
在强一致性模型下,系统保证每个读操作都将返回最近的写操作的结果,即任何时间点,客户端都将看到相同的数据视图。这包括线性一致性(Linearizability) 、顺序一致性(Sequential Consistency) 和严格可串行性 (Strict Serializability) 等子模型。强 致性模型通常牺牲了可用性来实现数据一致性。 -
弱一致性(Weak Consistency):
弱一致性模型放宽了一致性保证,它允许在不同节点之间的数据访问之间存在一定程度的不一致性,以换取更高的性能和可用性。这包括因果致性(Causal Consistency)会话一致性(Session Consistency) 和单调一致性(Monotonic Consistency) 等子模型。弱一致性模型常更注重可用性,允许一定程度的数据不一致性。 -
最终一致性(Eventual Consistency) :
最终一致性模型是一种最大程度放宽了一致性要求的模型。它允许在系统发生分区或网络故障后,经过一段时间,系统将最终达到一致状态。这个模型在某些情况下提供了很高的可用性,但在一段时间内可能会出现数据不一致的情况。
我们看下代码巩固一下我们理论:
⛳第一个:先看一个分布式系统一致性的简单Demo
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; /**
* @author 昕宝爸爸爱编程
* 分布式系统一致性的简单Demo
*/
public class DistributedConsistencyExample { private static final int NUM_OF_NODES = 3; // 假设有3个节点 private static final int MAX_TIME = 5; // 最大等待时间为5秒 public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(NUM_OF_NODES); for (int i = 0; i < NUM_OF_NODES; i++) { executorService.execute(new Node(i)); } executorService.shutdown(); executorService.awaitTermination(MAX_TIME, TimeUnit.SECONDS); } static class Node implements Runnable { private int id; public Node(int id) { this.id = id; } @Override public void run() { try { // 执行任务,这里只是简单地打印节点ID System.out.println("节点" + id + "正在执行任务..."); Thread.sleep(1000); // 模拟任务执行时间 } catch (InterruptedException e) { e.printStackTrace(); } } }
}
可以看到,我创建了一个包含3个节点的分布式系统。每个节点都是一个线程,它们在启动后开始执行任务。这里我们只是简单地打印节点ID,并模拟任务执行时间。
⛳第二个:增加分布式锁和事务处理来实现一致性
这一个就相对上面一段代码来讲,增加了些复杂度,使用了分布式锁和事务处理来实现一致性:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author 昕宝爸爸爱编程
* 增加分布式锁和事务处理来实现一致性
*/
public class DistributedConsistencyExample { private static final int NUM_OF_NODES = 3; // 假设有3个节点 private static final int MAX_TIME = 5; // 最大等待时间为5秒 private static final Lock lock = new ReentrantLock(); // 分布式锁 public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(NUM_OF_NODES); for (int i = 0; i < NUM_OF_NODES; i++) { executorService.execute(new Node(i)); } executorService.shutdown(); executorService.awaitTermination(MAX_TIME, TimeUnit.SECONDS); } static class Node implements Runnable { private int id; public Node(int id) { this.id = id; } @Override public void run() { try { // 执行任务,这里模拟一个需要加锁的操作 lock.lock(); // 加锁 try { // 执行任务逻辑,这里只是简单地打印节点ID和加锁情况 System.out.println("节点" + id + "正在执行任务..."); System.out.println("节点" + id + "获取了锁..."); Thread.sleep(1000); // 模拟任务执行时间 } finally { lock.unlock(); // 释放锁 } } catch (InterruptedException e) { e.printStackTrace(); } } }
}
我们使用了ReentrantLock来实现分布式锁。当一个节点获取了锁之后,其他节点必须等待该节点释放锁后才能执行任务。这样可以保证同一时刻只有一个节点可以执行任务,从而实现了分布式系统的一致性。同时,我们还使用了事务处理的方式,确保任务的原子性和一致性。在执行任务时,我们首先获取锁,然后执行任务逻辑,最后释放锁。这样可以保证在任务执行过程中不会被其他节点干扰,从而保证了数据的一致性。
⛳第三个:增加使用了分布式锁、事务处理和消息队列来实现一致性保证
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; /**
* @author 昕宝爸爸爱编程
*/
public class DistributedConsistencyExample { private static final int NUM_OF_NODES = 3; // 假设有3个节点 private static final int MAX_TIME = 5; // 最大等待时间为5秒 private static final Lock lock = new ReentrantLock(); // 分布式锁 private static final String QUEUE_NAME = "distributed-queue"; // 消息队列名称 public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(NUM_OF_NODES); for (int i = 0; i < NUM_OF_NODES; i++) { executorService.execute(new Node(i)); } executorService.shutdown(); executorService.awaitTermination(MAX_TIME, TimeUnit.SECONDS); } static class Node implements Runnable { private int id; public Node(int id) { this.id = id; } @Override public void run() { try { // 执行任务,这里模拟一个需要加锁和消息队列的操作 lock.lock(); // 加锁 try { // 发送消息到消息队列 String message = "节点" + id + "正在执行任务..."; System.out.println(message); // 模拟发送消息到消息队列的时间 Thread.sleep(1000); } finally { lock.unlock(); // 释放锁 } } catch (InterruptedException e) { e.printStackTrace(); } } }
}
可以看到,我使用了ReentrantLock来实现分布式锁。当一个节点获取了锁之后,其他节点必须等待该节点释放锁后才能执行任务。这样可以保证同一时刻只有一个节点可以执行任务,从而实现了分布式系统的一致性。同时,我们还使用了消息队列来协调节点之间的通信。在执行任务时,我们首先获取锁,然后发送消息到消息队列,最后释放锁。这样可以保证在任务执行过程中不会被其他节点干扰,从而保证了数据的一致性。同时,通过消息队列的传递机制,可以确保消息的可靠性和顺序性,进一步保证了分布式系统的一致性。
✅线性一致性 & 顺序一致性
线性一致性(Linearizability) 和顺序 致性(Sequential Consistency) 是两种强 致性模型。
线性一致性是一种最强的一致性模型,它强调在分布式系统中的任何时间点,读操作都应该返回最近的写操作的结果。
顺序一致性也是一种强一致性模型,但相对于线性一致性而言,它放宽了一些限制。在顺序一致性模型中,系统维护一个全局的操作顺序,以确保每个客户端看到的操作顺序都是一致的。
与线性一致性不同,顺序一致性不强调实时性,只要操作的顺序是一致的,就可以接受一些延迟。
他们的主要区别在于强调实时性。线性一致性要求操作在实际时间上的顺序保持一致,而顺序一致性只要求操作的顺序是一致的,但不一定要求操作的实际时间顺序。
⛳同样的我们结合代码来看一看。
1、⛳线性一致性代码展示:
import java.util.concurrent.atomic.AtomicInteger; /**
* @author 昕宝爸爸爱编程
*/
public class LinearConsistencyExample { private static final AtomicInteger counter = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { counter.incrementAndGet(); System.out.println("Thread 1 incremented counter to: " + counter.get()); }); Thread thread2 = new Thread(() -> { counter.incrementAndGet(); System.out.println("Thread 2 incremented counter to: " + counter.get()); }); thread1.start(); thread2.start(); thread1.join(); thread2.join(); }
}
用到了AtomicInteger类,该类提供了原子性的操作来保证线性一致性。两个线程同时递增计数器,并打印计数器的值。由于AtomicInteger的递增操作是原子的,因此可以保证两个线程之间的操作是线性一致的。无论线程的执行顺序如何,最终打印的值都是递增的。
2、⛳顺序一致性代码展示:
import java.util.concurrent.*; /**
* @author 昕宝爸爸爱编程
*/
public class SequentialConsistencyExample { private static final int NUM_OF_THREADS = 3; private static final ExecutorService executorService = Executors.newFixedThreadPool(NUM_OF_THREADS); private static final Semaphore semaphore = new Semaphore(1); // 用于实现互斥的信号量 private static int sharedData = 0; // 共享数据 public static void main(String[] args) throws InterruptedException { for (int i = 0; i < NUM_OF_THREADS; i++) { executorService.execute(() -> { try { semaphore.acquire(); // 获取信号量,实现互斥访问 sharedData++; // 更新共享数据 System.out.println("Thread " + Thread.currentThread().getId() + " updated shared data to: " + sharedData); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); // 释放信号量,允许其他线程访问 } }); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.HOURS); // 等待所有任务执行完毕 }
}
使用Semaphore类来实现互斥访问共享数据,以保证顺序一致性。多个线程通过获取信号量来访问共享数据,并在更新共享数据后释放信号量。
由于只有一个线程可以同时访问共享数据,因此可以保证线程之间的操作是按照它们启动的顺序执行的。无论线程的执行顺序如何,最终打印的值都是递增的。
所有线程看到的数据依照他们操作执行的顺序而变化。
✅顺序一致性 & 最终一致性
先看一下最终一致性的代码:
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference; /**
* @author 昕宝爸爸爱编程
*/
public class EventualConsistencyExample { private static final CopyOnWriteArrayList<String> eventLog = new CopyOnWriteArrayList<>(); private static final AtomicReference<String> latestEvent = new AtomicReference<>(); public static void main(String[] args) { Thread writerThread = new Thread(() -> { for (int i = 0; i < 10; i++) { eventLog.add("Event " + i); latestEvent.set("Event " + i); try { Thread.sleep(1000); // 模拟异步操作,延迟1秒 } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread readerThread = new Thread(() -> { while (true) { String latestEvent = latestEvent.get(); if (latestEvent != null) { System.out.println("Reader read event: " + latestEvent); } else { System.out.println("Reader read event: null (no event available)"); } try { Thread.sleep(500); // 模拟异步操作,延迟0.5秒 } catch (InterruptedException e) { e.printStackTrace(); } } }); writerThread.start(); readerThread.start(); }
}
CopyOnWriteArrayList和AtomicReference来模拟最终一致性的场景。eventLog是一个线程安全的列表,用于存储事件日志。latestEvent是一个原子引用,用于存储最新的事件。
在主线程中,我们创建了一个写线程和一个读线程。写线程将模拟写入事件到eventLog中,并将最新的事件存储到latestEvent中。读线程将不断读取latestEvent中的最新事件,并打印出来。由于写线程和读线程是异步执行的,因此它们之间的操作可能会存在一定的延迟。但是,由于使用了线程安全的列表和原子引用,最终一致性得到了保证。无论何时读取latestEvent,都总是能够获得最新的事件,或者在读取时事件还未发生而返回null。这正是最终一致性的特点:在没有更新数据的一段时间里,系统将通过广播保证副本之间的数据一致性。
很多人看完线性一致性和顺序一致性的区别之后,会容易懵,看上去顺序一致性和我们理解的最终一致性有点像?
💡那么他们的区别是啥呢?
在时间上,虽然顺序一致性和最终一致性都不强要求实时性,但是最终一致性的时间放的会更宽。并且最终一致性其实并不强调顺序,他只需要保证最终的结果一致就行了,而顺序一致性要求操作顺序必须一致。
并且,顺序一致性还是一种强一致性,比如在Zookeeper中,其实就是通过ZAB算法来保证的顺序一致性,即各人节点之间的写入顺序要求一致。并且要半数以上的节点写入成功才算成功。所以,顺序一致性的典型应用场景就是数据库管理系统以及分布式系统。
而最终一致性通常适用于互联网三高架构的业务开发,如电商网站,社交媒体网站等.