28 - 生产者消费者模式:电商库存设计优化

生产者消费者模式,在之前的一些案例中,我们是有使用过的,相信你有一定的了解。这个模式是一个十分经典的多线程并发协作模式,生产者与消费者是通过一个中间容器来解决强耦合关系,并以此来实现不同的生产与消费速度,从而达到缓冲的效果。

使用生产者消费者模式,可以提高系统的性能和吞吐量,今天我们就来看看该模式的几种实现方式,还有其在电商库存中的应用。

1、Object 的 wait/notify/notifyAll 实现生产者消费者

在[第 16 讲]中,我就曾介绍过使用 Object 的 wait/notify/notifyAll 实现生产者消费者模式,这种方式是基于 Object 的 wait/notify/notifyAll 与对象监视器(Monitor)实现线程间的等待和通知。

还有,在[第 12 讲]中我也详细讲解过 Monitor 的工作原理,借此我们可以得知,这种方式实现的生产者消费者模式是基于内核来实现的,有可能会导致大量的上下文切换,所以性能并不是最理想的。

2、Lock 中 Condition 的 await/signal/signalAll 实现生产者消费者

相对 Object 类提供的 wait/notify/notifyAll 方法实现的生产者消费者模式,我更推荐使用 java.util.concurrent 包提供的 Lock && Condition 实现的生产者消费者模式。

在接口 Condition 类中定义了 await/signal/signalAll 方法,其作用与 Object 的 wait/notify/notifyAll 方法类似,该接口类与显示锁 Lock 配合,实现对线程的阻塞和唤醒操作。

我在[第 13 讲]中详细讲到了显示锁,显示锁 ReentrantLock 或 ReentrantReadWriteLock 都是基于 AQS 实现的,而在 AQS 中有一个内部类 ConditionObject 实现了 Condition 接口。

我们知道 AQS 中存在一个同步队列(CLH 队列),当一个线程没有获取到锁时就会进入到同步队列中进行阻塞,如果被唤醒后获取到锁,则移除同步队列。

除此之外,AQS 中还存在一个条件队列,通过 addWaiter 方法,可以将 await() 方法调用的线程放入到条件队列中,线程进入等待状态。当调用 signal 以及 signalAll 方法后,线程将会被唤醒,并从条件队列中删除,之后进入到同步队列中。条件队列是通过一个单向链表实现的,所以 Condition 支持多个等待队列。

由上可知,Lock 中 Condition 的 await/signal/signalAll 实现的生产者消费者模式,是基于 Java 代码层实现的,所以在性能和扩展性方面都更有优势。

下面来看一个案例,我们通过一段代码来实现一个商品库存的生产和消费。

public class LockConditionTest {private LinkedList<String> product = new LinkedList<String>();private int maxInventory = 10; // 最大库存private Lock lock = new ReentrantLock();// 资源锁private Condition condition = lock.newCondition();// 库存非满和非空条件/*** 新增商品库存* @param e*/public void produce(String e) {lock.lock();try {while (product.size() == maxInventory) {condition.await();}product.add(e);System.out.println(" 放入一个商品库存,总库存为:" + product.size());condition.signalAll();} catch (Exception ex) {ex.printStackTrace();} finally {lock.unlock();}}/*** 消费商品* @return*/public String consume() {String result = null;lock.lock();try {while (product.size() == 0) {condition.await();}result = product.removeLast();System.out.println(" 消费一个商品,总库存为:" + product.size());condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}return result;}/*** 生产者* @author admin**/private class Producer implements Runnable {public void run() {for (int i = 0; i < 20; i++) {produce(" 商品 " + i);}}}/*** 消费者* @author admin**/private class Customer implements Runnable {public void run() {for (int i = 0; i < 20; i++) {consume();}}}public static void main(String[] args) {LockConditionTest lc = new LockConditionTest();new Thread(lc.new Producer()).start();new Thread(lc.new Customer()).start();new Thread(lc.new Producer()).start();new Thread(lc.new Customer()).start();}
}

