使用 WorkManager API 可以轻松地调度那些必须可靠运行的可延期异步任务。通过这些 API,您可以创建任务并提交给 WorkManager,以便在满足工作约束条件时运行。
一、使用
1.1 基本使用
定义任务:
public class MainWorker1 extends Worker {private static final String TAG = MainWorker1.class.getSimpleName();// 任务参数private WorkerParameters workerParameters;public MainWorker1(@NonNull Context context, @NonNull WorkerParameters workerParams) {super(context, workerParams);workerParameters = workerParams;}// 执行异步的后台任务,Result 表示结果@SuppressLint("RestrictedApi")@NonNull@Overridepublic Result doWork() {// 获取给当前任务输入的参数String value = workerParameters.getInputData().getString("key");Log.d(TAG, "doWork: " + value);// 任务对外输出的参数封装在 Result 中返回Data outputData = new Data.Builder().putString("result", "result").build();return new Result.Success(outputData);}
}
任务的使用:
public void onTest1Click(View view) {// 输入给任务的参数Data inputData = new Data.Builder().putString("key", "value111").build();OneTimeWorkRequest request = new OneTimeWorkRequest.Builder(MainWorker1.class).setInputData(inputData).build();// 获取 WorkInfo,包括数据和运行状态WorkManager.getInstance(this).getWorkInfoByIdLiveData(request.getId()).observe(this, new Observer<WorkInfo>() {@Overridepublic void onChanged(WorkInfo workInfo) {// 获取 Worker 返回的数据Log.d(TAG, "任务回传的数据: " + workInfo.getOutputData().getString("result"));// 输出 Worker 的工作状态:ENQUEUED、RUNNING、SUCCEEDED 以及结束,此外还有// FAILED,BLOCKED,CANCELLEDLog.d(TAG, "onChanged: " + workInfo.getState().name());if (workInfo.getState().isFinished()) {Log.d(TAG, "onChanged: 任务结束!");}}});// 任务入队WorkManager.getInstance(this).enqueue(request);}
输出 log:
D/MainWorker1: doWork: value111
D/BaseActivity: 任务回传的数据: null
D/BaseActivity: onChanged: ENQUEUED
D/BaseActivity: 任务回传的数据: null
D/BaseActivity: onChanged: RUNNING
D/BaseActivity: 任务回传的数据: result
D/BaseActivity: onChanged: SUCCEEDED
D/BaseActivity: onChanged: 任务结束!
可以看到 Worker 回传的结果数据在其处于 RUNNING 状态之后才会生成。
1.2 任务组合
WorkManager.getInstance(myContext)// Candidates to run in parallel.beginWith(Arrays.asList(plantName1, plantName2, plantName3))// Dependent work (only runs after all previous work in chain).then(cache).then(upload)// Call enqueue to kick things off.enqueue();
1.3 周期任务
public void onTestPeriodicWorkRequest(View view) {// 即便你设置了 10 秒中执行一次周期任务,但是 Google 对此进行了严格的限制,周期任务的执行间隔为 15 分钟PeriodicWorkRequest periodicWorkRequest = new PeriodicWorkRequest.Builder(MainWorker1.class,10, TimeUnit.SECONDS).build();WorkManager.getInstance(this).getWorkInfoByIdLiveData(periodicWorkRequest.getId()).observe(this, new Observer<WorkInfo>() {@Overridepublic void onChanged(WorkInfo workInfo) {Log.d(TAG, "任务状态: " + workInfo.getState().name());if (workInfo.getState().isFinished()) {Log.d(TAG, "任务结束!");}}});WorkManager.getInstance(this).enqueue(periodicWorkRequest);}
周期任务要注意两点:
- 如果设置的 PeriodicWorkRequest 的执行间隔小于 15 分钟,系统会自动将这个间隔设置为 15 分钟。
- 由于是循环式的执行任务,所以输出的任务状态是 ENQUEUED、RUNNING 然后执行任务,过了 15 分钟之后循环上述状态,不会输出 SUCCESS 状态。
1.4 约束条件
@RequiresApi(api = Build.VERSION_CODES.M)public void testBackgroundWork5(View view) {// 约束条件,必须满足我的条件,才能执行后台任务 (在连接网络,插入电源 并且 处于空闲时)Constraints constraints = new Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED) // 网络链接中....setRequiresCharging(true) // 充电中...setRequiresDeviceIdle(true) // 空闲时.. .build();/*** 除了上面设置的约束外,WorkManger还提供了以下的约束作为Work执行的条件:* setRequiredNetworkType:网络连接设置* setRequiresBatteryNotLow:是否为低电量时运行 默认false* setRequiresCharging:是否要插入设备(接入电源),默认false* setRequiresDeviceIdle:设备是否为空闲,默认false* setRequiresStorageNotLow:设备可用存储是否不低于临界阈值*/// 请求对象OneTimeWorkRequest request = new OneTimeWorkRequest.Builder(MainWorker3.class).setConstraints(constraints) // Request关联 约束条件.build();// 加入队列WorkManager.getInstance(this).enqueue(request);}
任务的所有信息都会被保存到 Room 数据库,所以应用提交任务后被卸载,只要满足该任务条件也会执行这个任务。使得任务可以脱离应用本身。
二、原理
2.1 初始化过程
使用时都是通过 WorkManager.getInstance() 先拿到一个 WorkManager 实例然后再将任务入队:
public static @NonNull WorkManager getInstance(@NonNull Context context) {return WorkManagerImpl.getInstance(context);}
WorkManagerImpl 会返回一个单例对象:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)public static @NonNull WorkManagerImpl getInstance(@NonNull Context context) {synchronized (sLock) {WorkManagerImpl instance = getInstance();if (instance == null) {Context appContext = context.getApplicationContext();// 创建 instance 所需的 appContext 必须实现了 Configuration.Provider 接口if (appContext instanceof Configuration.Provider) {initialize(appContext,((Configuration.Provider) appContext).getWorkManagerConfiguration());instance = getInstance(appContext);} else {throw new IllegalStateException("WorkManager is not initialized properly. You "+ "have explicitly disabled WorkManagerInitializer in your manifest, "+ "have not manually called WorkManager#initialize at this point, and "+ "your Application does not implement Configuration.Provider.");}}return instance;}}
这个初始化过程实际上涉及 WorkManagerImpl 中的两个单例:sDefaultInstance 和 sDelegatedInstance。首先看空参的 getInstance(),如果 sDelegatedInstance 不为空就返回它,否则返回 sDefaultInstance:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)public static @Nullable WorkManagerImpl getInstance() {synchronized (sLock) {if (sDelegatedInstance != null) {return sDelegatedInstance;}return sDefaultInstance;}}
如果该方法返回了 null 则需要通过 initialize() 进行初始化,不过有个前提就是这里用到的 ApplicationContext 必须实现了 Configuration.Provider 接口:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)public static void initialize(@NonNull Context context, @NonNull Configuration configuration) {synchronized (sLock) {if (sDelegatedInstance != null && sDefaultInstance != null) {throw new IllegalStateException("WorkManager is already initialized. Did you "+ "try to initialize it manually without disabling "+ "WorkManagerInitializer? See "+ "WorkManager#initialize(Context, Configuration) or the class level "+ "Javadoc for more information.");}if (sDelegatedInstance == null) {context = context.getApplicationContext();if (sDefaultInstance == null) {sDefaultInstance = new WorkManagerImpl(context,configuration,new WorkManagerTaskExecutor(configuration.getTaskExecutor()));}sDelegatedInstance = sDefaultInstance;}}}
实际上,执行这个初始化过程的并不是我们在使用时执行的,而是由 WorkManager 内部的 WorkManagerInitializer 这个 ContentProvider 来做的。在 apk 文件的 AndroidManifest 中能找到其声明:
<providerandroid:name="androidx.work.impl.WorkManagerInitializer"android:exported="false"android:multiprocess="true"android:authorities="com.demo.workmanager.workmanager-init"android:directBootAware="false" />
第一次执行初始化是在 WorkManagerInitializer 的 onCreate() 中:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class WorkManagerInitializer extends ContentProvider {@Overridepublic boolean onCreate() {// 实际上执行 WorkManagerImpl.initialize()// Initialize WorkManager with the default configuration.WorkManager.initialize(getContext(), new Configuration.Builder().build());return true;}
}
然后我们再来看看 WorkManagerImpl 初始化的具体内容:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)public WorkManagerImpl(@NonNull Context context,@NonNull Configuration configuration,@NonNull TaskExecutor workTaskExecutor,@NonNull WorkDatabase database) {Context applicationContext = context.getApplicationContext();Logger.setLogger(new Logger.LogcatLogger(configuration.getMinimumLoggingLevel()));// 创建 Scheduler,其中一个是 GreedyScheduler —— 贪婪执行器,会马上执行任务List<Scheduler> schedulers = createSchedulers(applicationContext, workTaskExecutor);Processor processor = new Processor(context,configuration,workTaskExecutor,database,schedulers);internalInit(context, configuration, workTaskExecutor, database, schedulers, processor);}private void internalInit(@NonNull Context context,@NonNull Configuration configuration,@NonNull TaskExecutor workTaskExecutor,@NonNull WorkDatabase workDatabase,@NonNull List<Scheduler> schedulers,@NonNull Processor processor) {context = context.getApplicationContext();mContext = context;// 保存配置,包括线程池任务等mConfiguration = configuration;mWorkTaskExecutor = workTaskExecutor;// 保存了 Room 数据库,用来保存任务mWorkDatabase = workDatabase;mSchedulers = schedulers;mProcessor = processor;mPreferenceUtils = new PreferenceUtils(workDatabase);mForceStopRunnableCompleted = false;// Checks for app force stops.mWorkTaskExecutor.executeOnBackgroundThread(new ForceStopRunnable(context, this));}
2.2 入队过程
我们调用 WorkManagerImpl 的 enqueue() 将任务 WorkRequest 入队时,会先交给 WorkContinuationImpl:
@Override@NonNullpublic Operation enqueue(@NonNull List<? extends WorkRequest> workRequests) {// This error is not being propagated as part of the Operation, as we want the// app to crash during development. Having no workRequests is always a developer error.if (workRequests.isEmpty()) {throw new IllegalArgumentException("enqueue needs at least one WorkRequest.");}return new WorkContinuationImpl(this, workRequests).enqueue();}
先创建一个 WorkContinuationImpl:
WorkContinuationImpl(@NonNull WorkManagerImpl workManagerImpl,String name,ExistingWorkPolicy existingWorkPolicy,@NonNull List<? extends WorkRequest> work,@Nullable List<WorkContinuationImpl> parents) {// 保存 WorkManagerImplmWorkManagerImpl = workManagerImpl;mName = name;// 入口构造方法给的 ExistingWorkPolicy.KEEPmExistingWorkPolicy = existingWorkPolicy;// 保存 WorkRequest 列表mWork = work;// 入口构造方法给的 nullmParents = parents;mIds = new ArrayList<>(mWork.size());mAllIds = new ArrayList<>();if (parents != null) {for (WorkContinuationImpl parent : parents) {mAllIds.addAll(parent.mAllIds);}}for (int i = 0; i < work.size(); i++) {String id = work.get(i).getStringId();mIds.add(id);mAllIds.add(id);}}
接下来执行 WorkContinuationImpl 的 enqueue():
@Overridepublic @NonNull Operation enqueue() {// Only enqueue if not already enqueued.if (!mEnqueued) {// 使用 Configuration 中默认的线程池的 SerialExecutor 执行 EnqueueRunnable// 任务,并且获得其中的 mOperation 对象,然后返回// The runnable walks the hierarchy of the continuations// and marks them enqueued using the markEnqueued() method, parent first.EnqueueRunnable runnable = new EnqueueRunnable(this);mWorkManagerImpl.getWorkTaskExecutor().executeOnBackgroundThread(runnable);mOperation = runnable.getOperation();} else {Logger.get().warning(TAG,String.format("Already enqueued work ids (%s)", TextUtils.join(", ", mIds)));}return mOperation;}
而 EnqueueRunnable 会执行任务:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class EnqueueRunnable implements Runnable {private final WorkContinuationImpl mWorkContinuation;private final OperationImpl mOperation;public EnqueueRunnable(@NonNull WorkContinuationImpl workContinuation) {mWorkContinuation = workContinuation;mOperation = new OperationImpl();}@Overridepublic void run() {try {// 检测是否有环if (mWorkContinuation.hasCycles()) {throw new IllegalStateException(String.format("WorkContinuation has cycles (%s)", mWorkContinuation));}// boolean needsScheduling = addToDatabase();if (needsScheduling) {// Enable RescheduleReceiver, only when there are Worker's that need scheduling.final Context context =mWorkContinuation.getWorkManagerImpl().getApplicationContext();PackageManagerHelper.setComponentEnabled(context, RescheduleReceiver.class, true);// 执行任务调度scheduleWorkInBackground();}mOperation.setState(Operation.SUCCESS);} catch (Throwable exception) {mOperation.setState(new Operation.State.FAILURE(exception));}}
}
addToDatabase():
@VisibleForTestingpublic boolean addToDatabase() {WorkManagerImpl workManagerImpl = mWorkContinuation.getWorkManagerImpl();WorkDatabase workDatabase = workManagerImpl.getWorkDatabase();workDatabase.beginTransaction();try {boolean needsScheduling = processContinuation(mWorkContinuation);workDatabase.setTransactionSuccessful();return needsScheduling;} finally {workDatabase.endTransaction();}}private static boolean processContinuation(@NonNull WorkContinuationImpl workContinuation) {boolean needsScheduling = false;List<WorkContinuationImpl> parents = workContinuation.getParents();if (parents != null) {for (WorkContinuationImpl parent : parents) {// When chaining off a completed continuation we need to pay// attention to parents that may have been marked as enqueued before.if (!parent.isEnqueued()) {needsScheduling |= processContinuation(parent);} else {Logger.get().warning(TAG, String.format("Already enqueued work ids (%s).",TextUtils.join(", ", parent.getIds())));}}}needsScheduling |= enqueueContinuation(workContinuation);return needsScheduling;}
关键是在 scheduleWorkInBackground() 中调度任务:
@VisibleForTestingpublic void scheduleWorkInBackground() {WorkManagerImpl workManager = mWorkContinuation.getWorkManagerImpl();Schedulers.schedule(workManager.getConfiguration(),workManager.getWorkDatabase(),workManager.getSchedulers());}public static void schedule(@NonNull Configuration configuration,@NonNull WorkDatabase workDatabase,List<Scheduler> schedulers) {if (schedulers == null || schedulers.size() == 0) {return;}WorkSpecDao workSpecDao = workDatabase.workSpecDao();List<WorkSpec> eligibleWorkSpecs;// 开始数据库操作workDatabase.beginTransaction();try {eligibleWorkSpecs = workSpecDao.getEligibleWorkForScheduling(configuration.getMaxSchedulerLimit());if (eligibleWorkSpecs != null && eligibleWorkSpecs.size() > 0) {long now = System.currentTimeMillis();// 遍历队列中所有的任务,添加到数据库中// Mark all the WorkSpecs as scheduled.// Calls to Scheduler#schedule() could potentially result in more schedules// on a separate thread. Therefore, this needs to be done first.for (WorkSpec workSpec : eligibleWorkSpecs) {workSpecDao.markWorkSpecScheduled(workSpec.id, now);}}workDatabase.setTransactionSuccessful();} finally {workDatabase.endTransaction();}if (eligibleWorkSpecs != null && eligibleWorkSpecs.size() > 0) {WorkSpec[] eligibleWorkSpecsArray = eligibleWorkSpecs.toArray(new WorkSpec[0]);// 交给 GreedyScheduler 执行任务// Delegate to the underlying scheduler.for (Scheduler scheduler : schedulers) {scheduler.schedule(eligibleWorkSpecsArray);}}}
来到 GreedyScheduler 内:
@Overridepublic void schedule(@NonNull WorkSpec... workSpecs) {if (mIsMainProcess == null) {// The default process name is the package name.mIsMainProcess = TextUtils.equals(mContext.getPackageName(), getProcessName());}if (!mIsMainProcess) {Logger.get().info(TAG, "Ignoring schedule request in non-main process");return;}registerExecutionListenerIfNeeded();// Keep track of the list of new WorkSpecs whose constraints need to be tracked.// Add them to the known list of constrained WorkSpecs and call replace() on// WorkConstraintsTracker. That way we only need to synchronize on the part where we// are updating mConstrainedWorkSpecs.List<WorkSpec> constrainedWorkSpecs = new ArrayList<>();List<String> constrainedWorkSpecIds = new ArrayList<>();for (WorkSpec workSpec : workSpecs) {if (workSpec.state == WorkInfo.State.ENQUEUED&& !workSpec.isPeriodic()&& workSpec.initialDelay == 0L&& !workSpec.isBackedOff()) {// 有约束条件if (workSpec.hasConstraints()) {if (SDK_INT >= 23 && workSpec.constraints.requiresDeviceIdle()) {// Ignore requests that have an idle mode constraint.Logger.get().debug(TAG,String.format("Ignoring WorkSpec %s, Requires device idle.",workSpec));} else if (SDK_INT >= 24 && workSpec.constraints.hasContentUriTriggers()) {// Ignore requests that have content uri triggers.Logger.get().debug(TAG,String.format("Ignoring WorkSpec %s, Requires ContentUri triggers.",workSpec));} else {constrainedWorkSpecs.add(workSpec);constrainedWorkSpecIds.add(workSpec.id);}// 没有约束条件} else {Logger.get().debug(TAG, String.format("Starting work for %s", workSpec.id));mWorkManagerImpl.startWork(workSpec.id);}}}// onExecuted() which is called on the main thread also modifies the list of mConstrained// WorkSpecs. Therefore we need to lock here.synchronized (mLock) {if (!constrainedWorkSpecs.isEmpty()) {Logger.get().debug(TAG, String.format("Starting tracking for [%s]",TextUtils.join(",", constrainedWorkSpecIds)));mConstrainedWorkSpecs.addAll(constrainedWorkSpecs);mWorkConstraintsTracker.replace(mConstrainedWorkSpecs);}}}
没有约束条件的情况下会把任务丢给 WorkManagerImpl 执行:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)public void startWork(@NonNull String workSpecId) {startWork(workSpecId, null);}public void startWork(@NonNull String workSpecId,@Nullable WorkerParameters.RuntimeExtras runtimeExtras) {mWorkTaskExecutor.executeOnBackgroundThread(new StartWorkRunnable(this, workSpecId, runtimeExtras));}
StartWorkRunnable:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class StartWorkRunnable implements Runnable {private WorkManagerImpl mWorkManagerImpl;private String mWorkSpecId;private WorkerParameters.RuntimeExtras mRuntimeExtras;public StartWorkRunnable(WorkManagerImpl workManagerImpl,String workSpecId,WorkerParameters.RuntimeExtras runtimeExtras) {mWorkManagerImpl = workManagerImpl;mWorkSpecId = workSpecId;mRuntimeExtras = runtimeExtras;}@Overridepublic void run() {// Processor mWorkManagerImpl.getProcessor().startWork(mWorkSpecId, mRuntimeExtras);}
}
执行任务:
public boolean startWork(@NonNull String id,@Nullable WorkerParameters.RuntimeExtras runtimeExtras) {WorkerWrapper workWrapper;synchronized (mLock) {// Work may get triggered multiple times if they have passing constraints// and new work with those constraints are added.if (mEnqueuedWorkMap.containsKey(id)) {Logger.get().debug(TAG,String.format("Work %s is already enqueued for processing", id));return false;}// Worker 的包装对象workWrapper =new WorkerWrapper.Builder(mAppContext,mConfiguration,mWorkTaskExecutor,this,mWorkDatabase,id).withSchedulers(mSchedulers).withRuntimeExtras(runtimeExtras).build();ListenableFuture<Boolean> future = workWrapper.getFuture();future.addListener(new FutureListener(this, id, future),mWorkTaskExecutor.getMainThreadExecutor());mEnqueuedWorkMap.put(id, workWrapper);}// 执行任务mWorkTaskExecutor.getBackgroundExecutor().execute(workWrapper);Logger.get().debug(TAG, String.format("%s: processing %s", getClass().getSimpleName(), id));return true;}
WorkerWrapper 是一个 Runnable:
@WorkerThread@Overridepublic void run() {mTags = mWorkTagDao.getTagsForWorkSpecId(mWorkSpecId);mWorkDescription = createWorkDescription(mTags);runWorker();}private void runWorker() {if (tryCheckForInterruptionAndResolve()) {return;}mWorkDatabase.beginTransaction();try {mWorkSpec = mWorkSpecDao.getWorkSpec(mWorkSpecId);if (mWorkSpec == null) {Logger.get().error(TAG,String.format("Didn't find WorkSpec for id %s", mWorkSpecId));resolve(false);return;}// Do a quick check to make sure we don't need to bail out in case this work is already// running, finished, or is blocked.if (mWorkSpec.state != ENQUEUED) {resolveIncorrectStatus();mWorkDatabase.setTransactionSuccessful();Logger.get().debug(TAG,String.format("%s is not in ENQUEUED state. Nothing more to do.",mWorkSpec.workerClassName));return;}// Case 1:// Ensure that Workers that are backed off are only executed when they are supposed to.// GreedyScheduler can schedule WorkSpecs that have already been backed off because// it is holding on to snapshots of WorkSpecs. So WorkerWrapper needs to determine// if the ListenableWorker is actually eligible to execute at this point in time.// Case 2:// On API 23, we double scheduler Workers because JobScheduler prefers batching.// So is the Work is periodic, we only need to execute it once per interval.// Also potential bugs in the platform may cause a Job to run more than once.if (mWorkSpec.isPeriodic() || mWorkSpec.isBackedOff()) {long now = System.currentTimeMillis();// Allow first run of a PeriodicWorkRequest// to go through. This is because when periodStartTime=0;// calculateNextRunTime() always > now.// For more information refer to b/124274584boolean isFirstRun = mWorkSpec.periodStartTime == 0;if (!isFirstRun && now < mWorkSpec.calculateNextRunTime()) {Logger.get().debug(TAG,String.format("Delaying execution for %s because it is being executed "+ "before schedule.",mWorkSpec.workerClassName));// For AlarmManager implementation we need to reschedule this kind of Work.// This is not a problem for JobScheduler because we will only reschedule// work if JobScheduler is unaware of a jobId.resolve(true);return;}}// Needed for nested transactions, such as when we're in a dependent work request when// using a SynchronousExecutor.mWorkDatabase.setTransactionSuccessful();} finally {mWorkDatabase.endTransaction();}// Merge inputs. This can be potentially expensive code, so this should not be done inside// a database transaction.Data input;if (mWorkSpec.isPeriodic()) {input = mWorkSpec.input;} else {InputMergerFactory inputMergerFactory = mConfiguration.getInputMergerFactory();String inputMergerClassName = mWorkSpec.inputMergerClassName;InputMerger inputMerger =inputMergerFactory.createInputMergerWithDefaultFallback(inputMergerClassName);if (inputMerger == null) {Logger.get().error(TAG, String.format("Could not create Input Merger %s",mWorkSpec.inputMergerClassName));setFailedAndResolve();return;}List<Data> inputs = new ArrayList<>();inputs.add(mWorkSpec.input);inputs.addAll(mWorkSpecDao.getInputsFromPrerequisites(mWorkSpecId));input = inputMerger.merge(inputs);}WorkerParameters params = new WorkerParameters(UUID.fromString(mWorkSpecId),input,mTags,mRuntimeExtras,mWorkSpec.runAttemptCount,mConfiguration.getExecutor(),mWorkTaskExecutor,mConfiguration.getWorkerFactory(),new WorkProgressUpdater(mWorkDatabase, mWorkTaskExecutor),new WorkForegroundUpdater(mForegroundProcessor, mWorkTaskExecutor));// Not always creating a worker here, as the WorkerWrapper.Builder can set a worker override// in test mode.if (mWorker == null) {mWorker = mConfiguration.getWorkerFactory().createWorkerWithDefaultFallback(mAppContext,mWorkSpec.workerClassName,params);}if (mWorker == null) {Logger.get().error(TAG,String.format("Could not create Worker %s", mWorkSpec.workerClassName));setFailedAndResolve();return;}if (mWorker.isUsed()) {Logger.get().error(TAG,String.format("Received an already-used Worker %s; WorkerFactory should return "+ "new instances",mWorkSpec.workerClassName));setFailedAndResolve();return;}mWorker.setUsed();// Try to set the work to the running state. Note that this may fail because another thread// may have modified the DB since we checked last at the top of this function.if (trySetRunning()) {if (tryCheckForInterruptionAndResolve()) {return;}final SettableFuture<ListenableWorker.Result> future = SettableFuture.create();// Call mWorker.startWork() on the main thread.mWorkTaskExecutor.getMainThreadExecutor().execute(new Runnable() {@Overridepublic void run() {try {Logger.get().debug(TAG, String.format("Starting work for %s",mWorkSpec.workerClassName));// 执行 Worker 中的任务mInnerFuture = mWorker.startWork();future.setFuture(mInnerFuture);} catch (Throwable e) {future.setException(e);}}});// Avoid synthetic accessors.final String workDescription = mWorkDescription;future.addListener(new Runnable() {@Override@SuppressLint("SyntheticAccessor")public void run() {try {// If the ListenableWorker returns a null result treat it as a failure.ListenableWorker.Result result = future.get();if (result == null) {Logger.get().error(TAG, String.format("%s returned a null result. Treating it as a failure.",mWorkSpec.workerClassName));} else {Logger.get().debug(TAG, String.format("%s returned a %s result.",mWorkSpec.workerClassName, result));mResult = result;}} catch (CancellationException exception) {// Cancellations need to be treated with care here because innerFuture// cancellations will bubble up, and we need to gracefully handle that.Logger.get().info(TAG, String.format("%s was cancelled", workDescription),exception);} catch (InterruptedException | ExecutionException exception) {Logger.get().error(TAG,String.format("%s failed because it threw an exception/error",workDescription), exception);} finally {onWorkFinished();}}}, mWorkTaskExecutor.getBackgroundExecutor());} else {resolveIncorrectStatus();}}
最后在 Worker 的 startWork() 中会调用我们重写的 doWork():
@Overridepublic final @NonNull ListenableFuture<Result> startWork() {mFuture = SettableFuture.create();getBackgroundExecutor().execute(new Runnable() {@Overridepublic void run() {try {// 在线程池内执行 doWork() 并获取结果Result result = doWork();mFuture.set(result);} catch (Throwable throwable) {mFuture.setException(throwable);}}});return mFuture;}
2.3 条件执行
假如一个任务需要网络作为约束条件,在将任务入队后还未执行前断开网络,过一阵子后连接网络,该任务会被立即执行。其实现原理如下图:
WorkManager 内部有一个广播接收者 ConstraintProxy,当网络由断开变成连接时,约束条件发生变化,ConstraintProxy 会收到广播:
abstract class ConstraintProxy extends BroadcastReceiver {@Overridepublic void onReceive(Context context, Intent intent) {Intent constraintChangedIntent = CommandHandler.createConstraintsChangedIntent(context);context.startService(constraintChangedIntent);}
}
-----------------------------------------------------------------
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class CommandHandler implements ExecutionListener {static Intent createConstraintsChangedIntent(@NonNull Context context) {// SystemAlarmService 是系统级别的服务,不会被干掉Intent intent = new Intent(context, SystemAlarmService.class);intent.setAction(ACTION_CONSTRAINTS_CHANGED);return intent;}
}
onReceive() 会带着 ACTION_CONSTRAINTS_CHANGED 启动 SystemAlarmService:
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class SystemAlarmService extends LifecycleServiceimplements SystemAlarmDispatcher.CommandsCompletedListener {@Overridepublic void onCreate() {super.onCreate();initializeDispatcher();mIsShutdown = false;}@Overridepublic int onStartCommand(Intent intent, int flags, int startId) {super.onStartCommand(intent, flags, startId);if (mIsShutdown) {Logger.get().info(TAG,"Re-initializing SystemAlarmDispatcher after a request to shut-down.");// Destroy the old dispatcher to complete it's lifecycle.mDispatcher.onDestroy();// Create a new dispatcher to setup a new lifecycle.initializeDispatcher();// Set mIsShutdown to false, to correctly accept new commands.mIsShutdown = false;}if (intent != null) {// Intent 添加到 mDispatchermDispatcher.add(intent, startId);}// If the service were to crash, we want all unacknowledged Intents to get redelivered.return Service.START_REDELIVER_INTENT;}
}
在 SystemAlarmDispatcher 中会处理相应的指令:
@MainThreadpublic boolean add(@NonNull final Intent intent, final int startId) {...intent.putExtra(KEY_START_ID, startId);synchronized (mIntents) {boolean hasCommands = !mIntents.isEmpty();mIntents.add(intent);if (!hasCommands) {// Only call processCommand if this is the first command.// The call to dequeueAndCheckForCompletion will process the remaining commands// in the order that they were added.processCommand();}}return true;}
processCommand() 会将处理任务再交给 CommandHandler:
@MainThread@SuppressWarnings("FutureReturnValueIgnored")private void processCommand() {assertMainThread();PowerManager.WakeLock processCommandLock =WakeLocks.newWakeLock(mContext, PROCESS_COMMAND_TAG);try {processCommandLock.acquire();// Process commands on the background thread.mWorkManager.getWorkTaskExecutor().executeOnBackgroundThread(new Runnable() {@Overridepublic void run() {synchronized (mIntents) {mCurrentIntent = mIntents.get(0);}if (mCurrentIntent != null) {final String action = mCurrentIntent.getAction();final int startId = mCurrentIntent.getIntExtra(KEY_START_ID,DEFAULT_START_ID);Logger.get().debug(TAG,String.format("Processing command %s, %s", mCurrentIntent,startId));final PowerManager.WakeLock wakeLock = WakeLocks.newWakeLock(mContext,String.format("%s (%s)", action, startId));try {Logger.get().debug(TAG, String.format("Acquiring operation wake lock (%s) %s",action,wakeLock));wakeLock.acquire();// 专门处理所有 IntentmCommandHandler.onHandleIntent(mCurrentIntent, startId,SystemAlarmDispatcher.this);} catch (Throwable throwable) {Logger.get().error(TAG,"Unexpected error in onHandleIntent",throwable);} finally {Logger.get().debug(TAG,String.format("Releasing operation wake lock (%s) %s",action,wakeLock));wakeLock.release();// Check if we have processed all commandspostOnMainThread(new DequeueAndCheckForCompletion(SystemAlarmDispatcher.this));}}}});} finally {processCommandLock.release();}}
在 CommandHandler 中真正的对 Intent 的 action 做分类处理:
@WorkerThreadvoid onHandleIntent(@NonNull Intent intent,int startId,@NonNull SystemAlarmDispatcher dispatcher) {String action = intent.getAction();if (ACTION_CONSTRAINTS_CHANGED.equals(action)) {handleConstraintsChanged(intent, startId, dispatcher);} else if (ACTION_RESCHEDULE.equals(action)) {handleReschedule(intent, startId, dispatcher);} else {Bundle extras = intent.getExtras();if (!hasKeys(extras, KEY_WORKSPEC_ID)) {Logger.get().error(TAG,String.format("Invalid request for %s, requires %s.",action,KEY_WORKSPEC_ID));} else {if (ACTION_SCHEDULE_WORK.equals(action)) {handleScheduleWorkIntent(intent, startId, dispatcher);} else if (ACTION_DELAY_MET.equals(action)) {handleDelayMet(intent, startId, dispatcher);} else if (ACTION_STOP_WORK.equals(action)) {handleStopWork(intent, dispatcher);} else if (ACTION_EXECUTION_COMPLETED.equals(action)) {handleExecutionCompleted(intent, startId);} else {Logger.get().warning(TAG, String.format("Ignoring intent %s", intent));}}}}
其中第一个 if 判断中的 ACTION_CONSTRAINTS_CHANGED 正是前面传入的,它会进入 handleConstraintsChanged():
private void handleConstraintsChanged(@NonNull Intent intent, int startId,@NonNull SystemAlarmDispatcher dispatcher) {// Constraints changed command handler is synchronous. No cleanup// is necessary.ConstraintsCommandHandler changedCommandHandler =new ConstraintsCommandHandler(mContext, startId, dispatcher);changedCommandHandler.handleConstraintsChanged();}
ChangedCommandHandler 会把 Intent 原有的 Action ACTION_CONSTRAINTS_CHANGED 更换成 ACTION_DELAY_MET:
@WorkerThreadvoid handleConstraintsChanged() {List<WorkSpec> candidates = mDispatcher.getWorkManager().getWorkDatabase().workSpecDao().getScheduledWork();// Update constraint proxy to potentially disable proxies for previously// completed WorkSpecs.ConstraintProxy.updateAll(mContext, candidates);// This needs to be done to populate matching WorkSpec ids in every constraint controller.mWorkConstraintsTracker.replace(candidates);List<WorkSpec> eligibleWorkSpecs = new ArrayList<>(candidates.size());// Filter candidates should have already been scheduled.long now = System.currentTimeMillis();for (WorkSpec workSpec : candidates) {String workSpecId = workSpec.id;long triggerAt = workSpec.calculateNextRunTime();if (now >= triggerAt && (!workSpec.hasConstraints()|| mWorkConstraintsTracker.areAllConstraintsMet(workSpecId))) {eligibleWorkSpecs.add(workSpec);}}for (WorkSpec workSpec : eligibleWorkSpecs) {String workSpecId = workSpec.id;// 更换标记为 ACTION_DELAY_METIntent intent = CommandHandler.createDelayMetIntent(mContext, workSpecId);mDispatcher.postOnMainThread(new SystemAlarmDispatcher.AddRunnable(mDispatcher, intent, mStartId));}mWorkConstraintsTracker.reset();}
CommandHandler 创建一个 Action 为 ACTION_DELAY_MET 的 Intent:
static Intent createDelayMetIntent(@NonNull Context context, @NonNull String workSpecId) {Intent intent = new Intent(context, SystemAlarmService.class);intent.setAction(ACTION_DELAY_MET);intent.putExtra(KEY_WORKSPEC_ID, workSpecId);return intent;}
再回到 CommandHandler 的 onHandleIntent(),刚好有处理 ACTION_DELAY_MET 的情况,会调用 handleDelayMet():
private void handleDelayMet(@NonNull Intent intent,int startId,@NonNull SystemAlarmDispatcher dispatcher) {Bundle extras = intent.getExtras();synchronized (mLock) {String workSpecId = extras.getString(KEY_WORKSPEC_ID);// Check to see if we are already handling an ACTION_DELAY_MET for the WorkSpec.// If we are, then there is nothing for us to do.if (!mPendingDelayMet.containsKey(workSpecId)) {DelayMetCommandHandler delayMetCommandHandler =new DelayMetCommandHandler(mContext, startId, workSpecId, dispatcher);mPendingDelayMet.put(workSpecId, delayMetCommandHandler);delayMetCommandHandler.handleProcessWork();} else {// log...}}}
在 DelayMetCommandHandler 的 handleProcessWork() 中,会调用 Processor 的 startWork() 最终执行到我们重写的 Worker 的 doWork():
@WorkerThreadvoid handleProcessWork() {...if (!mHasConstraints) {onAllConstraintsMet(Collections.singletonList(mWorkSpecId));} else {// Allow tracker to report constraint changesmWorkConstraintsTracker.replace(Collections.singletonList(workSpec));}}@Overridepublic void onAllConstraintsMet(@NonNull List<String> workSpecIds) {// WorkConstraintsTracker will call onAllConstraintsMet with list of workSpecs whose// constraints are met. Ensure the workSpecId we are interested is part of the list// before we call Processor#startWork().if (!workSpecIds.contains(mWorkSpecId)) {return;}synchronized (mLock) {if (mCurrentState == STATE_INITIAL) {mCurrentState = STATE_START_REQUESTED;// 调用 Processor 的 startWork()// Constraints met, schedule execution// Not using WorkManagerImpl#startWork() here because we need to know if the// processor actually enqueued the work here.boolean isEnqueued = mDispatcher.getProcessor().startWork(mWorkSpecId);if (isEnqueued) {// setup timers to enforce quotas on workers that have// been enqueuedmDispatcher.getWorkTimer().startTimer(mWorkSpecId, WORK_PROCESSING_TIME_IN_MS, this);} else {// if we did not actually enqueue the work, it was enqueued before// cleanUp and pretend this never happened.cleanUp();}} else {Logger.get().debug(TAG, String.format("Already started work for %s", mWorkSpecId));}}}
参考资料:
使用 WorkManager 调度任务
WorkManager