使用多线程并发能力进行并发
使用多线程并发能力进行并发
并发概述
并发是指在同一时间段内,能够处理多个任务的能力。为了提升应用的响应速度与帧率,以及防止耗时任务对主线程的干扰,HarmonyOS系统提供了异步并发和多线程并发两种处理策略。
- 异步并发是指异步代码在执行到一定程度后会被暂停,以便在未来某个时间点继续执行,这种情况下,同一时间只有一段代码在执行。
- 多线程并发允许在同一时间段内同时执行多段代码。在主线程继续响应用户操作和更新UI的同时,后台也能执行耗时操作,从而避免应用出现卡顿。
并发能力在多种场景中都有应用,其中包括单次I/O任务、CPU密集型任务、I/O密集型任务和同步任务等。开发者可以根据不同的场景,选择相应的并发策略进行优化和开发。
ArkTS支持异步并发和多线程并发。
- Promise和async/await提供异步并发能力,适用于单次I/O任务的开发场景。详细请参见异步并发概述。
- TaskPool和Worker提供多线程并发能力,适用于CPU密集型任务、I/O密集型任务和同步任务等并发场景。详细请参见多线程并发概述。
多线程并发概述
简介
并发模型是用来实现不同应用场景中并发任务的编程模型,常见的并发模型分为基于内存共享的并发模型和基于消息通信的并发模型。
Actor并发模型作为基于消息通信并发模型的典型代表,不需要开发者去面对锁带来的一系列复杂偶发的问题,同时并发度也相对较高,因此得到了广泛的支持和使用,也是当前ArkTS语言选择的并发模型。
由于Actor模型的内存隔离特性,所以需要进行跨线程的数据序列化传输。
数据传输对象
目前支持传输的数据对象可以分为普通对象、可转移对象、可共享对象、Native绑定对象四种。
普通对象
普通对象传输采用标准的结构化克隆算法(Structured Clone)进行序列化,此算法可以通过递归的方式拷贝传输对象,相较于其他序列化的算法,支持的对象类型更加丰富。
序列化支持的类型包括:除Symbol之外的基础类型、Date、String、RegExp、Array、Map、Set、Object(仅限简单对象,比如通过“{}”或者“new Object”创建,普通对象仅支持传递属性,不支持传递其原型及方法)、ArrayBuffer、TypedArray。
可转移对象
可转移对象(Transferable object)传输采用地址转移进行序列化,不需要内容拷贝,会将ArrayBuffer的所有权转移给接收该ArrayBuffer的线程,转移后该ArrayBuffer在发送它的线程中变为不可用,不允许再访问。
// 定义可转移对象
let buffer = new ArrayBuffer(100);
可共享对象
共享对象SharedArrayBuffer,拥有固定长度,可以存储任何类型的数据,包括数字、字符串等。
共享对象传输指SharedArrayBuffer支持在多线程之间传递,传递之后的SharedArrayBuffer对象和原始的SharedArrayBuffer对象可以指向同一块内存,进而达到内存共享的目的。
SharedArrayBuffer对象存储的数据在同时被修改时,需要通过原子操作保证其同步性,即下个操作开始之前务必需要等到上个操作已经结束。
// 定义可共享对象,可以使用Atomics进行操作
let sharedBuffer = new SharedArrayBuffer(1024);
Native绑定对象
Native绑定对象(Native Binding Object)是系统所提供的对象,该对象与底层系统功能进行绑定,提供直接访问底层系统功能的能力。
当前支持序列化传输的Native绑定对象主要包含:Context和RemoteObject。
Context对象包含应用程序组件的上下文信息,它提供了一种访问系统服务和资源的方式,使得应用程序组件可以与系统进行交互。获取Context信息的方法可以参考获取上下文信息。
RemoteObject对象的主要作用是实现远程通信的功能,它允许在不同的进程间传递对象的引用,使得不同进程之间可以共享对象的状态和方法,服务提供者必须继承此类,RemoteObject对象的创建可以参考RemoteObject的实现。
TaskPool和Worker
ArkTS提供了TaskPool和Worker两种并发能力供开发者选择,其具体的实现特点和各自的适用场景存在差异,详细请参见TaskPool和Worker的对比。
TaskPool和Worker的对比
TaskPool(任务池)和Worker的作用是为应用程序提供一个多线程的运行环境,用于处理耗时的计算任务或其他密集型任务。可以有效地避免这些任务阻塞主线程,从而最大化系统的利用率,降低整体资源消耗,并提高系统的整体性能。
本文将从实现特点和适用场景两个方面来进行TaskPool与Worker的比较,同时提供了各自运作机制和注意事项的相关说明。
实现特点对比
TaskPool和Worker的实现特点对比
实现 | TaskPool | Worker |
---|---|---|
内存模型 | 线程间隔离,内存不共享。 | 线程间隔离,内存不共享。 |
参数传递机制 | 采用标准的结构化克隆算法(Structured Clone)进行序列化、反序列化,完成参数传递。支持ArrayBuffer转移和SharedArrayBuffer共享。 | 采用标准的结构化克隆算法(Structured Clone)进行序列化、反序列化,完成参数传递。支持ArrayBuffer转移和SharedArrayBuffer共享。 |
参数传递 | 直接传递,无需封装,默认进行transfer。 | 消息对象唯一参数,需要自己封装。 |
方法调用 | 直接将方法传入调用。 | 在Worker线程中进行消息解析并调用对应方法。 |
返回值 | 异步调用后默认返回。 | 主动发送消息,需在onmessage解析赋值。 |
生命周期 | TaskPool自行管理生命周期,无需关心任务负载高低。 | 开发者自行管理Worker的数量及生命周期。 |
任务池个数上限 | 自动管理,无需配置。 | 最多开启8个Worker。 |
任务执行时长上限 | 无限制。 | 无限制。 |
设置任务的优先级 | 不支持。 | 不支持。 |
执行任务的取消 | 支持取消任务队列中等待的任务。 | 不支持。 |
适用场景对比
TaskPool和Worker均支持多线程并发能力。TaskPool偏向独立任务(线程级)维度;而Worker偏向线程的维度,支持长时间占据线程执行。
常见的一些开发场景及适用具体说明如下:
- 有关联的一系列同步任务。例如某数据库操作时,要用创建的句柄操作,包含增、删、改、查多个任务,要保证同一个句柄,需要使用Worker。
- 需要频繁取消的任务。例如图库大图浏览场景,为提升体验,会同时缓存当前图片左右侧各2张图片,往一侧滑动跳到下一张图片时,要取消另一侧的一个缓存任务,需要使用TaskPool。
- 大量或者调度点较分散的任务。例如大型应用的多个模块包含多个耗时任务,不方便使用8个Worker去做负载管理,推荐采用TaskPool。
TaskPool运作机制
TaskPool运作机制示意图