看完案例,请你思考下,我们对此还有优化的空间吗?

从代码中应该不难发现,生产者和消费者都在竞争同一把锁,而实际上两者没有同步关系,由于 Condition 能够支持多个等待队列以及不响应中断, 所以我们可以将生产者和消费者的等待条件和锁资源分离,从而进一步优化系统并发性能,代码如下:

	private LinkedList<String> product = new LinkedList<String>();private AtomicInteger inventory = new AtomicInteger(0);// 实时库存private int maxInventory = 10; // 最大库存private Lock consumerLock = new ReentrantLock();// 资源锁private Lock productLock = new ReentrantLock();// 资源锁private Condition notEmptyCondition = consumerLock.newCondition();// 库存满和空条件private Condition notFullCondition = productLock.newCondition();// 库存满和空条件/*** 新增商品库存* @param e*/public void produce(String e) {productLock.lock();try {while (inventory.get() == maxInventory) {notFullCondition.await();}product.add(e);System.out.println(" 放入一个商品库存,总库存为:" + inventory.incrementAndGet());if(inventory.get()<maxInventory) {notFullCondition.signalAll();}} catch (Exception ex) {ex.printStackTrace();} finally {productLock.unlock();}if(inventory.get()>0) {try {consumerLock.lockInterruptibly();notEmptyCondition.signalAll();} catch (InterruptedException e1) {// TODO Auto-generated catch blocke1.printStackTrace();}finally {consumerLock.unlock();}}}/*** 消费商品* @return*/public String consume() {String result = null;consumerLock.lock();try {while (inventory.get() == 0) {notEmptyCondition.await();}result = product.removeLast();System.out.println(" 消费一个商品,总库存为:" + inventory.decrementAndGet());if(inventory.get()>0) {notEmptyCondition.signalAll();}} catch (Exception e) {e.printStackTrace();} finally {consumerLock.unlock();}if(inventory.get()<maxInventory) {try {productLock.lockInterruptibly();notFullCondition.signalAll();} catch (InterruptedException e1) {// TODO Auto-generated catch blocke1.printStackTrace();}finally {productLock.unlock();}}return result;}/*** 生产者* @author admin**/private class Producer implements Runnable {public void run() {for (int i = 0; i < 20; i++) {produce(" 商品 " + i);}}}/*** 消费者* @author admin**/private class Customer implements Runnable {public void run() {for (int i = 0; i < 20; i++) {consume();}}}public static void main(String[] args) {LockConditionTest2 lc = new LockConditionTest2();new Thread(lc.new Producer()).start();new Thread(lc.new Customer()).start();}
}

我们分别创建 productLock 以及 consumerLock 两个锁资源,前者控制生产者线程并行操作,后者控制消费者线程并发运行;同时也设置两个条件变量,一个是 notEmptyCondition,负责控制消费者线程状态,一个是 notFullCondition,负责控制生产者线程状态。这样优化后,可以减少消费者与生产者的竞争,实现两者并发执行。

我们这里是基于 LinkedList 来存取库存的,虽然 LinkedList 是非线程安全,但我们新增是操作头部,而消费是操作队列的尾部,理论上来说没有线程安全问题。而库存的实际数量 inventory 是基于 AtomicInteger(CAS 锁)线程安全类实现的,既可以保证原子性,也可以保证消费者和生产者之间是可见的。

3、BlockingQueue 实现生产者消费者

相对前两种实现方式,BlockingQueue 实现是最简单明了的,也是最容易理解的。

因为 BlockingQueue 是线程安全的,且从队列中获取或者移除元素时,如果队列为空,获取或移除操作则需要等待,直到队列不为空;同时,如果向队列中添加元素,假设此时队列无可用空间,添加操作也需要等待。所以 BlockingQueue 非常适合用来实现生产者消费者模式。还是以一个案例来看下它的优化,代码如下:

