03-异步编程与数据库

Node.js异步编程是核心特性,从回调到Promise再到async/await,不断演进。

异步编程演进

回调函数

// 回调地狱
fs.readFile('file1.txt', 'utf8', (err, data1) => {
    if (err) return console.error(err);
    
    fs.readFile('file2.txt', 'utf8', (err, data2) => {
        if (err) return console.error(err);
        
        fs.readFile('file3.txt', 'utf8', (err, data3) => {
            if (err) return console.error(err);
            
            console.log(data1, data2, data3);
        });
    });
});

Promise

const fs = require('fs').promises;

// Promise链
fs.readFile('file1.txt', 'utf8')
    .then(data1 => {
        console.log(data1);
        return fs.readFile('file2.txt', 'utf8');
    })
    .then(data2 => {
        console.log(data2);
        return fs.readFile('file3.txt', 'utf8');
    })
    .then(data3 => {
        console.log(data3);
    })
    .catch(err => {
        console.error(err);
    });

// Promise.all(并行)
Promise.all([
    fs.readFile('file1.txt', 'utf8'),
    fs.readFile('file2.txt', 'utf8'),
    fs.readFile('file3.txt', 'utf8')
])
.then(([data1, data2, data3]) => {
    console.log(data1, data2, data3);
})
.catch(err => console.error(err));

// Promise.race(第一个完成)
Promise.race([
    fetch('url1'),
    fetch('url2')
])
.then(result => console.log(result));

// Promise.allSettled(等待全部,不管成功失败)
Promise.allSettled([
    promise1,
    promise2,
    promise3
])
.then(results => {
    results.forEach(result => {
        if (result.status === 'fulfilled') {
            console.log('成功:', result.value);
        } else {
            console.log('失败:', result.reason);
        }
    });
});

async/await

const fs = require('fs').promises;

// async函数
async function readFiles() {
    try {
        const data1 = await fs.readFile('file1.txt', 'utf8');
        const data2 = await fs.readFile('file2.txt', 'utf8');
        const data3 = await fs.readFile('file3.txt', 'utf8');
        console.log(data1, data2, data3);
    } catch (err) {
        console.error(err);
    }
}

// 并行await
async function readFilesParallel() {
    try {
        const [data1, data2, data3] = await Promise.all([
            fs.readFile('file1.txt', 'utf8'),
            fs.readFile('file2.txt', 'utf8'),
            fs.readFile('file3.txt', 'utf8')
        ]);
        console.log(data1, data2, data3);
    } catch (err) {
        console.error(err);
    }
}

// Express中使用
app.get('/users', async (req, res, next) => {
    try {
        const users = await User.find();
        res.json(users);
    } catch (err) {
        next(err);
    }
});

// 顶层await(ES2022,Node.js 14.8+)
const data = await fs.readFile('config.json', 'utf8');

事件循环

事件循环阶段

   ┌───────────────────────────┐
┌─>│           timers          │ // setTimeout、setInterval
│  └─────────────┬─────────────┘
│  ┌─────────────┴─────────────┐
│  │     pending callbacks     │ // 系统操作回调
│  └─────────────┬─────────────┘
│  ┌─────────────┴─────────────┐
│  │       idle, prepare       │ // 内部使用
│  └─────────────┬─────────────┘
│  ┌─────────────┴─────────────┐
│  │           poll            │ // I/O回调
│  └─────────────┬─────────────┘
│  ┌─────────────┴─────────────┐
│  │           check           │ // setImmediate
│  └─────────────┬─────────────┘
│  ┌─────────────┴─────────────┐
└──┤      close callbacks      │ // socket.on('close')
   └───────────────────────────┘

微任务和宏任务

console.log('1');

setTimeout(() => console.log('2'), 0);  // 宏任务

Promise.resolve().then(() => console.log('3'));  // 微任务

process.nextTick(() => console.log('4'));  // 微任务(优先)

console.log('5');

// 输出:1 5 4 3 2
// 执行顺序:同步 → process.nextTick → Promise → setTimeout

setImmediate vs setTimeout

// setImmediate:check阶段执行
setImmediate(() => {
    console.log('immediate');
});

// setTimeout:timers阶段执行
setTimeout(() => {
    console.log('timeout');
}, 0);

// I/O回调中:setImmediate总是先执行
fs.readFile('file.txt', () => {
    setImmediate(() => console.log('immediate'));  // 先
    setTimeout(() => console.log('timeout'), 0);    // 后
});

MongoDB

Mongoose

npm install mongoose
const mongoose = require('mongoose');

// 连接数据库
mongoose.connect('mongodb://localhost:27017/myapp', {
    useNewUrlParser: true,
    useUnifiedTopology: true
});

const db = mongoose.connection;
db.on('error', console.error);
db.once('open', () => console.log('MongoDB connected'));

// 定义Schema
const userSchema = new mongoose.Schema({
    name: {
        type: String,
        required: true,
        trim: true
    },
    email: {
        type: String,
        required: true,
        unique: true,
        lowercase: true
    },
    age: {
        type: Number,
        min: 0,
        max: 150
    },
    createdAt: {
        type: Date,
        default: Date.now
    }
});

