08-并发编程基础
并发是现代程序充分利用多核CPU的关键。C++11引入标准线程库,提供跨平台的并发支持。
线程基础
线程是操作系统调度的最小单位。C++ std::thread 封装了系统线程,支持函数、成员函数、Lambda等多种启动方式。
线程创建和管理
创建线程时传入可调用对象及参数。线程对象销毁前必须 join() 或 detach(),否则程序终止。
#include <thread>
#include <iostream>
// 函数线程
void threadFunction(int id) {
std::cout << "Thread " << id << " is running" << std::endl;
}
// 类成员函数线程
class ThreadClass {
public:
void memberFunction(int id) {
std::cout << "Member function in thread " << id << std::endl;
}
};
// Lambda线程
void createThreads() {
// 函数线程
std::thread t1(threadFunction, 1);
// 类成员函数线程
ThreadClass obj;
std::thread t2(&ThreadClass::memberFunction, &obj, 2);
// Lambda线程
std::thread t3([](int id) {
std::cout << "Lambda thread " << id << std::endl;
}, 3);
// 等待线程完成
t1.join();
t2.join();
t3.join();
}
线程生命周期
线程有两种结束方式:join() 阻塞等待线程完成,detach() 分离线程使其在后台运行。选择错误导致资源泄漏或程序崩溃。
#include <thread>
#include <chrono>
void threadLifecycle() {
std::thread t([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Thread finished" << std::endl;
});
// 检查线程是否可join
if (t.joinable()) {
std::cout << "Thread is joinable" << std::endl;
}
// 等待线程完成
t.join();
// 分离线程(不等待完成)
// t.detach();
}
互斥锁
多线程访问共享数据需要同步机制。互斥锁(Mutex)是最基本的同步原语,保证同一时刻只有一个线程访问临界区。
std::mutex
std::mutex 提供基本互斥功能。lock() 获取锁,unlock() 释放锁。推荐使用RAII封装(如 lock_guard)避免忘记解锁。
#include <mutex>
#include <thread>
#include <vector>
class Counter {
private:
int count = 0;
std::mutex mtx;
public:
void increment() {
std::lock_guard<std::mutex> lock(mtx);
++count;
}
int getCount() const {
std::lock_guard<std::mutex> lock(mtx);
return count;
}
};
void testMutex() {
Counter counter;
std::vector<std::thread> threads;
// 创建多个线程
for (int i = 0; i < 10; ++i) {
threads.emplace_back([&counter]() {
for (int j = 0; j < 1000; ++j) {
counter.increment();
}
});
}
// 等待所有线程完成
for (auto& t : threads) {
t.join();
}
std::cout << "Final count: " << counter.getCount() << std::endl;
}
递归互斥锁
std::recursive_mutex 允许同一线程多次获取锁。适用于递归调用或嵌套锁场景,但性能较低,优先重构代码避免使用。
#include <mutex>
class RecursiveCounter {
private:
int count = 0;
std::recursive_mutex mtx;
public:
void increment() {
std::lock_guard<std::recursive_mutex> lock(mtx);
++count;
}
void incrementTwice() {
std::lock_guard<std::recursive_mutex> lock(mtx);
increment(); // 递归调用,需要递归锁
increment();
}
int getCount() const {
std::lock_guard<std::recursive_mutex> lock(mtx);
return count;
}
};
超时互斥锁
#include <mutex>
#include <chrono>
class TimeoutExample {
private:
std::timed_mutex mtx;
public:
bool tryLockFor(int seconds) {
return mtx.try_lock_for(std::chrono::seconds(seconds));
}
bool tryLockUntil(std::chrono::steady_clock::time_point time) {
return mtx.try_lock_until(time);
}
void unlock() {
mtx.unlock();
}
};
条件变量
条件变量用于线程间通信,实现等待/通知模式。线程可以等待某个条件成立,其他线程在条件改变时通知等待的线程。
基本使用
std::condition_variable 必须与 std::unique_lock 配合。wait() 释放锁并阻塞,被通知后重新获取锁。防止虚假唤醒需要传入谓词。
#include <condition_variable>
#include <mutex>
#include <queue>
class ProducerConsumer {
private:
std::queue<int> queue;
std::mutex mtx;
std::condition_variable cv;
bool finished = false;
public:
void producer() {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
queue.push(i);
std::cout << "Produced: " << i << std::endl;
cv.notify_one(); // 通知等待的线程
}
// 通知消费者结束
{
std::unique_lock<std::mutex> lock(mtx);
finished = true;
}
cv.notify_all();
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
// 等待条件满足
cv.wait(lock, [this] { return !queue.empty() || finished; });
if (finished && queue.empty()) {
break;
}
if (!queue.empty()) {
int value = queue.front();
queue.pop();
std::cout << "Consumed: " << value << std::endl;
}
}
}
};
等待条件
#include <condition_variable>
#include <mutex>
class WaitExample {
private:
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
public:
void waitForReady() {
std::unique_lock<std::mutex> lock(mtx);
// 等待条件满足
cv.wait(lock, [this] { return ready; });
std::cout << "Ready!" << std::endl;
}
void setReady() {
{
std::lock_guard<std::mutex> lock(mtx);
ready = true;
}
cv.notify_all();
}
};
原子操作
原子操作是不可分割的操作,无需锁即可安全并发访问。std::atomic 提供无锁编程能力,性能高但使用复杂。
基本原子类型
std::atomic<T> 支持整型、指针、浮点型(C++20)。提供 load、store、exchange、compare_exchange 等操作。
#include <atomic>
#include <thread>
class AtomicCounter {
private:
std::atomic<int> count{0};
public:
void increment() {
++count; // 原子操作
}
void add(int value) {
count += value; // 原子操作
}
int getCount() const {
return count.load(); // 原子读取
}
void setCount(int value) {
count.store(value); // 原子写入
}
bool compareAndSwap(int expected, int desired) {
return count.compare_exchange_weak(expected, desired);
}
};
原子操作类型
#include <atomic>
void atomicTypes() {
// 基本原子类型
std::atomic<int> atomicInt{42};
std::atomic<bool> atomicBool{true};
std::atomic<double> atomicDouble{3.14};
// 原子指针
int value = 42;
std::atomic<int*> atomicPtr{&value};
// 原子标志
std::atomic_flag flag = ATOMIC_FLAG_INIT;
// 原子操作
atomicInt.fetch_add(10); // 原子加法
atomicInt.fetch_sub(5); // 原子减法
atomicInt.fetch_and(0xFF); // 原子按位与
atomicInt.fetch_or(0x100); // 原子按位或
atomicInt.fetch_xor(0x200); // 原子按位异或
}
线程安全容器
线程安全队列
#include <queue>
#include <mutex>
#include <condition_variable>
template<typename T>
class ThreadSafeQueue {
private:
std::queue<T> queue;
mutable std::mutex mtx;
std::condition_variable cv;
public:
void push(T value) {
std::lock_guard<std::mutex> lock(mtx);
queue.push(value);
cv.notify_one();
}
bool tryPop(T& value) {
std::lock_guard<std::mutex> lock(mtx);
if (queue.empty()) {
return false;
}
value = queue.front();
queue.pop();
return true;
}
void waitAndPop(T& value) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this] { return !queue.empty(); });
value = queue.front();
queue.pop();
}
bool empty() const {
std::lock_guard<std::mutex> lock(mtx);
return queue.empty();
}
size_t size() const {
std::lock_guard<std::mutex> lock(mtx);
return queue.size();
}
};
线程安全单例
#include <mutex>
#include <memory>
class ThreadSafeSingleton {
private:
static std::unique_ptr<ThreadSafeSingleton> instance;
static std::mutex mtx;
ThreadSafeSingleton() = default;
public:
static ThreadSafeSingleton& getInstance() {
if (!instance) {
std::lock_guard<std::mutex> lock(mtx);
if (!instance) { // 双重检查锁定
instance = std::unique_ptr<ThreadSafeSingleton>(new ThreadSafeSingleton);
}
}
return *instance;
}
// 禁用拷贝和赋值
ThreadSafeSingleton(const ThreadSafeSingleton&) = delete;
ThreadSafeSingleton& operator=(const ThreadSafeSingleton&) = delete;
};
// 静态成员定义
std::unique_ptr<ThreadSafeSingleton> ThreadSafeSingleton::instance = nullptr;
std::mutex ThreadSafeSingleton::mtx;
线程池
简单线程池实现
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
class ThreadPool {
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop;
public:
ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) {
return;
}
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queueMutex);
if (stop) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([task] { (*task)(); });
}
condition.notify_one();
return res;
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}
};
异步编程
std::async
#include <future>
#include <chrono>
int longRunningTask(int value) {
std::this_thread::sleep_for(std::chrono::seconds(2));
return value * value;
}
void asyncExample() {
// 异步执行任务
std::future<int> future = std::async(std::launch::async, longRunningTask, 5);
// 做其他工作
std::cout << "Doing other work..." << std::endl;
// 获取结果
int result = future.get();
std::cout << "Result: " << result << std::endl;
}
std::promise和std::future
#include <future>
#include <thread>
void promiseFutureExample() {
std::promise<int> promise;
std::future<int> future = promise.get_future();
std::thread t([&promise]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
promise.set_value(42);
});
// 等待结果
int result = future.get();
std::cout << "Result: " << result << std::endl;
t.join();
}
常见并发问题与解决方案
1. 死锁(Deadlock)
问题:两个线程互相等待对方持有的锁
// ❌ 死锁示例
std::mutex mtx1, mtx2;
void thread1() {
std::lock_guard<std::mutex> lock1(mtx1);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::lock_guard<std::mutex> lock2(mtx2); // 等待mtx2
}
void thread2() {
std::lock_guard<std::mutex> lock2(mtx2);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::lock_guard<std::mutex> lock1(mtx1); // 等待mtx1(死锁!)
}
// ✅ 解决方案1:固定加锁顺序
void thread1() {
std::lock_guard<std::mutex> lock1(mtx1); // 总是先锁mtx1
std::lock_guard<std::mutex> lock2(mtx2);
}
void thread2() {
std::lock_guard<std::mutex> lock1(mtx1); // 总是先锁mtx1
std::lock_guard<std::mutex> lock2(mtx2);
}
// ✅ 解决方案2:同时获取多个锁
void safeFunction() {
std::scoped_lock lock(mtx1, mtx2); // C++17,原子获取
// 或使用 std::lock(mtx1, mtx2)
}
2. 竞态条件(Race Condition)
问题:多线程无同步访问共享数据
// ❌ 竞态条件
int counter = 0;
void increment() {
for (int i = 0; i < 1000; ++i) {
counter++; // 非原子操作!
// 实际是:read-modify-write三步,可能交错
}
}
// ✅ 解决方案1:互斥锁
std::mutex mtx;
void increment() {
for (int i = 0; i < 1000; ++i) {
std::lock_guard<std::mutex> lock(mtx);
counter++;
}
}
// ✅ 解决方案2:原子操作
std::atomic<int> counter{0};
void increment() {
for (int i = 0; i < 1000; ++i) {
counter++; // 原子操作
}
}
3. 数据竞争(Data Race)
问题:至少一个线程写,另一个线程读/写,无同步
// ❌ 数据竞争(未定义行为)
std::vector<int> vec;
void writer() {
vec.push_back(42); // 写
}
void reader() {
if (!vec.empty()) {
int val = vec[0]; // 读(数据竞争!)
}
}
// ✅ 解决方案:加锁
std::mutex mtx;
std::vector<int> vec;
void writer() {
std::lock_guard<std::mutex> lock(mtx);
vec.push_back(42);
}
void reader() {
std::lock_guard<std::mutex> lock(mtx);
if (!vec.empty()) {
int val = vec[0];
}
}
4. ABA问题
问题:CAS操作中值从A变B再变回A,误以为未变
// ❌ ABA问题
std::atomic<Node*> head;
void push(int val) {
Node* newNode = new Node(val);
Node* oldHead;
do {
oldHead = head.load();
newNode->next = oldHead;
} while (!head.compare_exchange_weak(oldHead, newNode));
// 如果oldHead在中间被删除又分配了同地址的新节点,CAS成功但逻辑错误
}
// ✅ 解决方案:版本号或使用智能指针
struct VersionedPtr {
Node* ptr;
size_t version;
};
std::atomic<VersionedPtr> head;
5. 虚假唤醒(Spurious Wakeup)
问题:条件变量可能无故唤醒
// ❌ 未处理虚假唤醒
std::condition_variable cv;
std::mutex mtx;
bool ready = false;
void wait() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock); // 可能虚假唤醒
// ready可能仍为false!
}
// ✅ 解决方案:使用谓词
void wait() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return ready; }); // 循环检查条件
}
并发编程原则
最小化共享数据:减少锁竞争
优先使用高层抽象:
std::async>std::thread避免嵌套锁:易死锁,用
std::scoped_lock用RAII管理锁:
lock_guard/unique_lock移动而非拷贝:线程间传递大对象
无锁优于有锁:原子操作 > 互斥锁
读多写少用读写锁:
shared_mutex(C++17)测试并发代码:使用ThreadSanitizer检测数据竞争
核心思想:并发很难,能避免就避免;不能避免就用标准库;必须手写就充分测试。