线程使用Runnable还是Callable

Callable接口

Runnable的缺陷

在创建线程的时候不管是继承Thread类(Thread本身也是实现的Runnable接口)还是实现Runnable接口,所实现的run()方法都没有返回值,使得需要将返回值写到map中,等到线程结束再去从map中取数据,特别的不方便

而且Runnable还有另一个问题是不能抛出任何异常,必须在run()方法中自己处理异常。

由于Runnable存在的两个问题,所以Callable接口和Future接口应运而生,这里来介绍一下Callable接口和Future接口

Callable的改善

Callable与Runnable不同,提供的方法为call()方法,大家看到,该方法是有返回值的,且可以抛出异常

获取Callable的返回值需要FutureTask的支持

public interface Callable<V{
    
    call() throws Exception;
}

public interface Runnable {
    
    public abstract void run();
}

Runnable和Callable接口的区别

  • Runnable中提供的是run()方法,Callable中提供的是call()方法
  • Runnable中的run()方法返回值为void,Callable中的call()方法有返回值
  • Runnable的run()方法不能抛出异常,Callable中的call方法可以抛出异常
  • Callable可以和Future、FutureTask配合获取异步执行的结果

自旋锁执行时间短、线程数少的时候使用(由于占用CPU) -----> Atomic和lock都是使用的自旋锁

FutureTask

Future接口

使用ExecutorService来执行Callable对象

// submit方法既可以传入Callable,也可以传入Runnable,如果传入Callable的话,可以使用get方法获取到值;如果传入Runnable的话,get方法不可以获取到值
<T> Future<T> submit(Callable<T> task);

Future<?> submit(Runnable task);
// result参数来作为执行结果的返回值
<T> Future<T> submit(Runnable task, T result);

看到该方法返回的是一个Future对象,Future对象是什么呢?

public interface Future<V{

    // 在任务正常结束之前可以尝试取消任务,如果任务已完成或已取消,则次尝试失败。如果调用成功,且此任务还没有启动,则该任务将不会执行;如果任务已经启动,则mayInterruptIfRunning参数确定是否应该以试图停止任务的方式来中断此任务的线程
  // 中断需要判断 if(Thread.currentThread().isInterrupted())
    boolean cancel(boolean mayInterruptIfRunning);

    // 如果任务正常结束之前被取消,isCancelled会返回true
    boolean isCancelled();

    // 检测任务是否结束,任务完成返回true,由于正常终止、异常或取消而完成,也会返回true
    boolean isDone();

    // get方法会将结果返回给调用方,会阻塞
    get() throws InterruptedException, ExecutionException;

    // get方法会将结果返回给调用方,可以限制限制超时时间
    get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException
;
}

但是线程中没有可以使用Callable接口和Future接口的地方,Thread唯一可用的参数是Runnable对象,那么如何才能用到这个好用的技术呢?

答案是FutureTask

FutureTask类

FutureTask类同时实现了Runnable接口和Future接口,同时拥有异步获取结果以及取消任务的能力,可以创建出可取消的任务。

通过传入Runnable或者Callable的任务给FutureTask,通过FutureTask的get方法异步获取执行结果


public class FutureTask<Vimplements RunnableFuture<V{
  //NEW -> COMPLETING -> NORMAL  异步任务的正常结束,COMPLETING是在执行set方法为outcome赋值时的一个过渡状态,赋值完成后状态变为NORMAL
  //NEW -> COMPLETING -> EXCEPTIONAL   异步任务执行过程中抛出异常,COMPLETING是在执行setException方法为outcome赋值时的一个过渡状态,赋值完成后状态变为EXCEPTIONAL
  //NEW -> CANCELLED 调用cancel(false)将状态设置为CANCELLED
  //NEW -> INTERRUPTING -> INTERRUPTED 调用了cancel(true)
  // 新建状态
  private static final int NEW          = 0;
  // 任务正在完成状态
    private static final int COMPLETING   = 1;
  // 正常执行结束
    private static final int NORMAL       = 2;
  // 非正常结束
    private static final int EXCEPTIONAL  = 3;
  // 任务被取消,对应cancel(false)
    private static final int CANCELLED    = 4;
  // 任务中断
    private static final int INTERRUPTING = 5;
  // 任务被中断,对应cancel(true)
    private static final int INTERRUPTED  = 6;
  
  /** The underlying callable; nulled out after running */
    private Callable<V> callable;
  // 异步任务的结果
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
  // 执行Callable任务的线程
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
  // 线程等待节点
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
  
  // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
  
