鸿蒙开发-UI-交互事件-键鼠事件
鸿蒙开发-UI-交互事件-焦点事件
鸿蒙开发-UI-交互事件-手势事件
鸿蒙开发-UI-web
鸿蒙开发-UI-web-页面
鸿蒙开发-ArkTS语言-基础类库
鸿蒙开发-ArkTS语言-并发
文章目录
前言
一、CPU密集型任务
1. 使用TaskPool进行图像直方图处理
2. 使用worker进行长时间数据分析
二、IO密集型任务
三、同步任务开发
1. 使用TaskPool处理同步任务
2. 使用worker处理关联的同步任务
总结
前言
上文详细学习ArkTS语言并发异步并发开发和多线程并发开发两种并发场景,了解了两种并发场景的区别,以及开发方法。同时详细学习了多线程并发开发的两种方式,以及各个使用场景。本文将学习多线程并发开发的示例分析
一、CPU密集型任务
CPU密集型任务是指需要占用系统资源处理大量计算能力的任务,需要长时间运行,这段时间会阻塞线程其它事件的处理,不适宜放在主线程进行。例如图像处理、视频编码、数据分析等。基于多线程并发机制处理CPU密集型任务可以提高CPU利用率,提升应用程序响应速度。当进行一系列同步任务时,推荐使用Worker;而进行大量或调度点较为分散的独立任务时,不方便使用8个Worker去做负载管理,推荐采用TaskPool
1. 使用TaskPool进行图像直方图处理
处理逻辑
代码示例
import taskpool from '@ohos.taskpool';//step1: 定义具体的图像处理操作及其他耗时操作,在使用TaskPool时,执行的并发函数需要使用@Concurrent装饰器修饰,否则无法通过相关校验@Concurrent
function imageProcessing(dataSlice: ArrayBuffer) {return dataSlice;
}function histogramStatistic(pixelBuffer: ArrayBuffer) {
// step2: 将要处理数据分成三段let number = pixelBuffer.byteLength / 3;let buffer1 = pixelBuffer.slice(0, number);let buffer2 = pixelBuffer.slice(number, number * 2);let buffer3 = pixelBuffer.slice(number * 2);
// step3: 定义每段数据处理的Tasklet task1 = new taskpool.Task(imageProcessing, buffer1);let task2 = new taskpool.Task(imageProcessing, buffer2);let task3 = new taskpool.Task(imageProcessing, buffer3);taskpool.execute(task1).then((ret: ArrayBuffer[]) => {
// step4.1: 定义第一段数据完成后的结果出来});taskpool.execute(task2).then((ret: ArrayBuffer[]) => {
// step4.2: 定义第二段数据完成后的结果出来});taskpool.execute(task3).then((ret: ArrayBuffer[]) => {
// step4.3: 定义第三段数据完成后的结果出来});
}@Entry
@Component
struct Index {@State message: string = 'Hello World'build() {Row() {Column() {Text(this.message).fontSize(50).fontWeight(FontWeight.Bold).onClick(() => {let data: ArrayBuffer;histogramStatistic(data);})}.width('100%')}.height('100%')}
}
2. 使用worker进行长时间数据分析
使用步骤
2.1 DevEco Studio提供了Worker创建的模板,新建一个Worker线程,例如命名为“MyWorker”
2. 在主线程ThreadMaster中调用ThreadWorker的constructor()方法创建Worker对象,ThreadMaster线程为宿主线程
import worker from '@ohos.worker';
const workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
3. ThreadMaster线程中通过调用onmessage()方法接收Worker线程发送过来的消息,并通过调用postMessage()方法向Worker线程发送消息
// 接收Worker子线程的结果
workerInstance.onmessage = function(e) {// data:Worker线程发送的信息let data = e.data;console.info('MyWorker.ts onmessage');
}workerInstance.onerror = function (d) {// 接收Worker子线程的错误信息
}// 向Worker子线程发送训练消息
workerInstance.postMessage({ 'type': 0 });
// 向Worker子线程发送预测消息
workerInstance.postMessage({ 'type': 1, 'value': [90, 5] });
4. 在MyWorker.ts文件中绑定Worker对象,当前线程为Worker线程
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
5. 在Worker线程中通过调用onmessage()方法接收宿主线程ThreadMaster发送的消息内容,并通过调用postMessage()方法向宿主线程发送消息
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';let workerPort: ThreadWorkerGlobalScope = worker.workerPort;// 定义训练模型及结果
let result;// 定义预测函数
function predict(x) {return result[x];
}// 定义优化器训练过程
function optimize() {result = {};
}// Worker线程的onmessage逻辑
workerPort.onmessage = function (e: MessageEvents) {let data = e.data// 根据传输的数据的type选择进行操作switch (data.type) {case 0:// 进行训练optimize();// 训练之后发送主线程训练成功的消息workerPort.postMessage({ type: 'message', value: 'train success.' });break;case 1:// 执行预测const output = predict(data.value);// 发送主线程预测的结果workerPort.postMessage({ type: 'predict', value: output });break;default:workerPort.postMessage({ type: 'message', value: 'send message is invalid' });break;}
}
6. 在Worker线程中完成任务之后,执行Worker线程销毁操作
6.1 在宿主线程中通过调用onexit()方法定义Worker线程销毁后的处理逻辑
// 宿主线程ThreadMaster中定义Worker线程销毁后,执行onexit回调方法
workerInstance.onexit = function() {console.info("main thread terminate");
}
6.2 Worker线程销毁的方式
方式一:宿主线程中通过调用terminate()方法销毁Worker线程,并终止Worker接收消息
// 销毁Worker线程
workerInstance.terminate();
方式二:Worker线程中通过调用close()方法主动销毁Worker线程,并终止Worker接收消息
// 销毁Worker线程
workerInstance.terminate();
二、IO密集型任务
I/O密集型任务的性能重点通常不在于CPU的处理能力,而在于I/O操作的速度和效率。这种任务通常需要频繁地进行磁盘读写、网络通信等操作。使用异步并发可以解决单次I/O任务阻塞的问题,但是如果遇到I/O密集型任务,同样会阻塞线程中其它任务的执行,这时需要使用多线程并发能力来进行解决。
示例:频繁读写系统文件来模拟I/O密集型并发任务的处理
import fs from '@ohos.file.fs';
import taskpool from '@ohos.taskpool';//step1 定义并发函数,密集执行I/O 操作
@Concurrent
async function concurrentTest(fileList: string[]) {// 写入文件的实现async function write(data, filePath) {let file = await fs.open(filePath, fs.OpenMode.READ_WRITE);await fs.write(file.fd, data);fs.close(file);}// 循环写文件操作for (let i = 0; i < fileList.length; i++) {write('Hello World!', fileList[i]).then(() => {console.info(`Succeeded in writing the file. FileList: ${fileList[i]}`);}).catch((err) => {console.error(`Failed to write the file. Code is ${err.code}, message is ${err.message}`)return false;})}return true;
}//step2 定义并concurrentFileOper函数,taskpool执行包含密集I/O的并发函数
function concurrentFileOper() {let filePath1 = ...; // 应用文件路径let filePath2 = ...;// 使用TaskPool执行包含密集I/O的并发函数// 数组较大时,I/O密集型任务任务分发也会抢占主线程,需要使用多线程能力taskpool.execute(concurrentTest, [filePath1, filePath2]).then((ret) => {// 调度结果处理console.info(`The result: ${ret}`);})
}@Entry
@Component
struct Index {@State message: string = 'Hello World'build() {Row() {Column() {Text(this.message).fontSize(50).fontWeight(FontWeight.Bold).onClick(() => {concurrentFileOper();})}.width('100%')}.height('100%')}
}
三、同步任务开发
同步任务是指在多个线程之间协调执行的任务,其目的是确保多个任务按照一定的顺序和规则执行,例如使用锁来防止数据竞争。同步任务的实现需要考虑多个线程之间的协作和同步,以确保数据的正确性和程序的正确执行。由于TaskPool偏向于单个独立的任务,因此当各个同步任务之间相对独立时推荐使用TaskPool,例如一系列导入的静态方法,或者单例实现的方法。如果同步任务之间有关联性,则需要使用Worker,例如无法单例创建的类对象实现的方法
1. 使用TaskPool处理同步任务
当调度独立的同步任务,或者一系列同步任务为静态方法实现,或者可以通过单例构造唯一的句柄或类对象,可在不同任务池之间使用时,推荐使用TaskPool
//step1:定义单实例类 Handle.ts
export default class Handle {static getInstance() {// 返回单例对象}static syncGet() {// 同步Get方法return;}static syncSet(num: number) {// 同步Set方法return;}
}//step2:定义Index.ets,使用TaskPool调用相关同步方法的代码import taskpool from '@ohos.taskpool';
import Handle from './Handle'; //step2.1: 定义并发函数,内部调用同步方法
@Concurrent
function func(num: number) {// 调用静态类对象中实现的同步等待调用Handle.syncSet(num);// 或者调用单例对象中实现的同步等待调用Handle.getInstance().syncGet();return true;
}//step2.2: 创建任务并执行
async function asyncGet() {// 创建task并传入函数funclet task = new taskpool.Task(func, 1);// 执行task任务,获取结果reslet res = await taskpool.execute(task);// 对同步逻辑后的结果进行操作console.info(String(res));
}@Entry
@Component
struct Index {@State message: string = 'Hello World';build() {Row() {Column() {Text(this.message).fontSize(50).fontWeight(FontWeight.Bold).onClick(() => {// 步骤3: 执行并发操作asyncGet();})}.width('100%').height('100%')}}
}
2. 使用worker处理关联的同步任务
当一系列同步任务需要使用同一个句柄调度,或者需要依赖某个类对象调度,无法在不同任务池之间共享时,需要使用Worker
2.1 在主线程中创建Worker对象,同时接收Worker线程发送回来的消息
import worker from '@ohos.worker';@Entry
@Component
struct Index {@State message: string = 'Hello World';build() {Row() {Column() {Text(this.message).fontSize(50).fontWeight(FontWeight.Bold).onClick(() => {let w = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');w.onmessage = function (d) {// 接收Worker子线程的结果}w.onerror = function (d) {// 接收Worker子线程的错误信息}// 向Worker子线程发送Set消息w.postMessage({'type': 0, 'data': 'data'})// 向Worker子线程发送Get消息w.postMessage({'type': 1})// ...// 根据实际业务,选择时机以销毁线程w.terminate()})}.width('100%')}.height('100%')}
}
2.2 在Worker线程中绑定Worker对象,同时处理同步任务逻辑
//step1:定义单实例类 Handle.ts
export default class Handle {syncGet() {return;}syncSet(num: number) {return;}
}//step1:定义MyWorker.ts绑定worker对象
import worker, { ThreadWorkerGlobalScope, MessageEvents } from '@ohos.worker';
import Handle from './handle.ts' // 返回句柄var workerPort : ThreadWorkerGlobalScope = worker.workerPort;// 无法传输的句柄,所有操作依赖此句柄
var handler = new Handle()// Worker线程的onmessage逻辑
workerPort.onmessage = function(e : MessageEvents) {switch (e.data.type) {case 0:handler.syncSet(e.data.data);workerPort.postMessage('success set');case 1:handler.syncGet();workerPort.postMessage('success get');}
}
总结
本文详细学习了鸿蒙开发使用多线程并发的开发方式,针对CPU密集、IO密集以及同步任务开发场景做了一些开发说明,下文将鸿蒙开发ArkTS语言容器类库相关知识