public class BlockingQueueTest {private int maxInventory = 10; // 最大库存private BlockingQueue<String> product = new LinkedBlockingQueue<>(maxInventory);// 缓存队列/*** 新增商品库存* @param e*/public void produce(String e) {try {product.put(e);System.out.println(" 放入一个商品库存,总库存为:" + product.size());} catch (InterruptedException e1) {// TODO Auto-generated catch blocke1.printStackTrace();}}/*** 消费商品* @return*/public String consume() {String result = null;try {result = product.take();System.out.println(" 消费一个商品,总库存为:" + product.size());} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}return result;}/*** 生产者* @author admin**/private class Producer implements Runnable {public void run() {for (int i = 0; i < 20; i++) {produce(" 商品 " + i);}}}/*** 消费者* @author admin**/private class Customer implements Runnable {public void run() {for (int i = 0; i < 20; i++) {consume();}}}public static void main(String[] args) {BlockingQueueTest lc = new BlockingQueueTest();new Thread(lc.new Producer()).start();new Thread(lc.new Customer()).start();new Thread(lc.new Producer()).start();new Thread(lc.new Customer()).start();}
}

在这个案例中,我们创建了一个 LinkedBlockingQueue,并设置队列大小。之后我们创建一个消费方法 consume(),方法里面调用 LinkedBlockingQueue 中的 take() 方法,消费者通过该方法获取商品,当队列中商品数量为零时,消费者将进入等待状态;我们再创建一个生产方法 produce(),方法里面调用 LinkedBlockingQueue 中的 put() 方法,生产方通过该方法往队列中放商品,如果队列满了,生产者就将进入等待状态。

4、生产者消费者优化电商库存设计

了解完生产者消费者模式的几种常见实现方式,接下来我们就具体看看该模式是如何优化电商库存设计的。

电商系统中经常会有抢购活动,在这类促销活动中,抢购商品的库存实际是存在库存表中的。为了提高抢购性能,我们通常会将库存存放在缓存中,通过缓存中的库存来实现库存的精确扣减。在提交订单并付款之后,我们还需要再去扣除数据库中的库存。如果遇到瞬时高并发,我们还都去操作数据库的话,那么在单表单库的情况下,数据库就很可能会出现性能瓶颈。

而我们库存表如果要实现分库分表,势必会增加业务的复杂度。试想一个商品的库存分别在不同库的表中,我们在扣除库存时,又该如何判断去哪个库中扣除呢?

如果随意扣除表中库存,那么就会出现有些表已经扣完了,有些表中还有库存的情况,这样的操作显然是不合理的,此时就需要额外增加逻辑判断来解决问题。

在不分库分表的情况下,为了提高订单中扣除库存业务的性能以及吞吐量,我们就可以采用生产者消费者模式来实现系统的性能优化。

创建订单等于生产者,存放订单的队列则是缓冲容器,而从队列中消费订单则是数据库扣除库存操作。其中存放订单的队列可以极大限度地缓冲高并发给数据库带来的压力。

我们还可以基于消息队列来实现生产者消费者模式,如今 RabbitMQ、RocketMQ 都实现了事务,我们只需要将订单通过事务提交到 MQ 中,扣除库存的消费方只需要通过消费 MQ 来逐步操作数据库即可。

5、总结

使用生产者消费者模式来缓冲高并发数据库扣除库存压力,类似这样的例子其实还有很多。

例如,我们平时使用消息队列来做高并发流量削峰,也是基于这个原理。抢购商品时,如果所有的抢购请求都直接进入判断是否有库存和冻结缓存库存等逻辑业务中,由于这些逻辑业务操作会增加资源消耗,就可能会压垮应用服务。此时,为了保证系统资源使用的合理性,我们可以通过一个消息队列来缓冲瞬时的高并发请求。

生产者消费者模式除了可以做缓冲优化系统性能之外,它还可以应用在处理一些执行任务时间比较长的场景中。