  // get方法阻塞
  public V get() throws InterruptedException, ExecutionException {
        int s = state;
      // state小于等于COMPLETING表示还没开始执行
        if (s <= COMPLETING)
          // 休眠等待执行结果
            s = awaitDone(false0L);
     // 休眠返回,调用report获取结果
        return report(s);
    }
  
  private int awaitDone(boolean timed, long nanos)
        throws InterruptedException 
{
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 线程等待节点
        WaitNode q = null;
    // 是否入队
        boolean queued = false;
    // 自旋
        for (;;) {
           // 中断之后移除自身对应的等待的节点
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
          // 状态大于COMPLETING,任务完成或者取消或者中断,则返回
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
          // 状态等于COMPLETING,任务正在执行,获取任务结果的线程让出cpu
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
          // 若当前线程还没有进入线程等待链表的WaitNode节点,此时新建一个WaitNode节点,并把当前线程赋值给WaitNode节点的thread属性,创建等待节点,在下一轮自旋时会入队
            else if (q == null)
                q = new WaitNode();
          // 还没有入队,则入队等待执行结果
            else if (!queued) 
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
          // 定时,则观察时间是否到了
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
              // 被finishCompletion唤醒,自旋回到for循环重新往下执行
                LockSupport.park(this);
        }
    }
  
  // 执行
  public void run() {
    // 不是新建的任务 并且不能将执行线程由null变为当前线程,则直接返回
    // 如果任务状态为NEW且runner为null,说明还未有线程执行过异步任务,则满足条件,可以执行
    // 如果任务状态不为NEW,说明已经有线程执行过异步任务,没有必要再次执行,直接返回
    // 如果任务状态为NEW且runner不为null,说明异步任务正在执行,直接返回
   
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
              // 异步任务结果
                V result;
              // 异步任务执行成功还是失败标志
                boolean ran;
                try {
                  // 执行任务的call方法
                  // Callable.call
                    result = c.call();
                  // 正常执行完成,设置为true
                    ran = true;
                } catch (Throwable ex) { // 执行失败,也会修改状态,然后唤醒get方法阻塞的线程
                    result = null;
                  // 抛出异常,设置为false
                    ran = false;
                  // 异常结果赋值给outcome
                    setException(ex);
                }
              // 执行成功,则设置结果
                if (ran)
                  // 赋值给outcome
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
          // 异步任务正在执行过程中,runner一直都是非空的,防止并发执行
          // 任务执行结束后,不管成功还是失败,都将runner设置为null
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
          // 表示被中断了,调用了cancel(true),调用handlePossibleCancellationInterrupt处理中断
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
  // 设置结果并进行通知
  protected void set(V v) {
    // 将状态由NEW 变为COMPLETING
    // 任务不会被并发执行,导致状态不为NEW的原因是中断
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      // 将结果赋给outcome
      outcome = v;
      // 将状态变为NORMAL
      UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
      // 唤醒调用get方法阻塞等待结果的线程
      finishCompletion();
    }
  }
  
  // 设置异常
  protected void setException(Throwable t) {
    // 任务不会被并发执行,导致状态不为NEW的原因是中断
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
          // 将异常赋给outcome
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
  
  private void finishCompletion() {
        // assert state > COMPLETING;
    // waiters中存储的是调用了get方法的线程集合,遍历
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                  // waitNode节点的线程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                      // 唤醒阻塞的线程
                        LockSupport.unpark(t);
                    }
                  // 继续取下一个节点
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null// unlink to help gc
                    q = next;
                }
                break;
            }
        }
   // 正常执行完成还是抛出异常都会执行done方法
        done();

        callable = null;        // to reduce footprint
    }
}
  
public interface RunnableFuture<Vextends RunnableFuture<V{
    
    void run();
}

示例:

public class TestCallable {

    public static void main(String[] args) {
        CallDemo call = new CallDemo();
        FutureTask<Integer> futureTask = new FutureTask<>(call);
        new Thread(futureTask).start();

        try {
            // get方法是一个阻塞方法  在没有执行结束之前一直阻塞,直到执行完毕
            int sum = futureTask.get();
            System.out.println("---------");
            System.out.println(sum);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }
}

/**
 * Callable相较于Runnable有返回值和异常
 */

class CallDemo implements Callable<Integer>{

    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for(int i = 0;i<1000;i++){
            sum +=i;
        }
        return sum;
    }
}

CompletionService

Future具有阻塞同步性的特点,由于阻塞代码的运行效率会比较差,而CompletionService可以解决这样的问题

CompletionService的功能是以异步的方式一边生产新的任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离开进行处理,使用submit执行任务,使用take取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果,把线程池Executor和阻塞队列BlockingQueue融合在一起

ExecutorCompletionService是CompletionService的唯一实现类

// 依赖于Executor对象,completionQueue作为完成队列
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
源码
take

