从串行线程封闭到对象池、线程池


今天讲一个牛逼而实用的概念,串行线程封闭。对象池是串行线程封闭的典型应用场景;线程池糅合了对象池技术,但核心实现不依赖于对象池,很容易产生误会。


本文从串行线程封闭和对象池入手,最后通过源码分析线程池的核心原理,厘清对象池与线程池之间的误会。


线程封闭与串行线程封闭


线程封闭


线程封闭是一种常见的线程安全设计策略:仅在固定的一个线程内访问对象,不对其他线程共享


使用线程封闭技术,对象O始终只对一个线程T1可见,“单线程”中自然不存在线程安全的问题。


ThreadLocal是常用的线程安全工具。线程封闭在Servlet及高层的web框架Spring等中应用不少。

https://monkeysayhi.github.io/2016/11/27/源码%7CThreadLocal的实现原理/


串行线程封闭


线程封闭虽然好用,却限制了对象的共享。串行线程封闭改进了这一点:对象O只能由单个线程T1拥有,但可以通过安全的发布对象O来转移O的所有权;在转移所有权后,也只有另一个线程T2能获得这个O的所有权,并且发布O的T1不会再访问O。


所谓“所有权”,指修改对象的权利。


相对于线程封闭,串行线程封闭使得任意时刻,最多仅有一个线程拥有对象的所有权。当然,这不是绝对的,只要线程T1事实不会再修改对象O,那么就相当于仅有T2拥有对象的所有权。串行线层封闭让对象变得可以共享(虽然只能串行的拥有所有权),灵活性得到大大提高;相对的,要共享对象就涉及安全发布的问题,依靠BlockingQueue等同步工具很容易实现这一点。


对象池是串行线程封闭的经典应用场景,如数据库连接池等。


对象池


对象池利用了串行封闭:将对象O“借给”一个请求线程T1,T1使用完再交还给对象池,并保证“未擅自发布该对象”且“以后不再使用”;对象池收回O后,等T2来借的时候再把它借给T2,完成对象所有权的传递。


猴子撸了一个简化版的线程池,用户只需要覆写newObject()方法:


public abstract class AbstractObjectPool<T> {

  protected final int min;

  protected final int max;

  protected final List<T> usings = new LinkedList<>();

  protected final List<T> buffer = new LinkedList<>();

  private volatile boolean inited = false;

  public AbstractObjectPool(int min, int max) {

    this.min = min;

    this.max = max;

    if (this.min < 0 || this.min > this.max) {

      throw new IllegalArgumentException(String.format(

          "need 0 <= min <= max <= Integer.MAX_VALUE, given min: %s, max: %s", this.min, this.max));

    }

  }

  public void init() {

    for (int i = 0; i < min; i++) {

      buffer.add(newObject());

    }

    inited = true;

  }

  protected void checkInited() {

    if (!inited) {

      throw new IllegalStateException("not inited");

    }

  }

  abstract protected T newObject();

  public synchronized T getObject() {

    checkInited();

    if (usings.size() == max) {

      return null;

    }

    if (buffer.size() == 0) {

      T newObj = newObject();

      usings.add(newObj);

      return newObj;

    }

    T oldObj = buffer.remove(0);

    usings.add(oldObj);

    return oldObj;

  }

  public synchronized void freeObject(T obj) {

    checkInited();

    if (!usings.contains(obj)) {

      throw new IllegalArgumentException(String.format("obj not in using queue: %s", obj));

    }

    usings.remove(usings.indexOf(obj));

    buffer.add(obj);

  }

}


AbstractObjectPool具有以下特性

支持设置最小、最大容量

对象一旦申请就不再释放,避免了GC


虽然很简单,但大可以用于一些时间敏感、资源充裕的场景。如果时间进一步敏感,可将getObject()、freeObject()改写为并发程度更高的版本,但记得保证安全发布安全回收;如果资源不那么充裕,可以适当增加对象回收策略。


可以看到,一个对象池的基本行为包括:

