概览

本文讲述如何利用事件队列解决雪崩问题,并从实战的角度出发,用代码实现可复用的工具方法。

何为雪崩?

《深入浅出 Node.js》中是这样定义雪崩:

所谓雪崩问题,就是在高访问量、大并发量的情况下,缓存失效的场景。
此时大量的请求同时涌入数据库,数据库无法同时承受如此大的查询请求,进而影响到网站整体的响应速度。

通俗地讲,就是:

类似于三车道的高速,被 3 辆龟速车占住 3 个车道,导致后面的车无法超车,无法下高速,产生“塞车”。
同理,高速有 3 个车道上限,而数据库的连接数,也是有上限的。如果数据库连接被耗尽,也是会发生查询请求阻塞的情况。

数据库的连接数

以 MySQL 为例,它的默认最大连接数 max_connections 是 151 , 上限为 100,000

虽然,修改数据库的最大连接数配置,可以一定程度上缓解雪崩问题,但是,是一种治标不治本的做法。

怎么解决?

解决的方式,就是把相同的请求进行归类,派出 1 个 代表 A 进行数据请求即可。剩下的同质请求,不连接数据库,等待 代表 A 拿到数据,进行反馈即可。

类比于生活中的场景:

公司发放节日礼品,部门派出 1 位代表,将礼品领回部门即可,不需要所有人都去排队。那么行政人员,对接的 代表,都是不同部门的人,不会产生同质化,所以处理效率也得到提高。

那在代码层面,怎么写呢?

有两个关键点:

  • 添加 状态锁
  • 发布/订阅 模式 通知数据结果

《深入浅出 Node.js》中的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var proxy = new events.EventEmitter(); // 使用 EventEmitter 的发布/订阅 方法  
var status = "ready"; // 初始化 状态锁
var select = function (callback) {
proxy.once("selected", callback); // 使用 once 监听结果回调

// 判断当前是否有锁
if (status === "ready") {
status = "pending"; // 第1个进入的请求,将状态修改为 上锁,则其他请求进不来,只实现了 once 监听
// 查询数据库数据
db.select("SQL", function (results) {
proxy.emit("selected", results); // 数据拿到后,进行广播
status = "ready"; // 广播后,将锁解开
});
}
};

由于 Node 是单线程执行的,所以无须担心状态同步问题。

实战

《深入浅出 Node.js》中的代码,解决的场景是 一个业务类型的 SQL 请求,而在实际项目中,是不可能所有地方都这么写代码的。
这时候,就需要一个工具方法,对整套逻辑进行抽象化。

以下代码中的 preventAvalanche 是笔者在实战项目中抽象的工具方法。

改进点如下:

  • 使用 Map 的 key 标记同质化的 SQL 请求
  • 使用 Promise 的异步能力,对耗时的 SQL 请求加入微任务事件队列,不阻塞其他 Node 操作

代码中使用的 ORM 为:Sequelize

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// helper.js

const { EventEmitter } = require("events");
const proxy = new EventEmitter();
const map = new Map(); // 用于标记 model 请求状态

/**
* sql操作防止雪崩
* @param key 标记
* @param searchFn 查询方法
* @param resolve 成功回调
*/
exports.preventAvalanche = async function (key, searchFn, resolve) {
// 监听数据查询结果
proxy.once(key, (res) => {
resolve(res);
});

if (map.get(key) === "pending") {
// 状态锁判断
return;
}

map.set(key, "pending"); // 上锁

const result = await searchFn(); // 查询数据库数据

proxy.emit(key, result); // 发布查询结果
map.delete(key); // 清除没用的 key
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// service.js
const { preventAvalanche } = require("./helper");

// 获取事件key
function getEventKey(key) {
return `demoService_${key}`;
}

exports.getDemoService = function (SQLModel) {
return {
findOne(id) {
return new Promise(async (resolve) => {
const key = getEventKey(`findOne_${id}`);

// 调用防止雪崩工具方法
preventAvalanche(key, () => {
return SQLModel.findOne({
raw: true,
where: { id },
attributes: {
exclude: ["create_time", "update_time"],
},
});
}, resolve);

});
},
};
};
1
2
3
4
5
6
7
8
9
10
11
12
// controller.js

const { getDemoService } = require("./service.js");

const demoService = getDemoService(SQLModel);

// 查询礼品
async function findGift(giftId) {
const result = await demoService.findOne(giftId);

console.log("查询礼品结果", result);
}

参考

  • 《深入浅出 Node.js》- Page 76