用来取得completionQueue队列中的Future对象,最先完成的会先进入completionQueue队列中,执行时间最短的最先返回,虽然调用completionService.take().get()也会进行阻塞,但是并不会阻塞后续的任务,哪个任务先执行完,哪个任务的返回值就先打印

public Future<V> take() throws InterruptedException {
    return completionQueue.take();
}
poll

获取并移除表示下一个已完成任务的Future,如果不存在这样的任务,则返回null,无阻塞效果

public Future<V> poll() {
    return completionQueue.poll();
}
示例
public class TestCompletionService {
    public static void main(String[] args) {

        try{
            ExecutorService executorService = Executors.newFixedThreadPool(2);

            ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(executorService);

            for (int i = 0; i < 10; i++) {
                executorCompletionService.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        long sleep = (long) (Math.random() * 1000);
                        System.out.println("sleep=" + sleep + " " + Thread.currentThread().getName());
                        Thread.sleep(sleep);
                        return Thread.currentThread().getName() + " " + sleep;
                    }
                });
            }

            for (int i = 0; i < 10; i++) {
                System.out.println(executorCompletionService.take().get());
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }

}

https://zhhll.icu/2020/多线程/基础/2.Callable接口/

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/729140.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网站被插入虚假恶意链接怎么办?

在当前的电信和网络环境中&#xff0c;诈骗案件频发&#xff0c;许多受害者不幸上当&#xff0c;主要原因是他们点击了诈骗者发送的假链接。这些诈骗网站经常模仿真实网站的外观&#xff0c;使人难以分辨真伪。那么&#xff0c;我们应如何鉴别这些诈骗链接呢&#xff1f; 下面…

多个数据库驱动并存情况下无法找到数据库驱动问题解决

最近在打fat-jar包时&#xff0c;出现一个问题&#xff0c;运行fat-jar时报错&#xff1a; 2024-03-08 10:08:08.772 WARN [Thread-5] c.k.s.f.ExternalFuncClassLoader : 创建了未包含任何路径的自定义类加载器实例 2024-03-08 10:08:08.781 INFO [Thread-5] c.k.…

leetcode 2834.找出美丽数组的最小和

这道题作者一开始用的暴力解法&#xff0c;效果还不错&#xff0c;但是对于特别大的数据是过不去的。 先讲一下我暴力的思路&#xff1a;作者用了双指针的解法&#xff0c;我们可以先定义一个数组&#xff0c;就是把数组从1开始考虑&#xff0c;n个连续整数输入到里面去。 然…

工业制氧机的使用与维护管理指南

工业制氧机是工业生产中不可或缺的重要设备&#xff0c;其高效稳定的供氧功能对于保障生产过程的顺利进行至关重要。为了确保工业制氧机能够持续高效地提供氧气&#xff0c;正确的使用方法和维护措施是必不可少的。 在使用工业制氧机时&#xff0c;我们首先要确保设备放置在通风…

网络聊天室的UDP实现以及数据库

网络聊天室UDP实现 服务器端&#xff1a; 头文件&#xff1a; #include <myhead.h>//定义客户信息结构体 typedef struct magtye {char type; //消息类型char name[100]; //客户姓名char text[1024]; //客户发送聊天信息 }msg_t;//定义结构体存储…

mysql的语法总结2

命令&#xff1a; mysql -u 用户名 -p mysql登录 命令&#xff1a;create database u1 创建数据库u1 查询数据库 使用数据库u1 创建表department 查询表department ALTER TABLE 表名 操作类型&#xff1b; 操作类型可以有以下的操作&#xff1a; 添加列&#x…

ZCC3808 低静态电流、可编程延迟监控电路 替代TPS3808

1 特性 • 上电复位发生器具有可调节延迟时间&#xff1a;1.25ms 至10s • 超低静态电流&#xff1a;2.4μA&#xff08;典型值&#xff09; • 高阈值精度&#xff1a;0.5% 典型值 • 提供适用于标准电压轨的 0.9V 至 5V 固定阈值电压且可调节电压低至 0.4V • 手动复位 …

CubeMX使用教程(2)——点亮LED

在上一章&#xff0c;我们完成了CubeMX的环境配置&#xff0c;这一章我们通过CubeMX来完成点亮LED的工作。 通过LED原理图可知&#xff0c;如果我们要点亮LD1&#xff08;第一个灯&#xff09;&#xff0c;它对应开发板的PC8端口&#xff0c;因此我们应该在CubeMX中将PC8配置为…

【并查集】一种简单而强大高效的数据结构