创建对象newObject()

借取对象getObject()

归还对象freeObject()


典型的对象池有各种连接池、常量池等,应用非常多,模型也大同小异,不做解析。令人迷惑的是线程池,很容易让人误以为线程池的核心原理也是对象池,下面来追一遍源码。


线程池


首先摆出结论:线程池糅合了对象池模型,但核心原理是生产者-消费者模型


继承结构如下:


用户可以将Runnable(或Callables)实例提交给线程池,线程池会异步执行该任务,返回响应的结果(完成/返回值)。


猴子最喜欢的是submit(Callable<T> task)方法。我们从该方法入手,逐步深入函数栈,探究线程池的实现原理。


submit()


submit()方法在ExecutorService接口中定义,AbstractExecutorService实现,ThreadPoolExecutor直接继承。


public abstract class AbstractExecutorService implements ExecutorService {

...

    public <T> Future<T> submit(Callable<T> task) {

        if (task == null) throw new NullPointerException();

        RunnableFuture<T> ftask = newTaskFor(task);

        execute(ftask);

        return ftask;

    }

...



AbstractExecutorService#newTaskFor()创建一个RunnableFuture类型的FutureTask。


核心是execute()方法。


execute()


execute()方法在Executor接口中定义,ThreadPoolExecutor实现。


public class ThreadPoolExecutor extends AbstractExecutorService {

...

    public void execute(Runnable command) {

        if (command == null)

            throw new NullPointerException();

        int c = ctl.get();

        if (workerCountOf(c) < corePoolSize) {

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

        if (isRunning(c) && workQueue.offer(command)) {

            int recheck = ctl.get();

            if (! isRunning(recheck) && remove(command))

                reject(command);

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

        }

        else if (!addWorker(command, false))

            reject(command);

    }

...

}


我们暂且忽略线程池的池化策略。关注一个最简单的场景,看能不能先回答一个问题:线程池中的任务如何执行?


核心是addWorker()方法。以8行的参数为例,此时,线程池中的线程数未达到最小线程池大小corePoolSize,通常可以直接在9行返回。


addWorker()


简化如下:


public class ThreadPoolExecutor extends AbstractExecutorService {

...

    private boolean addWorker(Runnable firstTask, boolean core) {

        boolean workerStarted = false;

        boolean workerAdded = false;

        Worker w = null;

        try {

            w = new Worker(firstTask);

            final Thread t = w.thread;

            if (t != null) {

                final ReentrantLock mainLock = this.mainLock;

                mainLock.lock();

                try {

                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN) {

                        workers.add(w);

                        workerAdded = true;

                    }

                } finally {

                    mainLock.unlock();

                }

                if (workerAdded) {

                    t.start();

                    workerStarted = true;

                }

            }

        } finally {

            if (! workerStarted)

                addWorkerFailed(w);

        }

        return workerStarted;

    }

...

}


我去掉了很多用于管理线程池、维护线程安全的代码。假设线程池未关闭,worker(即w,下同)添加成功,则必然能够将worker添加至workers中。workers是一个HashSet:


private final HashSet<Worker> workers = new HashSet<Worker>();


哪里是对象池?


如果说与对象池有关,那么workers即相当于示例代码中的using,应用了对象池模型;只不过这里的using是一直增长的,直到达到最大线程池大小maximumPoolSize。


但是很明显,线程池并没有将线程发布出去,workers也仅仅完成using“保存线程”的功能。那么,线程池中的任务如何执行呢?跟线程池有没有关系?


哪里又不是?


注意9、17、24行:


9行将我们提交到线程池的firstTask封装入一个worker。

17行将worker加入workers,维护起来

24行则启动了worker中的线程t


核心在与这三行,但线程池并没有直接在addWorker()中启动任务firstTask,代之以启动一个worker。最终任务必然被启动,那么我们继续看Worker如何启动这个任务。


Worker


Worker实现了Runnable接口:


private final class Worker

    extends AbstractQueuedSynchronizer

