并发与同步
💡 核心结论
临界区问题的三个条件:互斥、进步、有限等待
锁的实现需要硬件支持(原子指令)
信号量解决同步问题,互斥锁解决互斥问题
条件变量必须与互斥锁配合使用
避免死锁的关键是破坏其四个必要条件之一
1. 临界区问题
1.1 竞态条件(Race Condition)
示例:银行账户
int balance = 1000;
// 线程1:取款500
void withdraw() {
int temp = balance; // 读取1000
temp = temp - 500; // 计算500
balance = temp; // 写回500
}
// 线程2:取款300
void withdraw2() {
int temp = balance; // 读取1000(!)
temp = temp - 300; // 计算700
balance = temp; // 写回700(!)
}
// 结果:balance = 700 或 500(错误!应该是200)
1.2 临界区解决方案要求
三个条件:
互斥(Mutual Exclusion):一次只有一个进程在临界区
进步(Progress):临界区外的进程不能阻止其他进程进入
有限等待(Bounded Waiting):请求进入的进程不能无限等待
2. 锁机制
2.1 自旋锁(Spinlock)
原理:忙等待
typedef struct {
int locked; // 0=未锁,1=已锁
} spinlock_t;
void spin_lock(spinlock_t *lock) {
while (test_and_set(&lock->locked)) {
// 忙等待(spin)
}
}
void spin_unlock(spinlock_t *lock) {
lock->locked = 0;
}
test_and_set原子指令:
// 硬件提供的原子操作
int test_and_set(int *ptr) {
int old = *ptr;
*ptr = 1;
return old;
}
// x86汇编实现
lock:
movl $1, %eax
xchgl %eax, (%rdi) // 原子交换
ret
compare_and_swap(CAS):
int compare_and_swap(int *ptr, int expected, int new_value) {
int actual = *ptr;
if (actual == expected) {
*ptr = new_value;
}
return actual;
}
// 用CAS实现自旋锁
void spin_lock_cas(spinlock_t *lock) {
while (compare_and_swap(&lock->locked, 0, 1) != 0) {
// 忙等待
}
}
优缺点:
✅ 简单、快速(临界区短时)
✅ 无上下文切换
❌ CPU时间浪费(忙等待)
❌ 优先级反转问题
2.2 互斥锁(Mutex)
原理:阻塞等待
typedef struct {
int locked;
queue_t wait_queue;
} mutex_t;
void mutex_lock(mutex_t *lock) {
while (test_and_set(&lock->locked)) {
// 加入等待队列
enqueue(&lock->wait_queue, current_thread);
// 休眠(释放CPU)
park();
}
}
void mutex_unlock(mutex_t *lock) {
lock->locked = 0;
// 唤醒一个等待线程
thread_t *t = dequeue(&lock->wait_queue);
if (t) {
unpark(t);
}
}
POSIX互斥锁:
#include <pthread.h>
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void safe_increment() {
pthread_mutex_lock(&lock);
count++; // 临界区
pthread_mutex_unlock(&lock);
}
2.3 读写锁(RWLock)
原理:多个读者,一个写者
typedef struct {
int readers; // 当前读者数
int writer; // 是否有写者
queue_t read_queue;
queue_t write_queue;
} rwlock_t;
void read_lock(rwlock_t *lock) {
while (lock->writer) {
wait(&lock->read_queue);
}
lock->readers++;
}
void read_unlock(rwlock_t *lock) {
lock->readers--;
if (lock->readers == 0) {
// 通知等待的写者
signal_one(&lock->write_queue);
}
}
void write_lock(rwlock_t *lock) {
while (lock->writer || lock->readers > 0) {
wait(&lock->write_queue);
}
lock->writer = 1;
}
void write_unlock(rwlock_t *lock) {
lock->writer = 0;
// 优先唤醒写者还是读者?
if (!queue_empty(&lock->write_queue)) {
signal_one(&lock->write_queue);
} else {
signal_all(&lock->read_queue);
}
}
使用场景:读多写少
3. 信号量(Semaphore)
3.1 二元信号量
原理:计数器(0或1)
typedef struct {
int value;
queue_t wait_queue;
} semaphore_t;
void sem_wait(semaphore_t *sem) { // P操作
sem->value--;
if (sem->value < 0) {
// 阻塞
enqueue(&sem->wait_queue, current_thread);
park();
}
}
void sem_post(semaphore_t *sem) { // V操作
sem->value++;
if (sem->value <= 0) {
// 唤醒一个等待线程
thread_t *t = dequeue(&sem->wait_queue);
unpark(t);
}
}
用作互斥锁:
semaphore_t mutex;
sem_init(&mutex, 1); // 初始值1
void critical_section() {
sem_wait(&mutex);
// 临界区
sem_post(&mutex);
}
3.2 计数信号量
用途:限制资源访问数量
// 限制最多N个线程同时访问
semaphore_t sem;
sem_init(&sem, N);
void access_resource() {
sem_wait(&sem);
// 使用资源
sem_post(&sem);
}
3.3 经典问题:生产者-消费者
#define BUFFER_SIZE 10
int buffer[BUFFER_SIZE];
int in = 0, out = 0;
semaphore_t empty; // 空槽位数
semaphore_t full; // 满槽位数
semaphore_t mutex; // 互斥访问buffer
void init() {
sem_init(&empty, BUFFER_SIZE);
sem_init(&full, 0);
sem_init(&mutex, 1);
}
void producer() {
while (1) {
int item = produce_item();
sem_wait(&empty); // 等待空槽位
sem_wait(&mutex); // 进入临界区
buffer[in] = item;
in = (in + 1) % BUFFER_SIZE;
sem_post(&mutex); // 离开临界区
sem_post(&full); // 增加满槽位
}
}
void consumer() {
while (1) {
sem_wait(&full); // 等待满槽位
sem_wait(&mutex); // 进入临界区
int item = buffer[out];
out = (out + 1) % BUFFER_SIZE;
sem_post(&mutex); // 离开临界区
sem_post(&empty); // 增加空槽位
consume_item(item);
}
}
3.4 经典问题:读者-写者
int readers = 0;
semaphore_t mutex; // 保护readers计数
semaphore_t write_lock; // 写锁
void reader() {
while (1) {
sem_wait(&mutex);
readers++;
if (readers == 1) {
sem_wait(&write_lock); // 第一个读者获取写锁
}
sem_post(&mutex);
// 读取数据
read_data();
sem_wait(&mutex);
readers--;
if (readers == 0) {
sem_post(&write_lock); // 最后一个读者释放写锁
}
sem_post(&mutex);
}
}
void writer() {
while (1) {
sem_wait(&write_lock);
// 写入数据
write_data();
sem_post(&write_lock);
}
}
问题:写者可能饥饿(读者不断)
解决:引入写者优先
int readers = 0, writers = 0;
semaphore_t mutex, r_mutex, w_mutex;
void reader() {
sem_wait(&r_mutex);
sem_wait(&mutex);
readers++;
if (readers == 1) sem_wait(&write_lock);
sem_post(&mutex);
sem_post(&r_mutex);
read_data();
sem_wait(&mutex);
readers--;
if (readers == 0) sem_post(&write_lock);
sem_post(&mutex);
}
void writer() {
sem_wait(&w_mutex);
writers++;
if (writers == 1) sem_wait(&r_mutex); // 阻止新读者
sem_post(&w_mutex);
sem_wait(&write_lock);
write_data();
sem_post(&write_lock);
sem_wait(&w_mutex);
writers--;
if (writers == 0) sem_post(&r_mutex);
sem_post(&w_mutex);
}
4. 条件变量(Condition Variable)
4.1 基本操作
typedef struct {
queue_t wait_queue;
} cond_t;
void cond_wait(cond_t *cond, mutex_t *mutex) {
// 1. 释放锁
mutex_unlock(mutex);
// 2. 加入等待队列并休眠
enqueue(&cond->wait_queue, current_thread);
park();
// 3. 被唤醒后重新获取锁
mutex_lock(mutex);
}
void cond_signal(cond_t *cond) {
// 唤醒一个等待线程
thread_t *t = dequeue(&cond->wait_queue);
if (t) unpark(t);
}
void cond_broadcast(cond_t *cond) {
// 唤醒所有等待线程
while (!queue_empty(&cond->wait_queue)) {
thread_t *t = dequeue(&cond->wait_queue);
unpark(t);
}
}
4.2 生产者-消费者(条件变量版)
mutex_t mutex;
cond_t not_full, not_empty;
int count = 0;
int buffer[BUFFER_SIZE];
void producer() {
while (1) {
int item = produce_item();
pthread_mutex_lock(&mutex);
while (count == BUFFER_SIZE) {
pthread_cond_wait(¬_full, &mutex); // 等待非满
}
buffer[count++] = item;
pthread_cond_signal(¬_empty); // 通知非空
pthread_mutex_unlock(&mutex);
}
}
void consumer() {
while (1) {
pthread_mutex_lock(&mutex);
while (count == 0) {
pthread_cond_wait(¬_empty, &mutex); // 等待非空
}
int item = buffer[--count];
pthread_cond_signal(¬_full); // 通知非满
pthread_mutex_unlock(&mutex);
consume_item(item);
}
}
为什么用while而不是if?
// 错误:用if
if (count == 0) {
pthread_cond_wait(¬_empty, &mutex);
}
// 唤醒后不检查,可能count仍为0(虚假唤醒)
// 正确:用while
while (count == 0) {
pthread_cond_wait(¬_empty, &mutex);
}
// 唤醒后重新检查条件
5. 管程(Monitor)
概念:封装共享数据和同步操作
class BoundedBuffer {
private:
int buffer[SIZE];
int count = 0;
mutex_t mutex;
cond_t not_full, not_empty;
public:
void put(int item) {
mutex_lock(&mutex);
while (count == SIZE) {
cond_wait(¬_full, &mutex);
}
buffer[count++] = item;
cond_signal(¬_empty);
mutex_unlock(&mutex);
}
int get() {
mutex_lock(&mutex);
while (count == 0) {
cond_wait(¬_empty, &mutex);
}
int item = buffer[--count];
cond_signal(¬_full);
mutex_unlock(&mutex);
return item;
}
};
Java synchronized:
class BoundedBuffer {
private int[] buffer = new int[SIZE];
private int count = 0;
public synchronized void put(int item) throws InterruptedException {
while (count == SIZE) {
wait(); // 等待非满
}
buffer[count++] = item;
notifyAll();
}
public synchronized int get() throws InterruptedException {
while (count == 0) {
wait(); // 等待非空
}
int item = buffer[--count];
notifyAll();
return item;
}
}
6. 无锁编程
6.1 原子操作
#include <stdatomic.h>
atomic_int counter = 0;
void increment() {
atomic_fetch_add(&counter, 1); // 原子加
}
// CAS循环
void cas_increment() {
int old, new;
do {
old = atomic_load(&counter);
new = old + 1;
} while (!atomic_compare_exchange_weak(&counter, &old, new));
}
6.2 无锁栈
typedef struct node {
int value;
struct node *next;
} node_t;
atomic_ptr top = NULL;
void push(int value) {
node_t *new_node = malloc(sizeof(node_t));
new_node->value = value;
node_t *old_top;
do {
old_top = atomic_load(&top);
new_node->next = old_top;
} while (!atomic_compare_exchange_weak(&top, &old_top, new_node));
}
int pop() {
node_t *old_top, *new_top;
do {
old_top = atomic_load(&top);
if (old_top == NULL) return -1; // 空栈
new_top = old_top->next;
} while (!atomic_compare_exchange_weak(&top, &old_top, new_top));
int value = old_top->value;
free(old_top); // ABA问题!
return value;
}
ABA问题:
线程1:读取top=A
线程2:pop A, pop B, push A
线程1:CAS成功(top还是A,但已经不是原来的A!)
解决:版本号
typedef struct {
node_t *ptr;
int version;
} versioned_ptr;
7. 常见并发bug
7.1 死锁
mutex_t lock1, lock2;
// 线程1
void thread1() {
pthread_mutex_lock(&lock1);
pthread_mutex_lock(&lock2); // 等待线程2释放lock2
// ...
pthread_mutex_unlock(&lock2);
pthread_mutex_unlock(&lock1);
}
// 线程2
void thread2() {
pthread_mutex_lock(&lock2);
pthread_mutex_lock(&lock1); // 等待线程1释放lock1
// ...
pthread_mutex_unlock(&lock1);
pthread_mutex_unlock(&lock2);
}
解决:锁排序
void acquire_both_locks() {
if (&lock1 < &lock2) {
pthread_mutex_lock(&lock1);
pthread_mutex_lock(&lock2);
} else {
pthread_mutex_lock(&lock2);
pthread_mutex_lock(&lock1);
}
}
7.2 活锁(Livelock)
// 两个线程都在尝试让对方先执行
while (other_is_waiting) {
yield(); // 无限循环
}
7.3 优先级反转
H: 高优先级(等待锁)
M: 中优先级(运行)
L: 低优先级(持有锁)
H等待L,但M抢占L,导致H间接等待M
解决:优先级继承
8. 性能优化
8.1 减少锁竞争
// 粗粒度锁(高竞争)
mutex_t global_lock;
void increment_all() {
pthread_mutex_lock(&global_lock);
for (int i = 0; i < N; i++) {
counters[i]++;
}
pthread_mutex_unlock(&global_lock);
}
// 细粒度锁(低竞争)
mutex_t locks[N];
void increment_one(int i) {
pthread_mutex_lock(&locks[i]);
counters[i]++;
pthread_mutex_unlock(&locks[i]);
}
8.2 无锁数据结构
// 用原子操作代替锁
atomic_int counter = 0;
void fast_increment() {
atomic_fetch_add(&counter, 1); // 无锁
}
9. 常见问题
Q1: 自旋锁 vs 互斥锁?
A:
自旋锁:临界区短(<微秒)、多核、不能休眠
互斥锁:临界区长、单核、可以休眠
Q2: 信号量 vs 条件变量?
A:
信号量:计数、不需要锁、P/V操作可分离
条件变量:必须与锁配合、等待特定条件
Q3: 如何避免死锁?
A:
锁排序
超时机制
尝试锁(trylock)
避免嵌套锁
Q4: volatile vs atomic?
A:
volatile:防止编译器优化,不保证原子性
atomic:保证原子性和可见性
LeetCode相关题目
参考资源
《Operating Systems: Three Easy Pieces》- Concurrency章节
《The Art of Multiprocessor Programming》
Linux源码:
kernel/locking/POSIX线程编程指南