目录 一、并查集原理 二、并查集实现 三、并查集应用 1. LeetCode并查集相关OJ题 2. 并查集的其他应用及总结 一、并查集原理 并查集&#xff08;Disjoint Set&#xff09;是一种用来管理元素分组和查找元素所属组别的数据结构。它主要支持两种操作&#xff1a;查找&…

公司内部的手机拍照管理办法建议

公司内部的手机拍照管理是一项重要的工作&#xff0c;因为它涉及到公司的信息安全、知识产权保护和隐私保护等方面。以下是一些建议的公司内部手机拍照管理办法&#xff1a; 明确拍照目的和范围&#xff1a;首先&#xff0c;公司应明确员工使用手机拍照的目的和范围。例如&…

JavaScript进阶 (1)

封装 构造函数存在问题 js可以通过构造函数进行封装&#xff0c;但存在浪费内存问题 每创建新的对象引用数据类型就开辟新的空间 原型 构造函数通过原型分配函数是所有对象所共享的 每一个构造函数都有一个prototype属性&#xff0c;指向另一个对象&#xff0c;也称为原型…

小型内衣裤洗衣机哪个牌子好?四款高热度内衣洗衣机力荐

相信很多用户从小就有个观念&#xff0c;内衣裤不能跟其他衣物一起混合洗&#xff0c;否则会感染细菌&#xff0c;所以不少人的内衣裤一直都是自己手洗的&#xff0c;清洗内衣裤不算麻烦&#xff0c;但日常都要换洗&#xff0c;对一个白天上班已经很累的人来说&#xff0c;真是…

COM(Component Object Model)通信技术

COM&#xff08;Component Object Model&#xff09;通信技术是一种用于组件之间通信的二进制接口标准&#xff0c;它允许在不同进程或计算机上的组件进行交互。以下是 COM 通信技术的一些基本原理&#xff1a; 1. 二进制接口&#xff08;Binary Interface&#xff09;&#x…

如何 借助 AI + bat,1分钟内建立100个自定义文件和文件夹?

01 你好&#xff0c;我是云桃桃。 最近&#xff0c;我在写web系列的知识&#xff0c;做的过程中遇到过不少问题。今天&#xff0c;就来说说&#xff0c;我解决的一个批量新建文件/文件夹的问题。 事情是这样的。首先&#xff0c;我的大纲基本在幕布里已经弄好了&#xff0c;…

简析内部审计数字化转型的方法和路径【小落送书(第6期)】

个人名片&#xff1a; &#x1f43c;作者简介&#xff1a;一名大三在校生&#xff0c;喜欢AI编程&#x1f38b; &#x1f43b;‍❄️个人主页&#x1f947;&#xff1a;落798. &#x1f43c;个人WeChat&#xff1a;hmmwx53 &#x1f54a;️系列专栏&#xff1a;&#x1f5bc;️…

mysqld_exporter安装

1.介绍 主要监控Mysql数据库的稳定性、吞吐量、连接情况、缓冲池使用情况、查询性能等各项指标&#xff0c;是我们压测时常常需要监控的一些指标。2.安装 官方网站下载安装包&#xff0c;区分操作系统3.步骤 &#xff08;1&#xff09;直接解压 &#xff08;2&#xff09;在当…

基于ACM32 MCU的电动滑板车方案介绍

随着智能科技的快速发展&#xff0c;电动滑板车的驱动系统也得到了长足的发展。国内外的电动滑板车用电机驱动系统分为传统刷式电机和无刷电机两种类型。其中&#xff0c;传统的刷式电机已经逐渐被无刷电机所取代&#xff0c;无刷电机的性能和寿命都更出色&#xff0c;已成为电…

玩转AI大模型应用开发,轻松打造热门APPai数字人直播软件!

AI大模型应用在数字人直播领域的应用愈发成熟&#xff0c;为开发者提供了更多创意和可能性。数字人直播软件是当前热门的应用之一&#xff0c;它结合了虚拟主播和人工智能技术&#xff0c;为用户带来全新的互动体验。想要打造一个火爆的数字人直播软件&#xff0c;就需要玩转AI…

从0到1快速搭建一个jeecg 企业级应用管理后台

一. 基本介绍 官网地址&#xff1a;https://jeecg.com/ JeecgBoot 是一款企业级的低代码平台&#xff01;前后端分离架构 SpringBoot2.x&#xff0c;SpringCloud&#xff0c;Ant Design&Vue3&#xff0c;Mybatis-plus&#xff0c;Shiro&#xff0c;JWT 支持微服务。强大的…

vue 使用谷歌地图 @googlemaps/js-api-loader 进行模糊搜索

<template><div class"map"><div class"mapLeftStyle"><el-inputv-model"input"placeholder"请输入内容"class"controls"input"chnageinput"><i slot"prefix" class"e…