05-WebSocket与性能优化
WebSocket实现双向实时通信,性能优化涵盖事件循环、内存管理、集群等方面。
WebSocket
Socket.IO
npm install socket.io
服务端:
const express = require('express');
const http = require('http');
const { Server } = require('socket.io');
const app = express();
const server = http.createServer(app);
const io = new Server(server, {
cors: {
origin: '*'
}
});
// 连接事件
io.on('connection', (socket) => {
console.log('用户连接:', socket.id);
// 接收消息
socket.on('message', (data) => {
console.log('收到消息:', data);
// 回复客户端
socket.emit('message', { text: 'Server received' });
// 广播给所有客户端
io.emit('message', data);
// 广播给除自己外的所有客户端
socket.broadcast.emit('message', data);
});
// 加入房间
socket.on('join', (room) => {
socket.join(room);
io.to(room).emit('message', `${socket.id} joined ${room}`);
});
// 离开房间
socket.on('leave', (room) => {
socket.leave(room);
});
// 房间内广播
socket.on('room-message', (room, data) => {
io.to(room).emit('message', data);
});
// 断开连接
socket.on('disconnect', () => {
console.log('用户断开:', socket.id);
});
});
server.listen(3000);
客户端:
const socket = io('http://localhost:3000');
// 连接事件
socket.on('connect', () => {
console.log('已连接:', socket.id);
});
// 发送消息
socket.emit('message', { text: 'Hello' });
// 接收消息
socket.on('message', (data) => {
console.log('收到:', data);
});
// 加入房间
socket.emit('join', 'room1');
// 断开连接
socket.on('disconnect', () => {
console.log('连接断开');
});
ws(原生WebSocket)
npm install ws
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws) => {
console.log('新连接');
// 接收消息
ws.on('message', (message) => {
console.log('收到:', message.toString());
// 发送消息
ws.send('Server received: ' + message);
});
// 广播
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
ws.on('close', () => {
console.log('连接关闭');
});
ws.on('error', (error) => {
console.error('错误:', error);
});
});
// 心跳检测
setInterval(() => {
wss.clients.forEach((ws) => {
if (!ws.isAlive) {
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
wss.on('connection', (ws) => {
ws.isAlive = true;
ws.on('pong', () => {
ws.isAlive = true;
});
});
性能优化
集群模式
const cluster = require('cluster');
const os = require('os');
const express = require('express');
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
console.log(`主进程 ${process.pid} 启动`);
console.log(`启动 ${numCPUs} 个工作进程`);
// 创建工作进程
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// 工作进程退出时重启
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 退出`);
cluster.fork();
});
} else {
// 工作进程运行Express
const app = express();
app.get('/', (req, res) => {
res.send(`进程 ${process.pid} 处理请求`);
});
app.listen(3000, () => {
console.log(`工作进程 ${process.pid} 启动`);
});
}
PM2
npm install -g pm2
# 启动
pm2 start app.js
# 集群模式
pm2 start app.js -i max # 使用所有CPU核心
pm2 start app.js -i 4 # 4个实例
# 监控
pm2 list
pm2 monit
# 日志
pm2 logs
pm2 logs app
# 重启
pm2 restart app
pm2 reload app # 0秒停机重启
# 停止
pm2 stop app
pm2 delete app
# 配置文件(ecosystem.config.js)
module.exports = {
apps: [{
name: 'myapp',
script: './dist/index.js',
instances: 'max',
exec_mode: 'cluster',
env: {
NODE_ENV: 'development'
},
env_production: {
NODE_ENV: 'production'
}
}]
};
# 使用配置
pm2 start ecosystem.config.js --env production
内存优化
// ❌ 内存泄漏:全局变量积累
const cache = {};
app.get('/user/:id', (req, res) => {
cache[req.params.id] = { /* 大对象 */ }; // 永不释放
});
// ✅ 使用LRU缓存
const LRU = require('lru-cache');
const cache = new LRU({
max: 500, // 最多500项
ttl: 1000 * 60 * 5 // 5分钟过期
});
// ❌ 闭包陷阱
function createHandler() {
const largeData = new Array(1000000); // 大数组
return () => {
// 闭包持有largeData引用,无法释放
};
}
// ✅ 及时清理
function createHandler() {
let largeData = new Array(1000000);
return () => {
const result = processData(largeData);
largeData = null; // 清理引用
return result;
};
}
// 监控内存
setInterval(() => {
const usage = process.memoryUsage();
console.log({
rss: `${Math.round(usage.rss / 1024 / 1024)} MB`,
heapTotal: `${Math.round(usage.heapTotal / 1024 / 1024)} MB`,
heapUsed: `${Math.round(usage.heapUsed / 1024 / 1024)} MB`,
external: `${Math.round(usage.external / 1024 / 1024)} MB`
});
}, 60000);
CPU优化
// ❌ CPU密集任务阻塞事件循环
app.get('/heavy', (req, res) => {
const result = heavyComputation(); // 阻塞其他请求
res.json({ result });
});
// ✅ 使用Worker Threads
const { Worker } = require('worker_threads');
app.get('/heavy', (req, res) => {
const worker = new Worker('./heavy-task.js');
worker.on('message', (result) => {
res.json({ result });
});
worker.on('error', (err) => {
res.status(500).json({ error: err.message });
});
worker.postMessage({ data: req.body });
});
// heavy-task.js
const { parentPort } = require('worker_threads');
parentPort.on('message', ({ data }) => {
const result = heavyComputation(data);
parentPort.postMessage(result);
});
缓存策略
// 1. 内存缓存(单机)
const NodeCache = require('node-cache');
const cache = new NodeCache({ stdTTL: 600 });
app.get('/api/data', async (req, res) => {
const cacheKey = 'data';
let data = cache.get(cacheKey);
if (data) {
return res.json({ data, cached: true });
}
data = await fetchData();
cache.set(cacheKey, data);
res.json({ data, cached: false });
});
// 2. Redis缓存(分布式)
async function getCached(key, fetcher, ttl = 3600) {
const cached = await redis.get(key);
if (cached) {
return JSON.parse(cached);
}
const data = await fetcher();
await redis.set(key, JSON.stringify(data), 'EX', ttl);
return data;
}
// 使用
const user = await getCached(
`user:${id}`,
() => User.findById(id),
3600
);
// 3. HTTP缓存头
app.get('/static-data', (req, res) => {
res.set('Cache-Control', 'public, max-age=3600');
res.json({ data });
});
压缩响应
const compression = require('compression');
app.use(compression({
filter: (req, res) => {
if (req.headers['x-no-compression']) {
return false;
}
return compression.filter(req, res);
},
level: 6 // 压缩级别 0-9
}));
数据库优化
// ❌ N+1查询
const posts = await Post.find();
for (const post of posts) {
post.author = await User.findById(post.userId); // N次查询
}
// ✅ 使用populate(Mongoose)
const posts = await Post.find().populate('author');
// ✅ 批量查询
const posts = await Post.find();
const userIds = [...new Set(posts.map(p => p.userId))];
const users = await User.find({ _id: { $in: userIds } });
const userMap = new Map(users.map(u => [u.id, u]));
posts.forEach(post => {
post.author = userMap.get(post.userId);
});
// 索引优化
userSchema.index({ email: 1 }); // 单字段索引
userSchema.index({ name: 1, age: -1 }); // 复合索引
// 查询投影
const users = await User.find().select('name email'); // 只返回部分字段
// 分页
const page = 1;
const limit = 20;
const users = await User.find()
.skip((page - 1) * limit)
.limit(limit);
性能监控
内置性能API
const { performance } = require('perf_hooks');
const start = performance.now();
await someOperation();
const end = performance.now();
console.log(`耗时: ${end - start}ms`);
// 性能标记
performance.mark('start-fetch');
await fetch('url');
performance.mark('end-fetch');
performance.measure('fetch-duration', 'start-fetch', 'end-fetch');
const measure = performance.getEntriesByName('fetch-duration')[0];
console.log(`Fetch耗时: ${measure.duration}ms`);
APM工具
// New Relic
require('newrelic');
// AppDynamics
require('appdynamics');
// Datadog
const tracer = require('dd-trace').init();
// 自定义追踪
const span = tracer.startSpan('operation');
await operation();
span.finish();
最佳实践
异步错误处理
// ✓ 统一错误处理
process.on('unhandledRejection', (reason, promise) => {
console.error('未处理的Promise拒绝:', reason);
// 记录日志、发送告警
});
process.on('uncaughtException', (err) => {
console.error('未捕获的异常:', err);
process.exit(1); // 退出进程
});
// ✓ 优雅关闭
process.on('SIGTERM', async () => {
console.log('收到SIGTERM信号');
server.close(() => {
console.log('HTTP服务器关闭');
});
await mongoose.connection.close();
console.log('数据库连接关闭');
process.exit(0);
});
连接池
// MongoDB连接池
mongoose.connect('mongodb://localhost/myapp', {
maxPoolSize: 10, // 最大连接数
minPoolSize: 2, // 最小连接数
serverSelectionTimeoutMS: 5000
});
// MySQL连接池
const pool = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'password',
database: 'myapp',
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0
});
限流
const rateLimit = require('express-rate-limit');
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15分钟
max: 100, // 最多100个请求
message: '请求过于频繁,请稍后再试'
});
app.use('/api/', limiter);
// 按IP限流
const ipLimiter = rateLimit({
windowMs: 60 * 1000,
max: 5,
standardHeaders: true,
legacyHeaders: false
});
app.post('/api/login', ipLimiter, loginHandler);
核心: WebSocket实现双向实时通信。性能优化关注事件循环、内存泄漏、数据库查询、缓存策略和集群部署。