08-并发编程基础

并发是现代程序充分利用多核CPU的关键。C++11引入标准线程库,提供跨平台的并发支持。

线程基础

线程是操作系统调度的最小单位。C++ std::thread 封装了系统线程,支持函数、成员函数、Lambda等多种启动方式。

线程创建和管理

创建线程时传入可调用对象及参数。线程对象销毁前必须 join()detach(),否则程序终止。

#include <thread>
#include <iostream>

// 函数线程
void threadFunction(int id) {
    std::cout << "Thread " << id << " is running" << std::endl;
}

// 类成员函数线程
class ThreadClass {
public:
    void memberFunction(int id) {
        std::cout << "Member function in thread " << id << std::endl;
    }
};

// Lambda线程
void createThreads() {
    // 函数线程
    std::thread t1(threadFunction, 1);
    
    // 类成员函数线程
    ThreadClass obj;
    std::thread t2(&ThreadClass::memberFunction, &obj, 2);
    
    // Lambda线程
    std::thread t3([](int id) {
        std::cout << "Lambda thread " << id << std::endl;
    }, 3);
    
    // 等待线程完成
    t1.join();
    t2.join();
    t3.join();
}

线程生命周期

线程有两种结束方式:join() 阻塞等待线程完成,detach() 分离线程使其在后台运行。选择错误导致资源泄漏或程序崩溃。

#include <thread>
#include <chrono>

void threadLifecycle() {
    std::thread t([]() {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout << "Thread finished" << std::endl;
    });
    
    // 检查线程是否可join
    if (t.joinable()) {
        std::cout << "Thread is joinable" << std::endl;
    }
    
    // 等待线程完成
    t.join();
    
    // 分离线程(不等待完成)
    // t.detach();
}

互斥锁

多线程访问共享数据需要同步机制。互斥锁(Mutex)是最基本的同步原语,保证同一时刻只有一个线程访问临界区。

std::mutex

std::mutex 提供基本互斥功能。lock() 获取锁,unlock() 释放锁。推荐使用RAII封装(如 lock_guard)避免忘记解锁。

#include <mutex>
#include <thread>
#include <vector>

class Counter {
private:
    int count = 0;
    std::mutex mtx;
    
public:
    void increment() {
        std::lock_guard<std::mutex> lock(mtx);
        ++count;
    }
    
    int getCount() const {
        std::lock_guard<std::mutex> lock(mtx);
        return count;
    }
};

void testMutex() {
    Counter counter;
    std::vector<std::thread> threads;
    
    // 创建多个线程
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back([&counter]() {
            for (int j = 0; j < 1000; ++j) {
                counter.increment();
            }
        });
    }
    
    // 等待所有线程完成
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Final count: " << counter.getCount() << std::endl;
}

递归互斥锁

std::recursive_mutex 允许同一线程多次获取锁。适用于递归调用或嵌套锁场景,但性能较低,优先重构代码避免使用。

#include <mutex>

class RecursiveCounter {
private:
    int count = 0;
    std::recursive_mutex mtx;
    
public:
    void increment() {
        std::lock_guard<std::recursive_mutex> lock(mtx);
        ++count;
    }
    
    void incrementTwice() {
        std::lock_guard<std::recursive_mutex> lock(mtx);
        increment();  // 递归调用,需要递归锁
        increment();
    }
    
    int getCount() const {
        std::lock_guard<std::recursive_mutex> lock(mtx);
        return count;
    }
};

超时互斥锁

#include <mutex>
#include <chrono>

class TimeoutExample {
private:
    std::timed_mutex mtx;
    
public:
    bool tryLockFor(int seconds) {
        return mtx.try_lock_for(std::chrono::seconds(seconds));
    }
    
    bool tryLockUntil(std::chrono::steady_clock::time_point time) {
        return mtx.try_lock_until(time);
    }
    
    void unlock() {
        mtx.unlock();
    }
};

条件变量

条件变量用于线程间通信,实现等待/通知模式。线程可以等待某个条件成立,其他线程在条件改变时通知等待的线程。

基本使用

std::condition_variable 必须与 std::unique_lock 配合。wait() 释放锁并阻塞,被通知后重新获取锁。防止虚假唤醒需要传入谓词。

#include <condition_variable>
#include <mutex>
#include <queue>

class ProducerConsumer {
private:
    std::queue<int> queue;
    std::mutex mtx;
    std::condition_variable cv;
    bool finished = false;
    
public:
    void producer() {
        for (int i = 0; i < 10; ++i) {
            std::unique_lock<std::mutex> lock(mtx);
            queue.push(i);
            std::cout << "Produced: " << i << std::endl;
            cv.notify_one();  // 通知等待的线程
        }
        
        // 通知消费者结束
        {
            std::unique_lock<std::mutex> lock(mtx);
            finished = true;
        }
        cv.notify_all();
    }
    