例如导出报表业务,用户在导出一种比较大的报表时,通常需要等待很长时间,这样的用户体验是非常差的。通常我们可以固定一些报表内容,比如用户经常需要在今天导出昨天的销量报表,或者在月初导出上个月的报表,我们就可以提前将报表导出到本地或内存中,这样用户就可以在很短的时间内直接下载报表了。

6、思考题

我们可以用生产者消费者模式来实现瞬时高并发的流量削峰,然而这样做虽然缓解了消费方的压力,但生产方则会因为瞬时高并发,而发生大量线程阻塞。面对这样的情况,你知道有什么方式可以优化线程阻塞所带来的性能问题吗?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/168384.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NX二次开发UF_CURVE_ask_curve_struct_data 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_CURVE_ask_curve_struct_data Defined in: uf_curve.h int UF_CURVE_ask_curve_struct_data(UF_CURVE_struct_p_t curve_struct, int * type, double * * curve_data ) overview…

情感对话机器人的任务体系

人类在处理对话中的情感时&#xff0c;需要先根据对话场景中的蛛丝马迹判断出对方的情感&#xff0c;继而根据对话的主题等信息思考自身用什么情感进行回复&#xff0c;最后结合推理出的情感形成恰当的回复。受人类处理情感对话的启发&#xff0c;情感对话机器人需要完成以下几…

百战python03-分支结构

