死锁处理

💡 核心结论

  1. 死锁四大必要条件:互斥、占有并等待、非抢占、循环等待

  2. 破坏任意一个条件即可预防死锁

  3. 银行家算法是安全性检查的经典算法

  4. 死锁检测需要资源分配图,查找环

  5. 实际系统多采用鸵鸟策略(忽略死锁)


1. 死锁定义

死锁(Deadlock):多个进程永久等待对方持有的资源

1.1 经典例子

mutex_t lock_A, lock_B;

// 进程1
void process1() {
    lock(lock_A);
    sleep(1);         // 等待,让进程2获得lock_B
    lock(lock_B);     // 死锁:等待进程2释放lock_B
    // ...
    unlock(lock_B);
    unlock(lock_A);
}

// 进程2
void process2() {
    lock(lock_B);
    sleep(1);         // 等待,让进程1获得lock_A
    lock(lock_A);     // 死锁:等待进程1释放lock_A
    // ...
    unlock(lock_A);
    unlock(lock_B);
}

结果

进程1持有A,等待B
进程2持有B,等待A
↓
死锁!

2. 死锁四大条件

2.1 互斥(Mutual Exclusion)

定义:资源同时只能被一个进程使用

mutex_t lock;

// 互斥访问临界区
void critical_section() {
    lock(lock);
    // 只有一个进程能进入
    unlock(lock);
}

2.2 占有并等待(Hold and Wait)

定义:进程持有资源的同时等待其他资源

void process() {
    lock(resource1);    // 持有resource1
    // ...
    lock(resource2);    // 等待resource2
}

2.3 非抢占(No Preemption)

定义:资源不能被强制抢占,只能主动释放

// 进程持有锁后,其他进程不能强制夺走
lock(lock);
// 必须由进程自己释放
unlock(lock);

2.4 循环等待(Circular Wait)

定义:存在进程环路,每个进程等待下一个进程的资源

P1 → R1 → P2 → R2 → P3 → R3 → P1
(P1等R1,R1被P2持有,P2等R2,R2被P3持有,P3等R3,R3被P1持有)

3. 死锁预防

原理:破坏四大条件之一

3.1 破坏互斥条件

方法:资源共享(不现实)

// 只读资源可以共享
int shared_data;  // 多个进程可同时读取

// 但某些资源必须互斥
mutex_t printer;  // 打印机不能共享

结论:通常无法破坏(资源本质决定)

3.2 破坏占有并等待

方法1:一次性申请所有资源

void process() {
    // 一次性获取所有需要的锁
    lock_all(lock_A, lock_B, lock_C);
    
    // 使用资源
    
    unlock_all(lock_A, lock_B, lock_C);
}

缺点

  • 资源利用率低

  • 可能导致饥饿

  • 难以预知所有需要的资源

方法2:申请资源前释放已持有的资源

void process() {
    while (1) {
        lock(lock_A);
        
        if (trylock(lock_B) == SUCCESS) {
            // 成功获取两个锁
            break;
        } else {
            // 失败,释放已持有的锁
            unlock(lock_A);
            // 等待后重试
            sleep(random());
        }
    }
    
    // 使用资源
    unlock(lock_B);
    unlock(lock_A);
}

3.3 破坏非抢占条件

方法:允许抢占

void process() {
    lock(lock_A);
    
    if (trylock(lock_B) == FAILED) {
        // lock_B被占用,释放lock_A
        unlock(lock_A);
        // 等待后重试
        wait_and_retry();
    }
    
    // 使用资源
    unlock(lock_B);
    unlock(lock_A);
}

问题

  • 实现复杂

  • 可能导致活锁

  • 某些资源不能抢占(如打印机)

3.4 破坏循环等待(最常用)

方法:资源排序,按序申请

// 给锁分配全局顺序
#define LOCK_A_ID 1
#define LOCK_B_ID 2
#define LOCK_C_ID 3

void lock_in_order(mutex_t *lock1, int id1, 
                  mutex_t *lock2, int id2) {
    if (id1 < id2) {
        lock(lock1);
        lock(lock2);
    } else {
        lock(lock2);
        lock(lock1);
    }
}

// 使用
void process() {
    lock_in_order(&lock_A, LOCK_A_ID, &lock_B, LOCK_B_ID);
    // ...
    unlock(lock_B);
    unlock(lock_A);
}

Linux内核示例

// 两个inode锁,按地址排序
if (inode1 < inode2) {
    mutex_lock(&inode1->i_mutex);
    mutex_lock(&inode2->i_mutex);
} else {
    mutex_lock(&inode2->i_mutex);
    mutex_lock(&inode1->i_mutex);
}

4. 死锁避免

原理:动态检查资源分配安全性

4.1 银行家算法

概念:类比银行贷款,确保系统处于安全状态

// 系统状态
int available[M];           // 可用资源
int max[N][M];             // 最大需求
int allocation[N][M];      // 已分配
int need[N][M];            // 还需要 = max - allocation

