piscina 是 Node.js 中提供 工作(进程,线程)池 的工具。
特点:
- 线程间的快速通信
- 覆盖固定任务和可变任务场景
- 固定任务:一个线程执行指定的任务
- 可变任务:由调度线程分配任务
- 支持灵活设置工作池大小
- 提供异步追踪
- 运行和等待时长统计
- 支持取消任务
- 支持限制内存资源使用
- 支持 CommonJS、ESM 和 TypeScript
- 自定义任务队列
- 设置 CPU 调度优先级(Linux)
piscina 是用 TypeScript 写的,需要 Node.js 12.x 及以上版本。
example
分配任务并收集任务结果的是 主线程(main),执行任务并返回结果的是 工作线程(worker)。
每一个工作线程都会执行一个在单独文件中导出的函数,并将该函数的返回值返回给主线程。
API
new Piscina
// https://github.com/piscinajs/piscina#constructor-new-piscinaoptions
new Piscina(options?: Options | undefined): Piscina
支持的选项:
filename: string | null 指定一个文件路径,该文件需要默认导出一个函数(同步或者异步)供工作线程执行name: string | null 默认为'default',表示使用上述文件的默认导出函数,可以改为其他名称,从而使用其他非默认导出的函数minThreads: number 线程池运行的最小线程数,默认根据可用的 CPU 进行设置maxThreads: number 线程池运行的最大线程数,默认根据可用的 CPU 进行设置idleTimeout: number 单位 毫秒,表示线程不执行任务时可以继续等待多长时间,超过这个时间空闲线程将关闭。默认情况下线程执行完任务后会立即关闭,默认设置在执行 任务量多、单个任务耗时短 的任务集合时,会损失一定的性能,因为会频繁的关闭和开启线程。这种情况下,显式设置该参数,避免线程频繁被关闭和启动。maxQueue: number | string 设置 可能 运行的最大任务数量(并不等于实际同时运行的任务数量,因为线程可能不够用)。默认情况下,这个数量不做限制。可以指定为'auto',此时 piscina 将其设置为maxThreads参数值的平方,这个值可以通过options.maxQueue获取到concurrentTasksPerWorker: number 默认值为 1,表示一个工作线程可以同时被多少个任务共享,这在任务中包含异步步骤时可能有用,虽然工作线程可以执行异步任务,然而我们需要时刻牢记:工作线程不是为处理 I/O 而设计的useAtomics: boolean 默认开启,使用 原子 API 实现线程间的快速通信,通过设置环境变量PISCINA_DISABLE_ATOmICS=1可以禁用该选项resourceLimits: object (cautious)限制线程使用的资源,达到资源上限 JS 引擎会关闭工作线程。需要注意的是,即时限制了线程的资源上限,主进程仍然可能因为整体资源超过内存而崩溃。该配置项与 Node.js new Worker options 一致,主要有下面这四个东西:maxOldGenerationSizeMb: number 每个工作线程 主堆 的最大值maxYoungGenerationSizeMb: number 最近创建的对象占用的 堆 的最大值codeRangeSizeMb: number 为生成代码预分配的内存范围stackSizeMb: number 默认为 4,线程栈的最大值,值太小可能导致 Worker 实例不可用
env: object 参考 new Worker 的 options.env 部分,默认值是process.env,用来初始化工作线程的 process.env。设置为require('worker_threads').SHARE_ENV表示工作线程与主线程共享 process.env,当一个线程中 process.env 变化时,另一个线程中的 process.env 也会跟着变化argv: any[] 参考 new Worker 的 options.argv 部分。指定一个参数列表,它将以命令行参数的形式传递给工作线程,因此会被复制到工作线程的process.argv上。它与workData有些类似,都是向工作进程中传递数据。execArgv: string[] 参考 new Worker 的 options.execArgv 部分。将 node CLI 选项(node 启动主线程的参数)传递给工作线程(一些能影响 主进程 的参数会被忽略),这些参数在工作线程中能通过process.execArgv获取到。默认情况下,该值会自动从父线程继承workerData: any 参考 new Worker 的 options.workData 部分。argv只能传递可以被字符串化的参数,而这个workerData则可以传递任意一个可以被克隆的 JavaScript 对象taskQueue: TaskQueue 默认情况下,piscina 使用 先进先出 的队列提交任务,这个参数允许修改为其他自定义类型的任务队列niceIncrement: number (optional, Linux) 管理工作线程的优先级,需要配合 nice-napi 模块使用trackUnmanagedEds: boolean 默认为开启,允许工作线程追踪通过fs.open()和fs.close()管理的 文件描述符,工作线程在退出时会自动关闭它们。
piscina.run(task, [options])
将一个任务分配给一个工作线程,即执行对应的函数,task参数就是传递给那个函数的参数。
选项:
transferList: (optional) 指定一个对象列表,这些数据会被 转交 给工作线程,而不是 克隆filename: (optional) 任务函数所在的文件,会覆盖默认的文件(new Piscina 时指定的文件)name: (optional) 任务函数,使用默认导出的函数值为default,其他非默认函数为函数名abortSignal: AbortSignalp[] 用来取消任务、停止任务
此函数是 异步 的。
piscina.destroy()
停止所有工作线程,被挂起的任务的 Promise 将被 reject
如果所有线程停下了,此函数返回的 Promise 将变为 fullfilled
它也是异步函数。
Event
'error'由工作线程抛出的未捕获的异常,或者工作线程发送的未知的消息'drain'任务队列中没有任务时'message'从工作线程中收到消息时
Property
completedreadonly 当前完成的任务数durationreadonly 距离 Piscina 实例创建的时间,单位毫秒optionsreadonly 构造 Piscina 实例时传入的选项对象的拷贝runTimereadonly 统计已完成任务的直方图数据,可以用来绘图threadsreadonly 线程池对应的 Worker 实例列表queueSizereadonly 任务队列中的任务数量utilizationreadonlywaitTimereadonly
Static Property
isWorkerThreadreadonly 代码是否在工作线程中执行versinreadonly
Static Method
move
import { move } from 'piscina'
默认情况下,工作线程执行完任务函数的返回值会被 克隆 到主线程,即时这个返回值是 可转移的。
这个函数可以包装 可转移的 返回值,是它们 转移到 主线程,而不是 克隆 回去。
move 函数包装的对象必须是 Node.js 支持的 可转移 对象(实现了Transferable接口),例如ArrayBuffer, TypedArray or MessagePort),如果封装对象是不可转移的,move 函数将抛出错误。
可转移对象应直接传递给 move 函数,而不是使用嵌套的方式。
评论区