2019 年 09 月 26 日 node.js 12.11.0 发布,工作线程(多线程)稳定。
# 单进程和单线程
当一个 Node.js 的应用启动的同时,它会启动如下模块:
- 一个进程
- 一个线程
- 事件循环机制
- JS 引擎实例
- Node.js 实例
一个进程:process
对象是一个全局变量,可在 Node.js 程序中任意地方访问,并提供当前进程的相关信息。
一个线程:单线程意味着在当前进程中同一时刻只有一个指令在执行。
事件循环:这是 Node.js 中需要重点理解的一个部分,尽管 JavaScript 是单线程的,但通过使用回调,promises
, async/await
等语法,基于事件循环将对操作系统的操作异步化,使得 Node 拥有异步非阻塞 IO 的特性。
一个 JS 引擎实例:即一个可以运行 JavaScript 代码的程序。
一个 Node.js 实例:即一个可以运行 Node.js 环境的程序。
# 为什么需要多线程
Node.js 由于 JS 的执行在单一线程,导致 CPU 密集计算的任务可能会使主线程会处于繁忙的状态,进而影响服务的性能,虽然可以通过 child_process
模块创建子进程的方式来解决,
但是一方面进程之间无法共享内存,另一方面创建进程的开销也不小。所以在 10.5.0 版本中 Node.js 提供了 worker_threads
模块来支持多线程,一直以来被人所诟病的不擅长 CPU 密集计算有望成为历史。
多线程 Workers
下 Node.js 拥有:
- 一个进程
- 多个线程
- 每个线程都拥有独立的事件循环
- 每个线程都拥有一个 JS 引擎实例
- 每个线程都拥有一个 Node.js 实例
# Worker Threads
Worker Threads
有如下特性:
ArrayBuffers
可以将内存中的变量从一个线程转到另外一个SharedArrayBuffer
可以在多个线程中共享内存中的变量,但是限制为二进制格式的数据。- 可用的原子操作,可以让你更有效率地同时执行某些操作并且实现竞态变量
- 消息端口,用于多个线程间通信。可以用于多个线程间传输结构化的数据,内存空间
- 消息通道就像多线程间的一个异步的双向通信通道。
WorkerData
是用于传输启动数据。在多个线程间使用postMessgae
进行传输的时候,数据会被克隆,并将克隆的数据传输到线程的contructor
中。
API:
const { worker, parantPort } = require("worker_threads");
worker
函数相当于一个独立的 JavaScript 运行环境线程,parentPort 是消息端口的一个实例
new Worker(filename);
//or
new Worker(code, { eval: true });
2
3
启动 worker
的时候有两种方式,可以通过传输文件路径或者代码,在生产环境中推荐使用文件路径的方式。
worker.on("message"), worker.postMessage(data);
这是多线程间监听事件与推送数据的方式。
parentPort.on("message");
parentPort.postMessage(data);
2
在线程中使用 parentPort.postMessage
方式推送的数据可以在父进程中使用 worker.on('message')
的方式接收到,在父进程中使用 worker.postMessage()
的方式推送的数据可以在线程中使用 parentPort.on('message')
的方式监听到。
# 什么是 ArrayBuffer
ArrayBuffer
跟其它 JavaScript 数组差不多,但是不是所有 JavaScript 类型都可以放进去,比如对象、字符串。你唯一可以放进去的只有字节。
ArrayBuffer
仅仅是一个个 0/1
组成的串,它不知道第一个元素和第二个元素的分割点。
为了提供必要的上下文信息,把 ArrayBuffer
分块,我们需要把它包裹到视图里,这些数据的视图可以通过带类型的数组添加,已经支持很多种类型的数组了
例如,你可以用一个 Int8
类型的数组把 0/1
串分割成 8 位一组的序列。
ArrayBuffer
几乎扮演原始内存角色,它模拟内存的各种跟 C
语言里类似的操作。
# 如何创建多线程
worker_threads
模块中比较重要的几个类:
MessageChannel
用于创建异步、双向通信的通道实例。MessageChannel
实例包含两个属性 port1
和 port2
,这两个属性都是 MessagePort
的实例。
MessagePort
用于表示 MessageChannel
通道的终端,用于 Worker
之间传输结构化数据、内存区域和其他的 MessagePort
。MessagePort
继承了 EventEmitter
,因此可以使用 postMessage
和 on
方法实现消息的传递与接收。
Worker: 用于创建单独的 JS 线程。
worker_threads
模块中比较重要的几个属性:
parentPort
子线程中的 parentPort
指向可以与主线程进行通信的 MessagePort
。
子线程向父线程发送消息
parentPort.postMessage(/*...*/);
子线程接受来自父线程的消息
parentPort.on("message", msg => {
/*...*/
});
2
3
isMainThread
用于区分当前文件是否在主线程中执行
workerData
用于传递给 Worker
构造函数的 data
副本,在子线程中可以通过 workerData
获取到父进程传入的数据。
const { Worker, parentPort, isMainThread } = require("worker_threads");
if (isMainThread) {
const w = new Worker(__filename, {
workerData: {
name: "Randal",
},
});
w.postMessage(1e10);
const startTime = Date.now();
w.on("message", function(msg) {
console.log("main thread get message: " + msg);
console.log("compute time ellapsed: " + (Date.now() - startTime) / 1000);
});
console.log("main thread executing");
} else {
const longComputation = val => {
let sum = 0;
for (let i = 0; i < val; i++) {
sum += i;
}
return sum;
};
parentPort.on("message", msg => {
console.log(`${workerData.name} worker get message: ` + msg);
parentPort.postMessage(longComputation(msg));
});
}
// 执行结果
// main thread executing
// Randal worker get message: 10000000000
// main thread get message: 49999999990067860000
// compute time ellapsed: 14.954
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 线程间如何传输数据
port.postMessag(value[, transferList])
除了 value
之外,postMessage
方法还支持传入 transferList
参数,transferList
是一个 List
,支持的数据类型包括 ArrayBuffer
和 MessagePort
对象,transferList
中的对象在传输完成后,在发送对象的线程中就不可以继续使用了。
const { Worker, isMainThread, parentPort } = require("worker_threads");
// 主线程
if (isMainThread) {
const sab = new ArrayBuffer(Int32Array.BYTES_PER_ELEMENT * 100);
const ia = new Int32Array(sab);
for (let i = 0; i < ia.length; i++) {
ia[i] = i;
}
console.log("this is the main thread");
for (let i = 0; i < 1; i++) {
let w = new Worker(__filename);
console.log("before transfer: ", sab);
w.postMessage(null, [sab]);
setTimeout(() => {
console.log("after transfer: ", sab);
}, 1000);
}
} else {
console.log("this isn't main thread");
}
// 输出结果
// this is the main thread
// before transfer: ArrayBuffer { byteLength: 400 }
// this isn't main thread
// after transfer: ArrayBuffer { byteLength: 0 }
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
如果 ArrayBuffer
是通过 value
传输的(且在 transferList
中不存在),则传输过去的是副本,如下所示:
w.postMessage(sab);
// 输出结果
// this is the main thread
// before transfer: ArrayBuffer { byteLength: 400 }
// this isn't main thread
// after transfer: ArrayBuffer { byteLength: 400 }
2
3
4
5
6
7
# 线程间如何共享内存
轮到 SharedArrayBuffer
出场了,如果 postMessage
中的 value
是 SharedArrayBuffer
的话,则线程之间就可以共享内存,如下面例子所示:
const { Worker, isMainThread, parentPort } = require("worker_threads");
// 主线程
if (isMainThread) {
const sab = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * 5);
const ia = new Int32Array(sab);
for (let i = 0; i < ia.length; i++) {
ia[i] = i;
}
for (let i = 0; i < 2; i++) {
let w = new Worker(__filename);
w.postMessage(sab);
w.on("message", () => {
console.log(ia);
});
}
} else {
parentPort.on("message", msg => {
const ia = new Int32Array(msg, 0, 1);
ia[0] = ia[0] + 1;
parentPort.postMessage("done");
});
}
// 输出结果
// Int32Array [ 1, 1, 2, 3, 4 ]
// Int32Array [ 2, 1, 2, 3, 4 ]
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27