    implements Runnable

{

...

    Worker(Runnable firstTask) {

        setState(-1); // inhibit interrupts until runWorker

        this.firstTask = firstTask;

        this.thread = getThreadFactory().newThread(this);

    }

    /** Delegates main run loop to outer runWorker  */

    public void run() {

        runWorker(this);

    }

...

}


为什么要将构造Worker时的参数命名为firstTask?因为当且仅当需要建立新的Worker以执行任务task时,才会调用构造函数。因此,任务task对于新Worker而言,是第一个任务firstTask。


Worker的实现非常简单:将自己作为Runable实例,构造时在内部创建并持有一个线程thread。Thread和Runable的使用大家很熟悉了,核心是Worker的run方法,它直接调用了runWorker()方法。


runWorker()


敲黑板!!!


重头戏来了。简化如下:


public class ThreadPoolExecutor extends AbstractExecutorService {

...

    final void runWorker(Worker w) {

        Thread wt = Thread.currentThread();

        Runnable task = w.firstTask;

        w.firstTask = null;

        w.unlock(); // allow interrupts

        boolean completedAbruptly = true;

        try {

            while (task != null || (task = getTask()) != null) {

                w.lock();

                try {

                    beforeExecute(wt, task);

                    Throwable thrown = null;

                    try {

                        task.run();

                    } catch (RuntimeException x) {

                        thrown = x; throw x;

                    } catch (Error x) {

                        thrown = x; throw x;

                    } catch (Throwable x) {

                        thrown = x; throw new Error(x);

                    } finally {

                        afterExecute(task, thrown);

                    }

                } finally {

                    task = null;

                    w.completedTasks++;

                    w.unlock();

                }

            }

            completedAbruptly = false;

        } finally {

            processWorkerExit(w, completedAbruptly);

        }

    }

...

}


我们在前面将要执行的任务赋值给firstTask,5-6行首先取出任务task,并将firstTask置为null。因为接下来要执行task,firstTask字段就没有用了。


重点是10-31行的while循环。下面分情况讨论。


case1:第一次进入循环,task不为null


case1对应前面作出的诸多假设。


第一次进入循环时,task==firstTask,不为null,使10行布尔短路直接进入循环;从而16行执行的是firstTask的run()方法;异常处理不表;最后,finally代码块中,task会被置为null,导致下一轮循环会进入case2。


case2:非第一次进入循环,task为null


case2是更普遍的情况,也就是线程池的核心


case1中,task被置为了null,使10行布尔表达式执行第二部分(task = getTask()) != null(getTask()稍后再讲,它返回一个用户已提交的任务)。假设task得到了一个已提交的任务,从而16行执行的是新获得的任务task的run()方法。后同case1,最后task仍然会被置为null,以后循环都将进入case2。


GETTASK()


任务从哪来呢?简化如下:


public class ThreadPoolExecutor extends AbstractExecutorService {

...

    private Runnable getTask() {

        boolean timedOut = false;

        for (;;) {

            try {

                Runnable r = timed ?

                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                    workQueue.take();

                if (r != null)

                    return r;

                timedOut = true;

            } catch (InterruptedException retry) {

                timedOut = false;

            }

        }

    }

...

}


我们先看最简单的,19-28行。


首先,workQueue是一个线程安全的BlockingQueue,大部分时候使用的实现类是LinkedBlockingQueue:


private final BlockingQueue<Runnable> workQueue;


假设timed为false,则调用阻塞的take()方法,返回的r一定不是null,从而12行退出,将任务交给了某个worker线程。


一个小细节有点意思:前面每个worker线程runWorker()方法时,在循环中加锁粒度在worker级别,直接使用的lock同步;但因为每一个woker都会调用getTask(),考虑到性能因素,源码中getTask()中使用乐观的CAS+SPIN实现无锁同步。


关于乐观锁和CAS,可以参考https://monkeysayhi.github.io/2017/10/22/源码%7C并发一枝花之ConcurrentLinkedQueue【伪】/


workQueue中的元素从哪来呢?这就要回顾execute()方法了。


EXECUTE()


public class ThreadPoolExecutor extends AbstractExecutorService {

...