    void consumer() {
        while (true) {
            std::unique_lock<std::mutex> lock(mtx);
            
            // 等待条件满足
            cv.wait(lock, [this] { return !queue.empty() || finished; });
            
            if (finished && queue.empty()) {
                break;
            }
            
            if (!queue.empty()) {
                int value = queue.front();
                queue.pop();
                std::cout << "Consumed: " << value << std::endl;
            }
        }
    }
};

等待条件

#include <condition_variable>
#include <mutex>

class WaitExample {
private:
    std::mutex mtx;
    std::condition_variable cv;
    bool ready = false;
    
public:
    void waitForReady() {
        std::unique_lock<std::mutex> lock(mtx);
        
        // 等待条件满足
        cv.wait(lock, [this] { return ready; });
        
        std::cout << "Ready!" << std::endl;
    }
    
    void setReady() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            ready = true;
        }
        cv.notify_all();
    }
};

原子操作

原子操作是不可分割的操作,无需锁即可安全并发访问。std::atomic 提供无锁编程能力,性能高但使用复杂。

基本原子类型

std::atomic<T> 支持整型、指针、浮点型(C++20)。提供 loadstoreexchangecompare_exchange 等操作。

#include <atomic>
#include <thread>

class AtomicCounter {
private:
    std::atomic<int> count{0};
    
public:
    void increment() {
        ++count;  // 原子操作
    }
    
    void add(int value) {
        count += value;  // 原子操作
    }
    
    int getCount() const {
        return count.load();  // 原子读取
    }
    
    void setCount(int value) {
        count.store(value);  // 原子写入
    }
    
    bool compareAndSwap(int expected, int desired) {
        return count.compare_exchange_weak(expected, desired);
    }
};

原子操作类型

#include <atomic>

void atomicTypes() {
    // 基本原子类型
    std::atomic<int> atomicInt{42};
    std::atomic<bool> atomicBool{true};
    std::atomic<double> atomicDouble{3.14};
    
    // 原子指针
    int value = 42;
    std::atomic<int*> atomicPtr{&value};
    
    // 原子标志
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
    
    // 原子操作
    atomicInt.fetch_add(10);  // 原子加法
    atomicInt.fetch_sub(5);   // 原子减法
    atomicInt.fetch_and(0xFF); // 原子按位与
    atomicInt.fetch_or(0x100); // 原子按位或
    atomicInt.fetch_xor(0x200); // 原子按位异或
}

线程安全容器

线程安全队列

#include <queue>
#include <mutex>
#include <condition_variable>

template<typename T>
class ThreadSafeQueue {
private:
    std::queue<T> queue;
    mutable std::mutex mtx;
    std::condition_variable cv;
    
public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(mtx);
        queue.push(value);
        cv.notify_one();
    }
    
    bool tryPop(T& value) {
        std::lock_guard<std::mutex> lock(mtx);
        if (queue.empty()) {
            return false;
        }
        value = queue.front();
        queue.pop();
        return true;
    }
    
    void waitAndPop(T& value) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [this] { return !queue.empty(); });
        value = queue.front();
        queue.pop();
    }
    
    bool empty() const {
        std::lock_guard<std::mutex> lock(mtx);
        return queue.empty();
    }
    
    size_t size() const {
        std::lock_guard<std::mutex> lock(mtx);
        return queue.size();
    }
};

线程安全单例

#include <mutex>
#include <memory>

class ThreadSafeSingleton {
private:
    static std::unique_ptr<ThreadSafeSingleton> instance;
    static std::mutex mtx;
    
    ThreadSafeSingleton() = default;
    
public:
    static ThreadSafeSingleton& getInstance() {
        if (!instance) {
            std::lock_guard<std::mutex> lock(mtx);
            if (!instance) {  // 双重检查锁定
                instance = std::unique_ptr<ThreadSafeSingleton>(new ThreadSafeSingleton);
            }
        }
        return *instance;
    }
    
    // 禁用拷贝和赋值
    ThreadSafeSingleton(const ThreadSafeSingleton&) = delete;
    ThreadSafeSingleton& operator=(const ThreadSafeSingleton&) = delete;
};

// 静态成员定义
std::unique_ptr<ThreadSafeSingleton> ThreadSafeSingleton::instance = nullptr;
std::mutex ThreadSafeSingleton::mtx;

线程池

简单线程池实现

#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

class ThreadPool {
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queueMutex;
    std::condition_variable condition;
    bool stop;
    
public:
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    
                    {
                        std::unique_lock<std::mutex> lock(queueMutex);
                        condition.wait(lock, [this] { return stop || !tasks.empty(); });
                        
                        if (stop && tasks.empty()) {
                            return;
                        }
                        
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    
                    task();
                }
            });
        }
    }
    
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type> {
        
        using return_type = typename std::result_of<F(Args...)>::type;
        
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
        std::future<return_type> res = task->get_future();
        
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks.emplace([task] { (*task)(); });
        }
        
        condition.notify_one();
        return res;
    }
    
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        
        condition.notify_all();
        for (std::thread& worker : workers) {
            worker.join();
        }
    }
};

异步编程

std::async

