线程和线程池
- AsyncTask
- IntentService
- 线程池
- ThreadPoolExecutor
- FixedThreadPool
- CachedThreadPool
- ScheduledExecutor
- SingleThreadExecutor
AsyncTask
使用案例可看Android基础——异步消息处理,需要注意
- AsyncTask必须在主线程中加载,在ActivityThread的main()中会调用AsyncTask的init()
- AsyncTask对象必须在主线程中创建及调用execute(),且一个对象只能调用一次
- 不要直接调用onPreExecute()、onPostExecute()、doInBackground()、onProgressUpdate()
- execute()串行执行,executeOnexecutor()传入THREAD_POOL_EXECUTOR并行执行
execute()调用executeOnExecutor()传入sDefaultExecutor,将params封装到WorkerRunnable、FutureTask
public final AsyncTask<Params, Progress, Result> execute(Params... params) {return executeOnExecutor(sDefaultExecutor, params);
}public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,Params... params) {......mStatus = Status.RUNNING;onPreExecute();mWorker.mParams = params;exec.execute(mFuture);return this;
}
sDefaultExecutor是一个串行线程池,调用其execute()
- 将上面封装的FutureTask插入到mTasks
- 如果当前没有正在运行的AsyncTask任务,则调用scheduleNext()
- scheduleNext()将mTasks中的任务取出,在线程池中执行FutureTask的run()
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();private static class SerialExecutor implements Executor {final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();Runnable mActive;public synchronized void execute(final Runnable r) {mTasks.offer(new Runnable() {public void run() {try {r.run();} finally {scheduleNext();}}});if (mActive == null) {scheduleNext();}}protected synchronized void scheduleNext() {if ((mActive = mTasks.poll()) != null) {THREAD_POOL_EXECUTOR.execute(mActive);}}
}public static final Executor THREAD_POOL_EXECUTOR;
static {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), sThreadFactory);threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
FutureTask的run()会调用到WorkerRunnable的call()
- 调用doInBackground()
- 然后调用postResult(),通过Handler发送MESSAGE_POST_RESULT,这里的Handler实际是InternalHandler
public AsyncTask() {this((Looper) null);
}public AsyncTask(@Nullable Handler handler) {this(handler != null ? handler.getLooper() : null);
}public AsyncTask(@Nullable Looper callbackLooper) {mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()? getMainHandler(): new Handler(callbackLooper);mWorker = new WorkerRunnable<Params, Result>() {public Result call() throws Exception {mTaskInvoked.set(true);Result result = null;try {Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);result = doInBackground(mParams);Binder.flushPendingCommands();} catch (Throwable tr) {mCancelled.set(true);throw tr;} finally {postResult(result);}return result;}};......};
}private Result postResult(Result result) {Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,new AsyncTaskResult<Result>(this, result));message.sendToTarget();return result;
}private Handler getHandler() {return mHandler;
}private static Handler getMainHandler() {synchronized (AsyncTask.class) {if (sHandler == null) {sHandler = new InternalHandler(Looper.getMainLooper());}return sHandler;}
}
InternalHandler会回调onProgressUpdate() / onCancelled() / onPostExecute,为了能够让执行环境切换到主线程,InternalHandler必须在主线程中创建
private static class InternalHandler extends Handler {public InternalHandler(Looper looper) {super(looper);}@Overridepublic void handleMessage(Message msg) {AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;switch (msg.what) {case MESSAGE_POST_RESULT:result.mTask.finish(result.mData[0]);break;case MESSAGE_POST_PROGRESS:result.mTask.onProgressUpdate(result.mData);break;}}
}private void finish(Result result) {if (isCancelled()) {onCancelled(result);} else {onPostExecute(result);}mStatus = Status.FINISHED;
}
IntentService
IntentService继承了Service,且是抽象类,用于执行后台耗时任务,执行完后自动停止
public class LocalIntentService extends IntentService {private static final String TAG = "LocalIntentService";public LocalIntentService() {super(TAG);}@Overrideprotected void onHandleIntent(@Nullable Intent intent) {String action = intent.getStringExtra("action");Log.d(TAG, "onHandleIntent: receive action = " + action);SystemClock.sleep(5000);if ("com.demo.demo0".equals(action)) {Log.d(TAG, "onHandleIntent: handle action = " + action);}}@Overridepublic void onDestroy() {super.onDestroy();Log.d(TAG, "onDestroy: ");}
}
如上,在onHandleIntent()中处理intent中传过来的数据,如下通过startService()调用
public class MainActivity extends AppCompatActivity {private static final String TAG = "MainActivity";protected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);Intent service = new Intent(this,LocalIntentService.class);service.putExtra("action","com.demo.demo0");startService(service);service.putExtra("action","com.demo.demo1");startService(service);}
}
打印如下
onHandleIntent: receive action = com.demo.demo0
onHandleIntent: handle action = com.demo.demo0
onHandleIntent: receive action = com.demo.demo1
onDestroy:
下面分析其原理,onCrete()如下,创建HandlerThread,其调用start(),并用其Looper创建一个ServiceHandler
@Override
public void onCreate() {super.onCreate();HandlerThread thread = new HandlerThread("IntentService[" + mName + "]");thread.start();mServiceLooper = thread.getLooper();mServiceHandler = new ServiceHandler(mServiceLooper);
}private volatile ServiceHandler mServiceHandler;private final class ServiceHandler extends Handler {public ServiceHandler(Looper looper) {super(looper);}@Overridepublic void handleMessage(Message msg) {onHandleIntent((Intent)msg.obj);stopSelf(msg.arg1);}
}
HandlerThread继承了Thread,start()会调用其run(),内部创建Looper,当无需使用时,需调用quit() / quitSafely()终止Looper来终止线程
@Override
public void run() {mTid = Process.myTid();Looper.prepare();synchronized (this) {mLooper = Looper.myLooper();notifyAll();}Process.setThreadPriority(mPriority);onLooperPrepared();Looper.loop();mTid = -1;
}
启动服务调用onStartCommand()、onStart(),通过mServiceHandler发送Message,handleMessage()会在HandlerThread中执行(会到创建Handler所在的Looper中执行),回调onHandleIntent()处理耗时任务,处理完后调用stopSelf()停止服务
@Override
public int onStartCommand(@Nullable Intent intent, int flags, int startId) {onStart(intent, startId);return mRedelivery ? START_REDELIVER_INTENT : START_NOT_STICKY;
}@Override
public void onStart(@Nullable Intent intent, int startId) {Message msg = mServiceHandler.obtainMessage();msg.arg1 = startId;msg.obj = intent;mServiceHandler.sendMessage(msg);
}
线程池
- 重用线程池中的线程,可以避免创建和销毁带来的性能开销
- 控制最大并发数,避免线程之间抢占资源导致阻塞
- 对线程进行管理,提供定时执行、循环执行等功能
ThreadPoolExecutor
通过一系列参数配置线程池
- corePoolSize:核心线程数,当allowsCoreThreadTimeOut=true且核心线程闲置时间超过keepAliveTime就会终止,否则一直存活
- maximumPoolSize:最大线程数,超过后接下来的任务会被阻塞
- keepAliveTime:非核心线程闲置时间,超过后非核心线程会被回收,当allowsCoreThreadTimeOut=true同样作用于核心线程
- unit:指定keepAliveTime的时间单位
- workQueue:线程池的任务队列,存储execute()提交的Runnable对象
- threadFactory:线程工厂,为线程池创建新线程
- handler:当线程池已满或无法成功执行时,调用其中的rejectedExecution()抛出异常
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}
- 当线程数量 < 核心线程数量,启动核心线程执行任务
- 当线程数量 >= 核心线程数量,任务插入队列等待
- 当任务队列已满,若未达到最大线程数量,启动非核心线程执行任务,若已达到最大线程数量,则调用rejectedExecution()
FixedThreadPool
只有核心线程且数量固定,闲置时不会被回收(可以更快速响应请求),当线程池的线程都在工作时,新任务处于等待状态直到线程空闲出来
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
CachedThreadPool
无核心线程,最大线程数量无限制,闲置60s会被回收,当线程池中的线程都在工作时,会创建新线程处理新任务(任何任务都会被立即执行,适合执行大量耗时少的任务),无任务时不占用系统资源
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
ScheduledExecutor
核心线程数量固定,非核心线程数量无限制,闲置10ms后回收,主要用于执行定时任务和具有固定周期的重复任务,schedule()可以延迟运行,scheduleAtFixedRate()可以在延迟后每隔一段时间运行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue());}
SingleThreadExecutor
只有一个核心线程,确保所有任务都在同一线程中按顺序执行,统一所有任务到一个线程,不需要处理线程同步问题
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}