// 添加方法
userSchema.methods.greet = function() {
    return `Hello, ${this.name}!`;
};

// 静态方法
userSchema.statics.findByEmail = function(email) {
    return this.findOne({ email });
};

// 创建Model
const User = mongoose.model('User', userSchema);

// CRUD操作
// 创建
const user = new User({ name: 'Alice', email: 'alice@example.com', age: 25 });
await user.save();

// 或
const user = await User.create({ name: 'Alice', email: 'alice@example.com' });

// 查询
const users = await User.find();
const user = await User.findById(id);
const user = await User.findOne({ email: 'alice@example.com' });
const users = await User.find({ age: { $gte: 18 } });

// 更新
await User.updateOne({ _id: id }, { name: 'Alice2' });
await User.findByIdAndUpdate(id, { name: 'Alice2' }, { new: true });

// 删除
await User.deleteOne({ _id: id });
await User.findByIdAndDelete(id);

// 查询链
const users = await User
    .find({ age: { $gte: 18 } })
    .sort({ createdAt: -1 })
    .limit(10)
    .select('name email');

MySQL

mysql2

npm install mysql2
const mysql = require('mysql2/promise');

// 创建连接池
const pool = mysql.createPool({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'myapp',
    waitForConnections: true,
    connectionLimit: 10,
    queueLimit: 0
});

// 查询
const [rows] = await pool.query('SELECT * FROM users WHERE age > ?', [18]);

// 插入
const [result] = await pool.query(
    'INSERT INTO users (name, email) VALUES (?, ?)',
    ['Alice', 'alice@example.com']
);
console.log(result.insertId);

// 更新
await pool.query('UPDATE users SET name = ? WHERE id = ?', ['Alice2', 1]);

// 删除
await pool.query('DELETE FROM users WHERE id = ?', [1]);

// 事务
const connection = await pool.getConnection();
await connection.beginTransaction();

try {
    await connection.query('INSERT INTO users ...');
    await connection.query('UPDATE accounts ...');
    await connection.commit();
} catch (err) {
    await connection.rollback();
    throw err;
} finally {
    connection.release();
}

Sequelize(ORM)

npm install sequelize mysql2
const { Sequelize, DataTypes } = require('sequelize');

// 连接
const sequelize = new Sequelize('myapp', 'root', 'password', {
    host: 'localhost',
    dialect: 'mysql',
    logging: false
});

// 定义模型
const User = sequelize.define('User', {
    name: {
        type: DataTypes.STRING,
        allowNull: false
    },
    email: {
        type: DataTypes.STRING,
        unique: true,
        validate: {
            isEmail: true
        }
    },
    age: {
        type: DataTypes.INTEGER,
        defaultValue: 0
    }
});

// 同步模型
await sequelize.sync();

// CRUD
const user = await User.create({ name: 'Alice', email: 'alice@example.com' });

const users = await User.findAll();
const user = await User.findByPk(1);
const user = await User.findOne({ where: { email: 'alice@example.com' } });

await User.update({ name: 'Alice2' }, { where: { id: 1 } });

await User.destroy({ where: { id: 1 } });

// 关联
const Post = sequelize.define('Post', { /* ... */ });

User.hasMany(Post);
Post.belongsTo(User);

// 查询关联
const users = await User.findAll({
    include: Post
});

Redis

ioredis

npm install ioredis
const Redis = require('ioredis');

const redis = new Redis({
    host: 'localhost',
    port: 6379,
    password: 'password',
    db: 0
});

// 字符串
await redis.set('key', 'value');
const value = await redis.get('key');

// 过期时间
await redis.set('key', 'value', 'EX', 60);  // 60秒过期

// 哈希
await redis.hset('user:1', 'name', 'Alice');
await redis.hset('user:1', 'age', 25);
const name = await redis.hget('user:1', 'name');
const user = await redis.hgetall('user:1');

// 列表
await redis.lpush('list', 'item1', 'item2');
const item = await redis.rpop('list');

// 集合
await redis.sadd('set', 'member1', 'member2');
const members = await redis.smembers('set');

// 有序集合
await redis.zadd('leaderboard', 100, 'player1', 200, 'player2');
const top10 = await redis.zrevrange('leaderboard', 0, 9, 'WITHSCORES');

// Pipeline(批量操作)
const pipeline = redis.pipeline();
pipeline.set('key1', 'value1');
pipeline.set('key2', 'value2');
pipeline.get('key1');
const results = await pipeline.exec();

// 发布订阅
const subscriber = new Redis();
subscriber.subscribe('channel');
subscriber.on('message', (channel, message) => {
    console.log(`${channel}: ${message}`);
});

const publisher = new Redis();
publisher.publish('channel', 'Hello');

缓存策略

缓存模式

// 查询缓存
async function getUserById(id) {
    const cacheKey = `user:${id}`;
    
    // 1. 查缓存
    const cached = await redis.get(cacheKey);
    if (cached) {
        return JSON.parse(cached);
    }
    
    // 2. 查数据库
    const user = await User.findById(id);
    
    // 3. 写缓存
    if (user) {
        await redis.set(cacheKey, JSON.stringify(user), 'EX', 3600);
    }
    
    return user;
}