#include <future>
#include <chrono>

int longRunningTask(int value) {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return value * value;
}

void asyncExample() {
    // 异步执行任务
    std::future<int> future = std::async(std::launch::async, longRunningTask, 5);
    
    // 做其他工作
    std::cout << "Doing other work..." << std::endl;
    
    // 获取结果
    int result = future.get();
    std::cout << "Result: " << result << std::endl;
}

std::promise和std::future

#include <future>
#include <thread>

void promiseFutureExample() {
    std::promise<int> promise;
    std::future<int> future = promise.get_future();
    
    std::thread t([&promise]() {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        promise.set_value(42);
    });
    
    // 等待结果
    int result = future.get();
    std::cout << "Result: " << result << std::endl;
    
    t.join();
}

常见并发问题与解决方案

1. 死锁(Deadlock)

问题:两个线程互相等待对方持有的锁

// ❌ 死锁示例
std::mutex mtx1, mtx2;

void thread1() {
    std::lock_guard<std::mutex> lock1(mtx1);
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    std::lock_guard<std::mutex> lock2(mtx2);  // 等待mtx2
}

void thread2() {
    std::lock_guard<std::mutex> lock2(mtx2);
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    std::lock_guard<std::mutex> lock1(mtx1);  // 等待mtx1(死锁!)
}

// ✅ 解决方案1:固定加锁顺序
void thread1() {
    std::lock_guard<std::mutex> lock1(mtx1);  // 总是先锁mtx1
    std::lock_guard<std::mutex> lock2(mtx2);
}

void thread2() {
    std::lock_guard<std::mutex> lock1(mtx1);  // 总是先锁mtx1
    std::lock_guard<std::mutex> lock2(mtx2);
}

// ✅ 解决方案2:同时获取多个锁
void safeFunction() {
    std::scoped_lock lock(mtx1, mtx2);  // C++17,原子获取
    // 或使用 std::lock(mtx1, mtx2)
}

2. 竞态条件(Race Condition)

问题:多线程无同步访问共享数据

// ❌ 竞态条件
int counter = 0;

void increment() {
    for (int i = 0; i < 1000; ++i) {
        counter++;  // 非原子操作!
        // 实际是:read-modify-write三步,可能交错
    }
}

// ✅ 解决方案1:互斥锁
std::mutex mtx;
void increment() {
    for (int i = 0; i < 1000; ++i) {
        std::lock_guard<std::mutex> lock(mtx);
        counter++;
    }
}

// ✅ 解决方案2:原子操作
std::atomic<int> counter{0};
void increment() {
    for (int i = 0; i < 1000; ++i) {
        counter++;  // 原子操作
    }
}

3. 数据竞争(Data Race)

问题:至少一个线程写,另一个线程读/写,无同步

// ❌ 数据竞争(未定义行为)
std::vector<int> vec;

void writer() {
    vec.push_back(42);  // 写
}

void reader() {
    if (!vec.empty()) {
        int val = vec[0];  // 读(数据竞争!)
    }
}

// ✅ 解决方案:加锁
std::mutex mtx;
std::vector<int> vec;

void writer() {
    std::lock_guard<std::mutex> lock(mtx);
    vec.push_back(42);
}

void reader() {
    std::lock_guard<std::mutex> lock(mtx);
    if (!vec.empty()) {
        int val = vec[0];
    }
}

4. ABA问题

问题:CAS操作中值从A变B再变回A,误以为未变

// ❌ ABA问题
std::atomic<Node*> head;

void push(int val) {
    Node* newNode = new Node(val);
    Node* oldHead;
    do {
        oldHead = head.load();
        newNode->next = oldHead;
    } while (!head.compare_exchange_weak(oldHead, newNode));
    // 如果oldHead在中间被删除又分配了同地址的新节点,CAS成功但逻辑错误
}

// ✅ 解决方案:版本号或使用智能指针
struct VersionedPtr {
    Node* ptr;
    size_t version;
};
std::atomic<VersionedPtr> head;

5. 虚假唤醒(Spurious Wakeup)

问题:条件变量可能无故唤醒

// ❌ 未处理虚假唤醒
std::condition_variable cv;
std::mutex mtx;
bool ready = false;

void wait() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock);  // 可能虚假唤醒
    // ready可能仍为false!
}

// ✅ 解决方案:使用谓词
void wait() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, [] { return ready; });  // 循环检查条件
}

并发编程原则

  1. 最小化共享数据:减少锁竞争

  2. 优先使用高层抽象std::async > std::thread

  3. 避免嵌套锁:易死锁,用 std::scoped_lock

  4. 用RAII管理锁lock_guard/unique_lock

  5. 移动而非拷贝:线程间传递大对象

  6. 无锁优于有锁:原子操作 > 互斥锁

  7. 读多写少用读写锁shared_mutex(C++17)

  8. 测试并发代码:使用ThreadSanitizer检测数据竞争

核心思想:并发很难,能避免就避免;不能避免就用标准库;必须手写就充分测试。