TaskPool支持开发者在主线程封装任务抛给任务队列,系统选择合适的工作线程,进行任务的分发及执行,再将结果返回给主线程。接口直观易用,支持任务的执行、取消。工作线程数量上限为4。
Worker运作机制
Worker运作机制示意图

创建Worker的线程称为宿主线程(不一定是主线程,工作线程也支持创建Worker子线程),Worker自身的线程称为Worker子线程(或Actor线程、工作线程)。每个Worker子线程与宿主线程拥有独立的实例,包含基础设施、对象、代码段等。Worker子线程和宿主线程之间的通信是基于消息传递的,Worker通过序列化机制与宿主线程之间相互通信,完成命令及数据交互。
TaskPool注意事项
- 实现任务的函数需要使用装饰器@Concurrent标注,且仅支持在.ets文件中使用。
- 实现任务的函数只支持普通函数或者async函数,不支持类成员函数或者匿名函数。
- 实现任务的函数仅支持在Stage模型的工程中使用import的变量和入参变量,否则只能使用入参变量。
- 实现任务的函数入参需满足序列化支持的类型,详情请参见普通对象传输。
- 由于不同线程中上下文对象是不同的,因此TaskPool工作线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用。
- 序列化传输的数据量大小限制为16MB。
Worker注意事项
- 创建Worker时,传入的Worker.ts路径在不同版本有不同的规则,详情请参见文件路径注意事项。
- Worker创建后需要手动管理生命周期,且最多同时运行的Worker子线程数量为8个,详情请参见生命周期注意事项。
- Ability类型的Module支持使用Worker,Library类型的Module不支持使用Worker。
- 创建Worker不支持使用其他Module的Worker.ts文件,即不支持跨模块调用Worker。
- 由于不同线程中上下文对象是不同的,因此Worker线程只能使用线程安全的库,例如UI相关的非线程安全库不能使用。
- 序列化传输的数据量大小限制为16MB。
文件路径注意事项
当使用Worker模块具体功能时,均需先构造Worker实例对象,其构造函数与API版本相关。
// 导入模块
import worker from '@ohos.worker';
// API 9及之后版本使用:
const worker1 = new worker.ThreadWorker(scriptURL);
// API 8及之前版本使用:
const worker1 = new worker.Worker(scriptURL);
构造函数需要传入Worker的路径(scriptURL),Worker文件存放位置默认路径为Worker文件所在目录与pages目录属于同级。
Stage模型
构造函数中的scriptURL示例如下:
// 导入模块
import worker from '@ohos.worker';
// 写法一
// Stage模型-目录同级(entry模块下,workers目录与pages目录同级)
const worker1 = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts', {name:"first worker in Stage model"});
// Stage模型-目录不同级(entry模块下,workers目录是pages目录的子目录)
const worker2 = new worker.ThreadWorker('entry/ets/pages/workers/MyWorker.ts');
// 写法二
// Stage模型-目录同级(entry模块下,workers目录与pages目录同级),假设bundlename是com.example.workerdemo
const worker3 = new worker.ThreadWorker('@bundle:com.example.workerdemo/entry/ets/workers/worker');
// Stage模型-目录不同级(entry模块下,workers目录是pages目录的子目录),假设bundlename是com.example.workerdemo
const worker4 = new worker.ThreadWorker('@bundle:com.example.workerdemo/entry/ets/pages/workers/worker');
- 基于Stage模型工程目录结构,写法一的路径含义:
- entry:module.json5文件中module的name属性对应值。
- ets:用于存放ets源码,固定目录。
- workers/MyWorker.ts:worker源文件在ets目录下的路径。
- 基于Stage模型工程目录结构,写法二的路径含义:
- @bundle:固定标签。
- bundlename:当前应用包名。
- entryname:module.json5文件中module的name属性对应值。
- ets:用于存放ets源码,固定目录。
- workerdir/workerfile:worker源文件在ets目录下的路径,可不带文件后缀名。
生命周期注意事项
Worker的创建和销毁耗费性能,建议开发者合理管理已创建的Worker并重复使用。Worker空闲时也会一直运行,因此当不需要Worker时,可以调用terminate()接口或parentPort.close()方法主动销毁Worker。若Worker处于已销毁或正在销毁等非运行状态时,调用其功能接口,会抛出相应的错误。
Worker存在数量限制,支持最多同时存在8个Worker。
在API version 8及之前的版本,当Worker数量超出限制时,会抛出“Too many workers, the number of workers exceeds the maximum.”错误。 从API version 9开始,当Worker数量超出限制时,会抛出“Worker initialization failure, the number of workers exceeds the maximum.”错误。
CPU密集型任务开发指导
CPU密集型任务是指需要占用系统资源处理大量计算能力的任务,需要长时间运行,这段时间会阻塞线程其它事件的处理,不适宜放在主线程进行。例如图像处理、视频编码、数据分析等。
基于多线程并发机制处理CPU密集型任务可以提高CPU利用率,提升应用程序响应速度。
当进行一系列同步任务时,推荐使用Worker;而进行大量或调度点较为分散的独立任务时,不方便使用8个Worker去做负载管理,推荐采用TaskPool。接下来将以图像直方图处理以及后台长时间的模型预测任务分别进行举例。
使用TaskPool进行图像直方图处理
1、实现图像处理的业务逻辑。 2、数据分段,将各段数据通过不同任务的执行完成图像处理。 创建Task,通过execute()执行任务,在当前任务结束后,会将直方图处理结果同时返回。 3、结果数组汇总处理。
import taskpool from '@ohos.taskpool';
@Concurrent
function imageProcessing(dataSlice: ArrayBuffer) {
// 步骤1: 具体的图像处理操作及其他耗时操作
return dataSlice;
}
function histogramStatistic(pixelBuffer: ArrayBuffer) {
// 步骤2: 分成三段并发调度
let number = pixelBuffer.byteLength / 3;
let buffer1 = pixelBuffer.slice(0, number);
let buffer2 = pixelBuffer.slice(number, number * 2);
let buffer3 = pixelBuffer.slice(number * 2);
let task1 = new taskpool.Task(imageProcessing, buffer1);
let task2 = new taskpool.Task(imageProcessing, buffer2);
let task3 = new taskpool.Task(imageProcessing, buffer3);
taskpool.execute(task1).then((ret: ArrayBuffer[]) => {
// 步骤3: 结果处理
});
taskpool.execute(task2).then((ret: ArrayBuffer[]) => {
// 步骤3: 结果处理
});
taskpool.execute(task3).then((ret: ArrayBuffer[]) => {
// 步骤3: 结果处理
});
}
@Entry
@Component
struct Index {
@State message: string = 'Hello World'
build() {
Row() {
Column() {
Text(this.message)
.fontSize(50)
.fontWeight(FontWeight.Bold)
.onClick(() => {
let data: ArrayBuffer;
histogramStatistic(data);
})
}
.width('100%')
}
.height('100%')
}
}
使用Worker进行长时间数据分析
本文通过某地区提供的房价数据训练一个简易的房价预测模型,该模型支持通过输入房屋面积和房间数量去预测该区域的房价,模型需要长时间运行,房价预测需要使用前面的模型运行结果,因此需要使用Worker。
1、DevEco Studio提供了Worker创建的模板,新建一个Worker线程,例如命名为“MyWorker”