// 缓存失效
async function updateUser(id, data) {
    const user = await User.findByIdAndUpdate(id, data, { new: true });
    
    // 删除缓存
    await redis.del(`user:${id}`);
    
    return user;
}

// 缓存预热
async function warmUpCache() {
    const users = await User.find().limit(100);
    const pipeline = redis.pipeline();
    
    users.forEach(user => {
        pipeline.set(
            `user:${user.id}`,
            JSON.stringify(user),
            'EX',
            3600
        );
    });
    
    await pipeline.exec();
}

异步并发控制

并发限制

// ❌ 问题:同时发起1000个请求
const promises = urls.map(url => fetch(url));
const results = await Promise.all(promises);  // 可能压垮服务器

// ✅ 解决:限制并发数
async function promiseLimit(tasks, limit) {
    const results = [];
    const executing = [];
    
    for (const task of tasks) {
        const p = Promise.resolve().then(() => task());
        results.push(p);
        
        if (limit <= tasks.length) {
            const e = p.then(() => executing.splice(executing.indexOf(e), 1));
            executing.push(e);
            if (executing.length >= limit) {
                await Promise.race(executing);
            }
        }
    }
    
    return Promise.all(results);
}

// 使用
const tasks = urls.map(url => () => fetch(url));
const results = await promiseLimit(tasks, 5);  // 最多5个并发

// 或使用p-limit库
const pLimit = require('p-limit');
const limit = pLimit(5);

const promises = urls.map(url => limit(() => fetch(url)));
await Promise.all(promises);

错误处理

// try-catch
async function getData() {
    try {
        const data = await fetchData();
        return data;
    } catch (err) {
        console.error('获取数据失败:', err);
        throw err;  // 重新抛出
    }
}

// Promise.catch
fetchData()
    .then(data => processData(data))
    .catch(err => console.error(err))
    .finally(() => console.log('完成'));

// 批量错误处理
const results = await Promise.allSettled(promises);
results.forEach((result, index) => {
    if (result.status === 'rejected') {
        console.error(`Task ${index} failed:`, result.reason);
    }
});

Stream流

可读流

const fs = require('fs');

// 创建可读流
const readStream = fs.createReadStream('large-file.txt', {
    encoding: 'utf8',
    highWaterMark: 64 * 1024  // 缓冲区大小64KB
});

readStream.on('data', (chunk) => {
    console.log('读取:', chunk.length, '字节');
});

readStream.on('end', () => {
    console.log('读取完成');
});

readStream.on('error', (err) => {
    console.error('错误:', err);
});

// 暂停和恢复
readStream.pause();
readStream.resume();

可写流

const writeStream = fs.createWriteStream('output.txt');

writeStream.write('第一行\n');
writeStream.write('第二行\n');
writeStream.end('最后一行\n');

writeStream.on('finish', () => {
    console.log('写入完成');
});

writeStream.on('error', (err) => {
    console.error('错误:', err);
});

管道

const fs = require('fs');

// 复制文件
const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('dest.txt');

readStream.pipe(writeStream);

// 链式管道
const zlib = require('zlib');

fs.createReadStream('input.txt')
    .pipe(zlib.createGzip())
    .pipe(fs.createWriteStream('input.txt.gz'));

// HTTP响应流
app.get('/download', (req, res) => {
    const fileStream = fs.createReadStream('large-file.pdf');
    fileStream.pipe(res);
});

Transform流

const { Transform } = require('stream');

// 自定义转换流
const upperCaseTransform = new Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});

fs.createReadStream('input.txt')
    .pipe(upperCaseTransform)
    .pipe(fs.createWriteStream('output.txt'));

性能优化

异步并行

// ❌ 串行(慢)
const user = await User.findById(id);
const posts = await Post.find({ userId: id });
const comments = await Comment.find({ userId: id });

// ✅ 并行(快)
const [user, posts, comments] = await Promise.all([
    User.findById(id),
    Post.find({ userId: id }),
    Comment.find({ userId: id })
]);

数据库批量操作

// ❌ 逐条插入
for (const item of items) {
    await Model.create(item);  // N次数据库调用
}

// ✅ 批量插入
await Model.insertMany(items);  // 1次数据库调用

缓存优化

// 缓存中间件
const cacheMiddleware = (duration) => {
    return async (req, res, next) => {
        const key = `cache:${req.url}`;
        const cached = await redis.get(key);
        
        if (cached) {
            return res.json(JSON.parse(cached));
        }
        
        res.originalJson = res.json;
        res.json = function(data) {
            redis.set(key, JSON.stringify(data), 'EX', duration);
            res.originalJson(data);
        };
        
        next();
    };
};

// 使用
app.get('/api/data', cacheMiddleware(60), async (req, res) => {
    const data = await fetchData();
    res.json(data);
});

核心: Node.js异步I/O通过事件循环实现高并发。Promise/async/await简化异步编程,数据库操作需合理使用缓存和批量操作。