文章目录 if语句的使用练习1:英制单位英寸与公制单位厘米互换2:百分制成绩转换为等级制成绩3:输入三条边长,如果能构成三角形就计算周长和面积海伦公式4:掷骰子随机做事注:需要对python有基本了解,可查看本作者python基础专栏,有任何问题欢迎私信或评论(本专栏每章内容…

从0开始学习JavaScript--深入了解JavaScript框架

JavaScript框架在现代Web开发中扮演着关键角色&#xff0c;为开发者提供了丰富的工具和抽象层&#xff0c;使得构建复杂的、高性能的Web应用变得更加容易。本文将深入探讨JavaScript框架的核心概念、常见框架的特点以及它们在实际应用中的使用。 JavaScript框架的作用 JavaSc…

STM32 寄存器配置笔记——USART配置中断接收乒乓缓存处理

一、概述 本文主要介绍如何配置USART接收中断&#xff0c;使用乒乓缓存的设计接收数据并将其回显在PC 串口工具上。以stm32f10为例&#xff0c;配置USART1 9600波特率。具体配置参考上一章节STM32 寄存器配置笔记——USART配置 打印。 乒乓缓存的设计应用场景&#xff1a;当后面…

【ceph】如何打印一个osd的op流程,排查osd在干什么

本站以分享各种运维经验和运维所需要的技能为主 《python零基础入门》&#xff1a;python零基础入门学习 《python运维脚本》&#xff1a; python运维脚本实践 《shell》&#xff1a;shell学习 《terraform》持续更新中&#xff1a;terraform_Aws学习零基础入门到最佳实战 《k8…

canvas高级动画001:文字瀑布流

canvas实例应用100 专栏提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。 canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重要的帮助。 文章目录 示例…

elk 简单操作手册

1.1. 基础概念 EFK不是一个软件,而是一套解决方案,开源软件之间的互相配合使用,高效的满足了很多场合的应用,是目前主流的一种日志系统。 EFK是三个开源软件的缩写,分别表示:Elasticsearch , Filebeat, Kibana , 其中Elasticsearch负责日志保存和搜索,Filebeat负责收集日志,Ki…

Spring Boot 3.2.0 现已推出

Spring Boot 3.2.0 已经发布&#xff0c;并且可以从 Maven Central 获取。 此版本添加了大量新功能和改进。有关完整的[升级说明](https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-3.2.0-Release-Notesupgrading-from-spring-boot-31)以及[值得注意的新](ht…

EI期刊完整程序:MEA-BP思维进化法优化BP神经网络的回归预测算法,可作为对比预测模型,丰富内容,直接运行,免费

适用平台&#xff1a;Matlab 2020及以上 本程序参考中文EI期刊《基于MEA⁃BP神经网络的建筑能耗预测模型》&#xff0c;程序注释清晰&#xff0c;干货满满&#xff0c;下面对文章和程序做简要介绍。 适用领域&#xff1a;风速预测、光伏功率预测、发电功率预测、碳价预测等多…

eclipse项目移到idea上部署运行

1.配置web模块 另外&#xff0c;模块这里&#xff0c;也要加上Spring 2.配置Artifact &#xff08;用于tomcat&#xff09; 就是从上面配置的web模块&#xff0c;产生的工件 3.添加lib 一般是在web-inf/lib &#xff0c; 遇到的坑&#xff1a; jdk版本问题&#xff0c;这里…

proto语法学习笔记

proto语法学习笔记 Protocol Buffers&#xff08;Proto是由谷歌开发的一种数据序列化格式。 Proto 不是一种编程语言&#xff0c;而是一种接口描述语言&#xff08;IDL&#xff09;&#xff0c;用于定义数据结构和消息格式。 它的设计目标是提供一种简单、高效、可扩展的方法来…

使用STM32+SPI Flash模拟U盘

试验目的&#xff1a;使用STM32F103C8T6 SPI Flash&#xff08;WSQ16&#xff09;实现模拟U盘的功能 SPI Flash读写说明&#xff1a; Step1 设置SPI1 用于读取SPI Flash&#xff1b; Step2&#xff1a;设置SPI Flash 的使能信号 Step3&#xff1a;使能USB通信 Step4&#xf…

人机交互2——任务型多轮对话的控制和生成

1.自然语言理解模块 2.对话管理模块 3.自然语言生成模块

C++模拟如何实现vector的方法

任意位置插入&#xff0c;insert的返回值为新插入的第一个元素位置的迭代器&#xff1b;因为插入可能会进行扩容&#xff0c;导致start的值改变&#xff0c;所以先定义一个变量保存pos与start的相对位置&#xff1b;判断是否需要扩容&#xff1b;从插入位置开始&#xff0c;将所…

WorldWind Android上加载白模数据

这篇文章介绍下如何加载白模数据。这个白模数据的格式是shapefile格式的文件。白模数据拷贝到手机本地&#xff0c;然后读取白模数据&#xff0c;进行加载展示。 worldwind android本身是不支持加载白模数据的&#xff0c;但是可以根据现有提供的加载Polygons的方式&#xff0c…

【JUC】一篇通关JUC并发之共享模型

目录 1. 共享带来的问题1-1. 临界区 Critical Section1-2. 竞态条件 Race Condition1-3. synchronized 解决方案 1. 共享带来的问题 1-1. 临界区 Critical Section 一个程序运行多个线程本身是没有问题的问题出在多个线程访问共享资源 多个线程读共享资源其实也没有问题在多个…

基于单片机的消防巡逻小车设计

智能小车循迹与避障运动控制系统的设计 摘 要:本设计主要由STC89C52单片机来进行控制&#xff0c;通过输入输出两个端口控制驱动模块来调节电机的工作状态。本设计预利用机器视觉&#xff0c;通过识别条带状路标实现自主导航且利用超声波模块实时检测距离以实现避障功能&…

Linux:Ubuntu实现远程登陆

1、查看sshd服务是否存在 Ubuntu默认是没有安装sshd服务的&#xff0c;所以&#xff0c;无法远程登陆。 检查22端口是否存在 netstat -anp 该命令执行后&#xff0c;查看不到22端口的进程。 如果netstat无法使用&#xff0c;我们需要安装一下netstat服务 sudo apt-get install…

模板初阶(1):函数模板,类模板

一、函数模板 1.1 概念 函数模板代表了一个函数家族&#xff0c;该函数模板与类型无关&#xff0c;在使用时被参数化&#xff0c;根据实参类型产生函数的特定类型版本。 格式&#xff1a; template <typename T>或template <class T> template <class T>…