    public void execute(Runnable command) {

        if (command == null)

            throw new NullPointerException();

        int c = ctl.get();

        if (workerCountOf(c) < corePoolSize) {

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

        if (isRunning(c) && workQueue.offer(command)) {

            int recheck = ctl.get();

            if (! isRunning(recheck) && remove(command))

                reject(command);

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

        }

        else if (!addWorker(command, false))

            reject(command);

    }

...

}


前面以8行的参数为例,此时,线程池中的线程数未达到最小线程池大小corePoolSize,通常可以直接在9行返回。进入8行的条件是“当前worker数小于最小线程池大小corePoolSize”。


如果不满足,会继续执行到12行。isRunning(c)判断线程池是否未关闭,我们关注未关闭的情况;则会继续执行布尔表达式的第二部分workQueue.offer(command),尝试将任务command放入队列workQueue。


workQueue.offer()的行为取决于线程池持有的BlockingQueue实例。


Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor()创建的线程池使用LinkedBlockingQueue,而Executors.newCachedThreadPool()创建的线程池则使用SynchronousQueue。


以LinkedBlockingQueue为例,创建时不配置容量,即创建为无界队列,则LinkedBlockingQueue#offer()永远返回true,从而进入12-18行。


更细节的内容不必关心了,当workQueue.offer()返回true时,已经将任务command放入了队列workQueue。当未来的某个时刻,某worker执行完某一个任务之后,会从workQueue中再取出一个任务继续执行,直到线程池关闭,直到海枯石烂。


CachedThreadPool是一种无界线程池,使用SynchronousQueue能进一步提升性能,简化代码结构。留给读者分析。


CASE2小结


可以看到,实际上,线程池的核心原理与对象池模型无关,而是生产者-消费者模型


  • 生产者(调用submit()或execute()方法)将任务task放入队列