// 安全性检查
bool is_safe() {
    int work[M];
    bool finish[N];
    
    // 初始化
    for (int i = 0; i < M; i++) {
        work[i] = available[i];
    }
    for (int i = 0; i < N; i++) {
        finish[i] = false;
    }
    
    // 查找安全序列
    while (true) {
        bool found = false;
        
        for (int i = 0; i < N; i++) {
            if (!finish[i] && need[i] <= work) {
                // 进程i可以完成
                for (int j = 0; j < M; j++) {
                    work[j] += allocation[i][j];
                }
                finish[i] = true;
                found = true;
            }
        }
        
        if (!found) break;
    }
    
    // 检查是否所有进程都能完成
    for (int i = 0; i < N; i++) {
        if (!finish[i]) return false;
    }
    return true;
}

// 资源请求
bool request_resources(int process, int request[M]) {
    // 1. 检查请求是否超过需求
    if (request > need[process]) {
        return false;
    }
    
    // 2. 检查资源是否足够
    if (request > available) {
        return false;  // 等待
    }
    
    // 3. 试探性分配
    available -= request;
    allocation[process] += request;
    need[process] -= request;
    
    // 4. 安全性检查
    if (is_safe()) {
        return true;  // 安全,分配成功
    } else {
        // 不安全,回滚
        available += request;
        allocation[process] -= request;
        need[process] += request;
        return false;
    }
}

示例

3个进程,3种资源(A=10, B=5, C=7)

进程  Max      Allocation  Need      Available
     A B C     A B C       A B C     A B C
P0   7 5 3     0 1 0       7 4 3     3 3 2
P1   3 2 2     2 0 0       1 2 2
P2   9 0 2     3 0 2       6 0 0

安全序列:P1 → P0 → P2
1. P1完成,释放(2,0,0),可用(5,3,2)
2. P0完成,释放(0,1,0),可用(5,4,2)
3. P2完成,释放(3,0,2),可用(8,4,4)

缺点

  • 进程数和资源类型固定

  • 需要预知最大需求

  • 实现复杂

  • 很少使用


5. 死锁检测

原理:允许死锁发生,定期检测并恢复

5.1 资源分配图

进程 → 资源:请求边
资源 → 进程:分配边

死锁 ⇔ 图中有环

示例

P1 → R1 → P2 → R2 → P1
(有环,死锁)

P1 → R1 → P2 → R2
          ↓
         P3
(无环,无死锁)

5.2 检测算法

struct process {
    int id;
    int request[M];    // 请求的资源
    int allocation[M]; // 已分配的资源
};

bool detect_deadlock() {
    int available[M];
    bool finish[N];
    
    // 初始化
    calculate_available(available);
    for (int i = 0; i < N; i++) {
        finish[i] = (allocation[i] == 0);  // 无资源的进程已完成
    }
    
    // 查找可完成的进程
    while (true) {
        bool found = false;
        
        for (int i = 0; i < N; i++) {
            if (!finish[i] && request[i] <= available) {
                // 进程i可以完成
                for (int j = 0; j < M; j++) {
                    available[j] += allocation[i][j];
                }
                finish[i] = true;
                found = true;
            }
        }
        
        if (!found) break;
    }
    
    // 检查是否有进程未完成
    for (int i = 0; i < N; i++) {
        if (!finish[i]) {
            return true;  // 检测到死锁
        }
    }
    return false;
}

检测频率

  • 每次资源请求时检测(开销大)

  • 定期检测(如每小时)

  • CPU利用率低时检测


6. 死锁恢复

6.1 进程终止

方法1:终止所有死锁进程

void recover_terminate_all() {
    for (Process *p : deadlocked_processes) {
        kill(p);
    }
}

方法2:逐个终止直到解除死锁

void recover_terminate_one() {
    while (detect_deadlock()) {
        Process *victim = select_victim();
        kill(victim);
    }
}

选择受害者

  • 优先级最低

  • 执行时间最短

  • 持有资源最少

  • 已使用CPU时间最少

6.2 资源抢占

void recover_preempt() {
    while (detect_deadlock()) {
        // 1. 选择受害者
        Process *victim = select_victim();
        
        // 2. 回滚到安全状态
        rollback(victim);
        
        // 3. 抢占资源
        Resource *res = preempt_resource(victim);
        
        // 4. 分配给等待的进程
        allocate(res, waiting_process);
    }
}

回滚策略

  • 完全回滚:回到初始状态

  • 部分回滚:回到某个检查点

  • 饥饿问题:同一进程多次被选为受害者


7. 实际系统策略

7.1 鸵鸟策略

原理:忽略死锁(假装不存在)

// 大多数操作系统的做法
if (system_hang) {
    reboot();  // 重启解决一切问题
}

理由

  • 死锁很少发生

  • 预防和检测开销大

  • 性能优先于绝对正确性

使用场景

  • Linux、Windows桌面系统

  • UNIX系统

7.2 超时机制