2、在主线程中通过调用ThreadWorker的constructor()方法创建Worker对象,当前线程为宿主线程。
import worker from '@ohos.worker';
const workerInstance = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
3、在宿主线程中通过调用onmessage()方法接收Worker线程发送过来的消息,并通过调用postMessage()方法向Worker线程发送消息。
例如向Worker线程发送训练和预测的消息,同时接收Worker线程发送回来的消息。
// 接收Worker子线程的结果
workerInstance.onmessage = function(e) {
// data:主线程发送的信息
let data = e.data;
console.info('MyWorker.ts onmessage');
// 在Worker线程中进行耗时操作
}
workerInstance.onerror = function (d) {
// 接收Worker子线程的错误信息
}
// 向Worker子线程发送训练消息
workerInstance.postMessage({ 'type': 0 });
// 向Worker子线程发送预测消息
workerInstance.postMessage({ 'type': 1, 'value': [90, 5] });
4、在MyWorker.ts文件中绑定Worker对象,当前线程为Worker线程。
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
5、在Worker线程中通过调用onmessage()方法接收宿主线程发送的消息内容,并通过调用postMessage()方法向宿主线程发送消息。
例如在Worker线程中定义预测模型及其训练过程,同时与主线程进行信息交互。
import worker, { ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@ohos.worker';
let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
// 定义训练模型及结果
let result;
// 定义预测函数
function predict(x) {
return result[x];
}
// 定义优化器训练过程
function optimize() {
result = {};
}
// Worker线程的onmessage逻辑
workerPort.onmessage = function (e: MessageEvents) {
let data = e.data
// 根据传输的数据的type选择进行操作
switch (data.type) {
case 0:
// 进行训练
optimize();
// 训练之后发送主线程训练成功的消息
workerPort.postMessage({ type: 'message', value: 'train success.' });
break;
case 1:
// 执行预测
const output = predict(data.value);
// 发送主线程预测的结果
workerPort.postMessage({ type: 'predict', value: output });
break;
default:
workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
break;
}
}
6、在Worker线程中完成任务之后,执行Worker线程销毁操作。销毁线程的方式主要有两种:根据需要可以在宿主线程中对Worker线程进行销毁;也可以在Worker线程中主动销毁Worker线程。
在宿主线程中通过调用onexit()方法定义Worker线程销毁后的处理逻辑。
// Worker线程销毁后,执行onexit回调方法
workerInstance.onexit = function() {
console.info("main thread terminate");
}
方式一:在宿主线程中通过调用terminate()方法销毁Worker线程,并终止Worker接收消息。
// 销毁Worker线程
workerInstance.terminate();
方式二:在Worker线程中通过调用close()方法主动销毁Worker线程,并终止Worker接收消息。
// 销毁线程
workerPort.close();
I/O密集型任务开发指导
使用异步并发可以解决单次I/O任务阻塞的问题,但是如果遇到I/O密集型任务,同样会阻塞线程中其它任务的执行,这时需要使用多线程并发能力来进行解决。
I/O密集型任务的性能重点通常不在于CPU的处理能力,而在于I/O操作的速度和效率。这种任务通常需要频繁地进行磁盘读写、网络通信等操作。此处以频繁读写系统文件来模拟I/O密集型并发任务的处理。
1、定义并发函数,内部密集调用I/O能力。
import fs from '@ohos.file.fs';
// 定义并发函数,内部密集调用I/O能力
@Concurrent
async function concurrentTest(fileList: string[]) {
// 写入文件的实现
async function write(data, filePath) {
let file = await fs.open(filePath, fs.OpenMode.READ_WRITE);
await fs.write(file.fd, data);
fs.close(file);
}
// 循环写文件操作
for (let i = 0; i < fileList.length; i++) {
write('Hello World!', fileList[i]).then(() => {
console.info(`Succeeded in writing the file. FileList: ${fileList[i]}`);
}).catch((err) => {
console.error(`Failed to write the file. Code is ${err.code}, message is ${err.message}`)
return false;
})
}
return true;
}
2、使用TaskPool执行包含密集I/O的并发函数:通过调用execute()方法执行任务,并在回调中进行调度结果处理。示例中的filePath1和filePath2的获取方式请参见获取应用文件路径。
import taskpool from '@ohos.taskpool';
let filePath1 = ...; // 应用文件路径
let filePath2 = ...;
// 使用TaskPool执行包含密集I/O的并发函数
// 数组较大时,I/O密集型任务任务分发也会抢占主线程,需要使用多线程能力
taskpool.execute(concurrentTest, [filePath1, filePath2]).then((ret) => {
// 调度结果处理
console.info(`The result: ${ret}`);
})
同步任务开发指导
同步任务是指在多个线程之间协调执行的任务,其目的是确保多个任务按照一定的顺序和规则执行,例如使用锁来防止数据竞争。
同步任务的实现需要考虑多个线程之间的协作和同步,以确保数据的正确性和程序的正确执行。
由于TaskPool偏向于单个独立的任务,因此当各个同步任务之间相对独立时推荐使用TaskPool,例如一系列导入的静态方法,或者单例实现的方法。如果同步任务之间有关联性,则需要使用Worker,例如无法单例创建的类对象实现的方法。
使用TaskPool处理同步任务
当调度独立的同步任务,或者一系列同步任务为静态方法实现,或者可以通过单例构造唯一的句柄或类对象,可在不同任务池之间使用时,推荐使用TaskPool。
1、定义并发函数,内部调用同步方法。 2、创建任务,并通过TaskPool执行,再对异步结果进行操作。创建Task,通过execute()执行同步任务。
模拟一个包含同步调用的单实例类。
// Handle.ts 代码
export default class Handle {
static getInstance() {
// 返回单例对象
}
static syncGet() {
// 同步Get方法
return;
}
static syncSet(num: number) {
// 同步Set方法
return;
}
}
业务使用TaskPool调用相关同步方法的代码。
// Index.ets代码
import taskpool from '@ohos.taskpool';
import Handle from './Handle'; // 返回静态句柄
// 步骤1: 定义并发函数,内部调用同步方法
@Concurrent
function func(num: number) {
// 调用静态类对象中实现的同步等待调用
Handle.syncSet(num);
// 或者调用单例对象中实现的同步等待调用
Handle.getInstance().syncGet();
return true;
}
// 步骤2: 创建任务并执行
async function asyncGet() {
// 创建task并传入函数func
let task = new taskpool.Task(func, 1);
// 执行task任务,获取结果res
let res = await taskpool.execute(task);
// 对同步逻辑后的结果进行操作
console.info(String(res));
}
@Entry
@Component
struct Index {
@State message: string = 'Hello World';
build() {
Row() {
Column() {
Text(this.message)
.fontSize(50)
.fontWeight(FontWeight.Bold)
.onClick(() => {
// 步骤3: 执行并发操作
asyncGet();
})
}
.width('100%')
.height('100%')
}
}
}
使用Worker处理关联的同步任务 当一系列同步任务需要使用同一个句柄调度,或者需要依赖某个类对象调度,无法在不同任务池之间共享时,需要使用Worker。 1、在主线程中创建Worker对象,同时接收Worker线程发送回来的消息。
import worker from '@ohos.worker';
@Entry
@Component
struct Index {
@State message: string = 'Hello World';
build() {
Row() {
Column() {
Text(this.message)
.fontSize(50)
.fontWeight(FontWeight.Bold)
.onClick(() => {
let w = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
w.onmessage = function (d) {
// 接收Worker子线程的结果
}
w.onerror = function (d) {
// 接收Worker子线程的错误信息
}
// 向Worker子线程发送Set消息
w.postMessage({'type': 0, 'data': 'data'})
// 向Worker子线程发送Get消息
w.postMessage({'type': 1})
// 销毁线程
w.terminate()
})
}
.width('100%')
}
.height('100%')
}
}
2、在Worker线程中绑定Worker对象,同时处理同步任务逻辑。
// handle.ts代码
export default class Handle {
syncGet() {
return;
}
syncSet(num: number) {
return;
}
}
// Worker.ts代码
import worker, { ThreadWorkerGlobalScope, MessageEvents } from '@ohos.worker';
import Handle from './handle.ts' // 返回句柄
var workerPort : ThreadWorkerGlobalScope = worker.workerPort;
// 无法传输的句柄,所有操作依赖此句柄
var handler = new Handle()
// Worker线程的onmessage逻辑
workerPort.onmessage = function(e : MessageEvents) {
switch (e.data.type) {
case 0:
handler.syncSet(e.data.data);
workerPort.postMessage('success set');
case 1:
handler.syncGet();
workerPort.postMessage('success get');
}
}