  • 消费者(worker线程)循环从队列中取出任务处理任务(执行task.run())


钩子方法


回到runWorker()方法,在执行任务的过程中,线程池保留了一些钩子方法,如beforeExecute()、afterExecute()。用户可以在实现自己的线程池时,可以通过覆写钩子方法为线程池添加功能。


但猴子不认为钩子方法是一种好的设计。因为钩子方法大多依赖于源码实现,那么除非了解源码或API声明绝对的严谨正确,否则很难正确使用钩子方法。等发生错误时再去了解实现,可能就太晚了。说到底,还是不要使用类似extends这种表达“扩展”语义的语法来实现继承,详见Java中如何恰当的表达“继承”与“扩展”的语义?。


当然,钩子方法也是极其方便的。权衡看待。


总结


相对于线程封闭,串行线程封闭离用户的距离更近一些,简单灵活,实用性强,很容易掌握。而线程封闭更多沦为单纯的设计策略,单纯使用线程封闭的场景不多。


线程池与串行线程封闭、对象池的关系不大,但经常被混为一谈;没看过源码的很难想到其实现方案,面试时也能立分高下。


线程池的实现很有意思。在追源码之前,猴子一直以为线程池就是把线程存起来,用的时候取出来执行任务;看了源码才知道实现如此之妙,简洁优雅效率高。


源码才是最好的老师。


来源:ImportNew



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/304908.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

netty springmvc_springmvc源码架构解析之HandlerMapping

说在前面前期回顾sharding-jdbc源码解析 更新完毕spring源码解析 更新完毕spring-mvc源码解析 更新完毕spring-tx源码解析 更新完毕spring-boot源码解析 更新完毕rocketmq源码解析 更新完毕dubbbo源码解析 更新完毕netty源码解析 更新完毕spring源码架构更新完毕springmvc源码架…

腾讯牛逼,我酸了!!

阅读本文大概需要8分钟。腾讯这两天搞了个业内爆炸沸腾的事情&#xff1a;全员阳光普照发放100股&#xff0c;解禁期一年。腾讯股价近年来一直在疯狂上涨&#xff0c;100股折合人民币6万多&#xff1a;关键是员工什么都没做&#xff0c;直接拿到价值6万的股票。作用可以说是相当…

这本造价500万的“黑科技”日历,用377张爆美插画给你365天理想生活

以前&#xff0c;每个人家里&#xff0c; 都挂着一本日历。 爷爷戴着老花镜&#xff0c; 盘看着黄道吉日&#xff1b; 奶奶一字一句&#xff0c; 念叨着每日禁忌&#xff1b; 我们跟着日历过日子&#xff0c; 时光缓慢&#xff0c;记忆清晰。 那时候&#xff0c;日历本上的日子…

eladmin代码自动生成_如何让Mybatis自动生成代码

点击上方“Java知音”&#xff0c;选择“置顶公众号”技术文章第一时间送达&#xff01;作者&#xff1a;阿进的写字台cnblogs.com/homejim/p/9782403.html在使用 mybatis 过程中&#xff0c; 当手写 JavaBean 和 XML 写的越来越多的时候&#xff0c; 就越来越容易出错。这种重…

.NET微服务

前文传送门&#xff1a;什么是云原生&#xff1f;现代云原生设计理念Microservices微服务是一种构建现代应用程序的流行的体系结构&#xff0c;云原生系统拥抱微服务。微服务是由一组(使用共享结构交互的、独立的小块服务)搭建的分布式集&#xff0c;具有以下特征&#xff1a;在…

还是找程序员做老公,最靠谱!

很多MM在选老公的时候&#xff0c;都会选择帅气多金&#xff0c;职业又稳定的男生做老公&#xff0c;像医生啊、律师啊、老师啊这类职业。 但是&#xff0c;你有没有想过&#xff1f; 医生的身边都围着好多护士小姐&#xff0c;而且天天加班是常态&#xff0c;相处的时间太少&a…

14个超级牛X的免费开源小工具!

最近整理了一些在用的&#xff0c;感觉还不错的开源小工具&#xff0c;有的仅适用MacOS&#xff0c;但多数跨平台。 Homebrew Homebrew — The missing package manager for macOS&#xff1a;https://brew.sh&#xff0c;Mac上非常好用的包管理工具&#xff0c;很多常见的安装…

硬件加速下webview切换闪屏_网页渲染性能优化 —— 性能优化下

博客 有更多精品文章哟。Composite 的优化 终于&#xff0c;我们到了像素管道的末尾。对于这一部分的优化策略&#xff0c;我们可以从为什么需要 Composited Layer&#xff08;Graphics Layer&#xff09;来入手。这个问题我们在构建 Graphics Layer Tree 的时候&#xff0c;已…

计算机和网络邻居都不见 了,网上邻居看不到自己和别人电脑怎么办

不少网友都试过&#xff0c;在局域网里面&#xff0c;打开网上邻居&#xff0c;结果里面一台电脑都没有&#xff0c;或者只能看到自己的&#xff0c;看不到其他人的&#xff0c;这是怎么回事呢&#xff1f;学习啦小编在这里教大家如何解决这个问题&#xff0c;希望能帮到大家。…

Python 开发者的 6 个必备库

无论你是正在使用 Python 进行快速开发&#xff0c;还是在为 Python 桌面应用制作原生 UI &#xff0c;或者是在优化现有的 Python 代码&#xff0c;以下这些 Python 项目都是应该使用的。 Python 凭借其易用的特点&#xff0c;已经被工业界和学术界广泛采用。另一方面&#x…

如何在注册表里计算机用户名,可以通过注册表修改电脑的密码 ?怎么做的?

可以通过注册表修改电脑的密码 ?怎么做的?以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;可以通过注册表修改电脑的密码 ?怎么做的?不用怎么麻烦的运行输入cmd在这里输入 user 用户名 密…

c++游戏代码大全_还在学少儿编程?不如来玩工厂编程师,免费学编程逻辑的小游戏...

40000游戏爱好者已加入我们&#xff01;每天推荐好玩游戏&#xff01;加入我们&#xff0c;沐沐带你发现好游戏&#xff01;《工厂编程师》游戏小程序好玩吗&#xff1f;《工厂编程师》小游戏怎么玩&#xff1f;只有你想不到&#xff0c;没有我找不到的好游戏&#xff01;「良心…

Istio 1.9 发布——重点改善 Istio 的 Day2 操作

本文译自 Istio 官方博客&#xff0c;原文地址&#xff1a;https://istio.io/latest/news/releases/1.9.x/announcing-1.9/Istio 1.9 版本的重点是改善用户在生产中运行 Istio 的 Day2 操作。在用户体验工作组收集到的反馈意见的基础上&#xff0c;我们希望改善用户的稳定性和整…

Windows漏洞利用开发——利用ROP绕过DEP保护

实验6 Windows漏洞利用开发 6.1实验名称 Windows漏洞利用开发 6.2实验目的 学习windows漏洞利用开发&#xff0c;使用kali linux相关工具对windows内目标程序进行漏洞利用 6.3实验步骤及内容 第三阶段&#xff1a;利用ROP绕过DEP保护 了解DEP保护理解构造ROP链从而绕过DEP…

技术人必备的碎片化时间学习工具

工作、生活节奏超快的今天&#xff0c;想要不断提升自我&#xff0c;碎片化阅读学习是你最佳的选择&#xff0c;如果你有一颗学习的心&#xff0c;那这些学习型的公众号&#xff0c;绝对会让你受益匪浅。 小编为你精选了技术领域几个精品微信订阅号&#xff0c;涵盖了时下最热门…

微型计算机键盘上的shift键 汉语译为,PC计算机键盘上的Shift键称为什么键

满意答案shuzhongle2014.02.25采纳率&#xff1a;56% 等级&#xff1a;12已帮助&#xff1a;7306人朋友&#xff0c;下面是我的答案&#xff0c;希望可以帮得到你!Shift键:上档转换键&#xff0c;也可用于中英文转换。当然Shift键并不仅仅只是这些作用&#xff0c;下面介绍下…

华为的型号命名规则_华为最实惠5G手机来了!畅享Z 5G宣布:5月24日发

5月18日消息&#xff0c;华为宣布畅享系列首款5G手机畅享Z 5G将于5月24日发布。畅享Z系列的到来将使华为在千元市场实现5G覆盖&#xff0c;这也将成为华为价格最低的5G手机系列&#xff0c;值得期待。目前关于畅享Z的细节还很少&#xff0c;有消息称代号为Teller的5G新机隶属于…

程序员又背锅了!虾米音乐代码注释惊现“穷逼vip”

11 月 19 日&#xff0c;在 V2EX 社区的技术版一名网友发布了一篇名为《虾米 mac 客户端发现个好玩的注释》的帖子&#xff0c;文中贴出了阿里巴巴旗下虾米音乐 Mac 版客户端的代码&#xff0c;引发了网友争议。发帖人称&#xff0c;虾米音乐客户端的程序员竟然称一些短期VIP客…

计算机无法安装小丸工具箱,小丸工具箱电脑版

小丸工具箱电脑版是一款可以压制H264AAC视频的图形界面工具&#xff0c;内核是x264、neroaac、mp4box等开源软件。小丸工具箱电脑版能够封装mp4或抽取mp4的音频或视频&#xff0c;压制视频中的音频。它的功能非常丰富&#xff0c;是属于实用的视频压制器!小丸工具箱&#xff0c…

一个简单的dotnet tool

dotnet tool对应的工具&#xff0c;本质上是一个控制台应用&#xff0c;在调用这个应用时&#xff0c;会根据传入的参数&#xff0c;执行应用内部的逻辑。关于dotnet tool命令使用&#xff0c;参照https://docs.microsoft.com/zh-cn/dotnet/core/tools/dotnet-tool-install下面…