# 08-并发编程基础 并发是现代程序充分利用多核CPU的关键。C++11引入标准线程库,提供跨平台的并发支持。 ## 线程基础 线程是操作系统调度的最小单位。C++ `std::thread` 封装了系统线程,支持函数、成员函数、Lambda等多种启动方式。 ### 线程创建和管理 创建线程时传入可调用对象及参数。线程对象销毁前必须 `join()` 或 `detach()`,否则程序终止。 ```cpp #include #include // 函数线程 void threadFunction(int id) { std::cout << "Thread " << id << " is running" << std::endl; } // 类成员函数线程 class ThreadClass { public: void memberFunction(int id) { std::cout << "Member function in thread " << id << std::endl; } }; // Lambda线程 void createThreads() { // 函数线程 std::thread t1(threadFunction, 1); // 类成员函数线程 ThreadClass obj; std::thread t2(&ThreadClass::memberFunction, &obj, 2); // Lambda线程 std::thread t3([](int id) { std::cout << "Lambda thread " << id << std::endl; }, 3); // 等待线程完成 t1.join(); t2.join(); t3.join(); } ``` ### 线程生命周期 线程有两种结束方式:`join()` 阻塞等待线程完成,`detach()` 分离线程使其在后台运行。选择错误导致资源泄漏或程序崩溃。 ```cpp #include #include void threadLifecycle() { std::thread t([]() { std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "Thread finished" << std::endl; }); // 检查线程是否可join if (t.joinable()) { std::cout << "Thread is joinable" << std::endl; } // 等待线程完成 t.join(); // 分离线程(不等待完成) // t.detach(); } ``` ## 互斥锁 多线程访问共享数据需要同步机制。互斥锁(Mutex)是最基本的同步原语,保证同一时刻只有一个线程访问临界区。 ### std::mutex `std::mutex` 提供基本互斥功能。`lock()` 获取锁,`unlock()` 释放锁。推荐使用RAII封装(如 `lock_guard`)避免忘记解锁。 ```cpp #include #include #include class Counter { private: int count = 0; std::mutex mtx; public: void increment() { std::lock_guard lock(mtx); ++count; } int getCount() const { std::lock_guard lock(mtx); return count; } }; void testMutex() { Counter counter; std::vector threads; // 创建多个线程 for (int i = 0; i < 10; ++i) { threads.emplace_back([&counter]() { for (int j = 0; j < 1000; ++j) { counter.increment(); } }); } // 等待所有线程完成 for (auto& t : threads) { t.join(); } std::cout << "Final count: " << counter.getCount() << std::endl; } ``` ### 递归互斥锁 `std::recursive_mutex` 允许同一线程多次获取锁。适用于递归调用或嵌套锁场景,但性能较低,优先重构代码避免使用。 ```cpp #include class RecursiveCounter { private: int count = 0; std::recursive_mutex mtx; public: void increment() { std::lock_guard lock(mtx); ++count; } void incrementTwice() { std::lock_guard lock(mtx); increment(); // 递归调用,需要递归锁 increment(); } int getCount() const { std::lock_guard lock(mtx); return count; } }; ``` ### 超时互斥锁 ```cpp #include #include class TimeoutExample { private: std::timed_mutex mtx; public: bool tryLockFor(int seconds) { return mtx.try_lock_for(std::chrono::seconds(seconds)); } bool tryLockUntil(std::chrono::steady_clock::time_point time) { return mtx.try_lock_until(time); } void unlock() { mtx.unlock(); } }; ``` ## 条件变量 条件变量用于线程间通信,实现等待/通知模式。线程可以等待某个条件成立,其他线程在条件改变时通知等待的线程。 ### 基本使用 `std::condition_variable` 必须与 `std::unique_lock` 配合。`wait()` 释放锁并阻塞,被通知后重新获取锁。防止虚假唤醒需要传入谓词。 ```cpp #include #include #include class ProducerConsumer { private: std::queue queue; std::mutex mtx; std::condition_variable cv; bool finished = false; public: void producer() { for (int i = 0; i < 10; ++i) { std::unique_lock lock(mtx); queue.push(i); std::cout << "Produced: " << i << std::endl; cv.notify_one(); // 通知等待的线程 } // 通知消费者结束 { std::unique_lock lock(mtx); finished = true; } cv.notify_all(); } void consumer() { while (true) { std::unique_lock lock(mtx); // 等待条件满足 cv.wait(lock, [this] { return !queue.empty() || finished; }); if (finished && queue.empty()) { break; } if (!queue.empty()) { int value = queue.front(); queue.pop(); std::cout << "Consumed: " << value << std::endl; } } } }; ``` ### 等待条件 ```cpp #include #include class WaitExample { private: std::mutex mtx; std::condition_variable cv; bool ready = false; public: void waitForReady() { std::unique_lock lock(mtx); // 等待条件满足 cv.wait(lock, [this] { return ready; }); std::cout << "Ready!" << std::endl; } void setReady() { { std::lock_guard lock(mtx); ready = true; } cv.notify_all(); } }; ``` ## 原子操作 原子操作是不可分割的操作,无需锁即可安全并发访问。`std::atomic` 提供无锁编程能力,性能高但使用复杂。 ### 基本原子类型 `std::atomic` 支持整型、指针、浮点型(C++20)。提供 `load`、`store`、`exchange`、`compare_exchange` 等操作。 ```cpp #include #include class AtomicCounter { private: std::atomic count{0}; public: void increment() { ++count; // 原子操作 } void add(int value) { count += value; // 原子操作 } int getCount() const { return count.load(); // 原子读取 } void setCount(int value) { count.store(value); // 原子写入 } bool compareAndSwap(int expected, int desired) { return count.compare_exchange_weak(expected, desired); } }; ``` ### 原子操作类型 ```cpp #include void atomicTypes() { // 基本原子类型 std::atomic atomicInt{42}; std::atomic atomicBool{true}; std::atomic atomicDouble{3.14}; // 原子指针 int value = 42; std::atomic atomicPtr{&value}; // 原子标志 std::atomic_flag flag = ATOMIC_FLAG_INIT; // 原子操作 atomicInt.fetch_add(10); // 原子加法 atomicInt.fetch_sub(5); // 原子减法 atomicInt.fetch_and(0xFF); // 原子按位与 atomicInt.fetch_or(0x100); // 原子按位或 atomicInt.fetch_xor(0x200); // 原子按位异或 } ``` ## 线程安全容器 ### 线程安全队列 ```cpp #include #include #include template class ThreadSafeQueue { private: std::queue queue; mutable std::mutex mtx; std::condition_variable cv; public: void push(T value) { std::lock_guard lock(mtx); queue.push(value); cv.notify_one(); } bool tryPop(T& value) { std::lock_guard lock(mtx); if (queue.empty()) { return false; } value = queue.front(); queue.pop(); return true; } void waitAndPop(T& value) { std::unique_lock lock(mtx); cv.wait(lock, [this] { return !queue.empty(); }); value = queue.front(); queue.pop(); } bool empty() const { std::lock_guard lock(mtx); return queue.empty(); } size_t size() const { std::lock_guard lock(mtx); return queue.size(); } }; ``` ### 线程安全单例 ```cpp #include #include class ThreadSafeSingleton { private: static std::unique_ptr instance; static std::mutex mtx; ThreadSafeSingleton() = default; public: static ThreadSafeSingleton& getInstance() { if (!instance) { std::lock_guard lock(mtx); if (!instance) { // 双重检查锁定 instance = std::unique_ptr(new ThreadSafeSingleton); } } return *instance; } // 禁用拷贝和赋值 ThreadSafeSingleton(const ThreadSafeSingleton&) = delete; ThreadSafeSingleton& operator=(const ThreadSafeSingleton&) = delete; }; // 静态成员定义 std::unique_ptr ThreadSafeSingleton::instance = nullptr; std::mutex ThreadSafeSingleton::mtx; ``` ## 线程池 ### 简单线程池实现 ```cpp #include #include #include #include #include #include #include class ThreadPool { private: std::vector workers; std::queue> tasks; std::mutex queueMutex; std::condition_variable condition; bool stop; public: ThreadPool(size_t threads) : stop(false) { for (size_t i = 0; i < threads; ++i) { workers.emplace_back([this] { while (true) { std::function task; { std::unique_lock lock(queueMutex); condition.wait(lock, [this] { return stop || !tasks.empty(); }); if (stop && tasks.empty()) { return; } task = std::move(tasks.front()); tasks.pop(); } task(); } }); } } template auto enqueue(F&& f, Args&&... args) -> std::future::type> { using return_type = typename std::result_of::type; auto task = std::make_shared>( std::bind(std::forward(f), std::forward(args)...) ); std::future res = task->get_future(); { std::unique_lock lock(queueMutex); if (stop) { throw std::runtime_error("enqueue on stopped ThreadPool"); } tasks.emplace([task] { (*task)(); }); } condition.notify_one(); return res; } ~ThreadPool() { { std::unique_lock lock(queueMutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) { worker.join(); } } }; ``` ## 异步编程 ### std::async ```cpp #include #include int longRunningTask(int value) { std::this_thread::sleep_for(std::chrono::seconds(2)); return value * value; } void asyncExample() { // 异步执行任务 std::future future = std::async(std::launch::async, longRunningTask, 5); // 做其他工作 std::cout << "Doing other work..." << std::endl; // 获取结果 int result = future.get(); std::cout << "Result: " << result << std::endl; } ``` ### std::promise和std::future ```cpp #include #include void promiseFutureExample() { std::promise promise; std::future future = promise.get_future(); std::thread t([&promise]() { std::this_thread::sleep_for(std::chrono::seconds(1)); promise.set_value(42); }); // 等待结果 int result = future.get(); std::cout << "Result: " << result << std::endl; t.join(); } ``` ## 常见并发问题与解决方案 ### 1. 死锁(Deadlock) **问题:两个线程互相等待对方持有的锁** ```cpp // ❌ 死锁示例 std::mutex mtx1, mtx2; void thread1() { std::lock_guard lock1(mtx1); std::this_thread::sleep_for(std::chrono::milliseconds(1)); std::lock_guard lock2(mtx2); // 等待mtx2 } void thread2() { std::lock_guard lock2(mtx2); std::this_thread::sleep_for(std::chrono::milliseconds(1)); std::lock_guard lock1(mtx1); // 等待mtx1(死锁!) } // ✅ 解决方案1:固定加锁顺序 void thread1() { std::lock_guard lock1(mtx1); // 总是先锁mtx1 std::lock_guard lock2(mtx2); } void thread2() { std::lock_guard lock1(mtx1); // 总是先锁mtx1 std::lock_guard lock2(mtx2); } // ✅ 解决方案2:同时获取多个锁 void safeFunction() { std::scoped_lock lock(mtx1, mtx2); // C++17,原子获取 // 或使用 std::lock(mtx1, mtx2) } ``` ### 2. 竞态条件(Race Condition) **问题:多线程无同步访问共享数据** ```cpp // ❌ 竞态条件 int counter = 0; void increment() { for (int i = 0; i < 1000; ++i) { counter++; // 非原子操作! // 实际是:read-modify-write三步,可能交错 } } // ✅ 解决方案1:互斥锁 std::mutex mtx; void increment() { for (int i = 0; i < 1000; ++i) { std::lock_guard lock(mtx); counter++; } } // ✅ 解决方案2:原子操作 std::atomic counter{0}; void increment() { for (int i = 0; i < 1000; ++i) { counter++; // 原子操作 } } ``` ### 3. 数据竞争(Data Race) **问题:至少一个线程写,另一个线程读/写,无同步** ```cpp // ❌ 数据竞争(未定义行为) std::vector vec; void writer() { vec.push_back(42); // 写 } void reader() { if (!vec.empty()) { int val = vec[0]; // 读(数据竞争!) } } // ✅ 解决方案:加锁 std::mutex mtx; std::vector vec; void writer() { std::lock_guard lock(mtx); vec.push_back(42); } void reader() { std::lock_guard lock(mtx); if (!vec.empty()) { int val = vec[0]; } } ``` ### 4. ABA问题 **问题:CAS操作中值从A变B再变回A,误以为未变** ```cpp // ❌ ABA问题 std::atomic head; void push(int val) { Node* newNode = new Node(val); Node* oldHead; do { oldHead = head.load(); newNode->next = oldHead; } while (!head.compare_exchange_weak(oldHead, newNode)); // 如果oldHead在中间被删除又分配了同地址的新节点,CAS成功但逻辑错误 } // ✅ 解决方案:版本号或使用智能指针 struct VersionedPtr { Node* ptr; size_t version; }; std::atomic head; ``` ### 5. 虚假唤醒(Spurious Wakeup) **问题:条件变量可能无故唤醒** ```cpp // ❌ 未处理虚假唤醒 std::condition_variable cv; std::mutex mtx; bool ready = false; void wait() { std::unique_lock lock(mtx); cv.wait(lock); // 可能虚假唤醒 // ready可能仍为false! } // ✅ 解决方案:使用谓词 void wait() { std::unique_lock lock(mtx); cv.wait(lock, [] { return ready; }); // 循环检查条件 } ``` ### 并发编程原则 1. **最小化共享数据**:减少锁竞争 2. **优先使用高层抽象**:`std::async` > `std::thread` 3. **避免嵌套锁**:易死锁,用 `std::scoped_lock` 4. **用RAII管理锁**:`lock_guard`/`unique_lock` 5. **移动而非拷贝**:线程间传递大对象 6. **无锁优于有锁**:原子操作 > 互斥锁 7. **读多写少用读写锁**:`shared_mutex`(C++17) 8. **测试并发代码**:使用ThreadSanitizer检测数据竞争 **核心思想:并发很难,能避免就避免;不能避免就用标准库;必须手写就充分测试。**