bool lock_with_timeout(mutex_t *lock, int timeout_ms) {
    int elapsed = 0;
    
    while (!trylock(lock)) {
        sleep(10);
        elapsed += 10;
        
        if (elapsed >= timeout_ms) {
            return false;  // 超时,可能死锁
        }
    }
    
    return true;
}

void safe_operation() {
    if (lock_with_timeout(&lock_A, 1000)) {
        if (lock_with_timeout(&lock_B, 1000)) {
            // 成功获取两个锁
            critical_section();
            unlock(lock_B);
        }
        unlock(lock_A);
    } else {
        // 超时,放弃操作
        log("可能发生死锁");
    }
}

8. 实战案例

8.1 哲学家就餐问题

问题:5个哲学家,5根筷子,需要两根筷子才能吃饭

// 死锁版本
void philosopher(int i) {
    while (1) {
        think();
        
        pick_up(chopstick[i]);       // 拿起左边筷子
        pick_up(chopstick[(i+1)%5]); // 拿起右边筷子
        
        eat();
        
        put_down(chopstick[(i+1)%5]);
        put_down(chopstick[i]);
    }
}
// 如果所有人同时拿起左边筷子 → 死锁

解决方案1:最多4人同时拿筷子

semaphore_t room = 4;  // 最多4人

void philosopher(int i) {
    while (1) {
        think();
        
        sem_wait(&room);  // 进入房间
        pick_up(chopstick[i]);
        pick_up(chopstick[(i+1)%5]);
        
        eat();
        
        put_down(chopstick[(i+1)%5]);
        put_down(chopstick[i]);
        sem_post(&room);  // 离开房间
    }
}

解决方案2:奇数先左后右,偶数先右后左

void philosopher(int i) {
    while (1) {
        think();
        
        if (i % 2 == 0) {
            pick_up(chopstick[i]);
            pick_up(chopstick[(i+1)%5]);
        } else {
            pick_up(chopstick[(i+1)%5]);
            pick_up(chopstick[i]);
        }
        
        eat();
        
        put_down(chopstick[(i+1)%5]);
        put_down(chopstick[i]);
    }
}

8.2 数据库死锁

-- 事务1
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;  -- 锁定账户1
UPDATE accounts SET balance = balance + 100 WHERE id = 2;  -- 等待账户2
COMMIT;

-- 事务2
BEGIN;
UPDATE accounts SET balance = balance - 50 WHERE id = 2;   -- 锁定账户2
UPDATE accounts SET balance = balance + 50 WHERE id = 1;   -- 等待账户1
COMMIT;

-- 死锁!

MySQL死锁检测

// MySQL自动检测死锁并回滚其中一个事务
void detect_and_recover() {
    if (detect_cycle_in_wait_graph()) {
        Transaction *victim = select_victim();
        rollback(victim);
        return ERROR_DEADLOCK;
    }
}

9. 常见问题

Q1: 死锁和饥饿的区别?

A:

  • 死锁:互相等待,永远无法继续

  • 饥饿:一直得不到资源,但系统仍在运行

Q2: 如何调试死锁?

A:

  1. ps aux 查看进程状态(D状态)

  2. pstack 查看调用栈

  3. gdb 附加进程查看锁状态

  4. 使用死锁检测工具(如Helgrind)

Q3: 为什么数据库能自动处理死锁?

A:

  • 数据库维护等待图

  • 定期检测环

  • 选择代价小的事务回滚

  • 应用层可以重试

Q4: 多线程程序如何避免死锁?

A:

  1. 锁排序(最重要)

  2. 使用trylock

  3. 超时机制

  4. 减少锁的持有时间

  5. 使用无锁数据结构


10. 调试工具

10.1 Linux工具

# 查看进程状态
ps aux | grep D  # D状态可能是死锁

# 查看线程栈
pstack <pid>

# 查看锁信息
cat /proc/<pid>/status

# 使用gdb
gdb -p <pid>
(gdb) info threads
(gdb) thread apply all bt

10.2 Valgrind Helgrind

# 死锁检测
valgrind --tool=helgrind ./program

# 输出示例
==12345== Thread #1: lock order "0x12345 before 0x67890" violated
==12345== Thread #2: lock order "0x67890 before 0x12345"

10.3 死锁检测代码

#define MAX_LOCKS 100

struct lock_info {
    pthread_t thread;
    void *lock_addr;
    const char *file;
    int line;
};

lock_info held_locks[MAX_LOCKS];
int lock_count = 0;

void record_lock(void *lock) {
    held_locks[lock_count++] = {
        pthread_self(), lock, __FILE__, __LINE__
    };
    
    // 检测环
    if (detect_cycle()) {
        fprintf(stderr, "Deadlock detected!\n");
        print_lock_graph();
        abort();
    }
}

参考资源

  • 《Operating Systems: Three Easy Pieces》- Deadlock章节

  • 《Database System Concepts》- 事务和并发控制

  • Linux源码:kernel/locking/lockdep.c(死锁检测)

  • 《Art of Multiprocessor Programming》