03-Go并发编程

并发是Go的杀手级特性。goroutine极轻量,channel类型安全,select优雅多路复用。

goroutine深入

特性对比

特性

线程(Thread)

goroutine

栈大小

1-2MB

2KB起步,动态扩展

创建开销

~1-2ms

~1μs

调度

OS内核调度

Go运行时调度(M:N)

切换成本

高(内核态切换)

低(用户态)

数量上限

几千

百万级

goroutine调度模型(GMP)

G:goroutine
M:系统线程(Machine)
P:处理器(Processor),逻辑CPU

调度模型:M:N
- 多个goroutine(G)映射到少量线程(M)
- P控制G在M上执行
- 默认P数量=CPU核心数

创建和控制

// 创建
go func() {
    fmt.Println("goroutine")
}()

// 等待(WaitGroup)
import "sync"

var wg sync.WaitGroup

for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(n int) {
        defer wg.Done()
        process(n)
    }(i)
}

wg.Wait()

// 限制并发数(worker pool)
func worker Pool(tasks <-chan Task, results chan<- Result) {
    const numWorkers = 5
    var wg sync.WaitGroup
    
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for task := range tasks {
                results <- process(task)
            }
        }()
    }
    
    wg.Wait()
    close(results)
}

channel详解

无缓冲channel

发送阻塞直到接收,接收阻塞直到发送。同步通信。

ch := make(chan int)

// 发送(阻塞)
go func() {
    ch <- 42
}()

// 接收(阻塞)
value := <-ch

// 应用:同步点
done := make(chan struct{})
go func() {
    work()
    done <- struct{}{}  // 信号
}()
<-done  // 等待完成

缓冲channel

缓冲满才阻塞发送,缓冲空才阻塞接收。异步通信。

ch := make(chan int, 3)

ch <- 1  // 不阻塞
ch <- 2
ch <- 3
// ch <- 4  // 阻塞(缓冲满)

<-ch     // 1
<-ch     // 2
// 现在可以再发送2个

单向channel

限制channel操作方向,增强类型安全。

// 只发送channel
func send(ch chan<- int) {
    ch <- 42
    // <-ch  // 编译错误
}

// 只接收channel
func receive(ch <-chan int) {
    value := <-ch
    // ch <- 42  // 编译错误
}

// 函数参数自动转换
ch := make(chan int)
go send(ch)     // chan int → chan<- int
receive(ch)     // chan int → <-chan int

select多路复用

select {
case msg := <-ch1:
    fmt.Println("ch1:", msg)
case msg := <-ch2:
    fmt.Println("ch2:", msg)
case ch3 <- value:
    fmt.Println("sent to ch3")
default:
    fmt.Println("no ready channel")
}

// 超时模式
select {
case result := <-ch:
    fmt.Println(result)
case <-time.After(time.Second):
    fmt.Println("timeout")
}

// 取消模式
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

select {
case <-ch:
    // 正常处理
case <-ctx.Done():
    fmt.Println("cancelled")
}

// for-select循环
for {
    select {
    case msg := <-ch:
        process(msg)
    case <-quit:
        return
    }
}

sync包

Mutex(互斥锁)

import "sync"

type SafeCounter struct {
    mu    sync.Mutex
    count int
}

func (c *SafeCounter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

RWMutex(读写锁)

type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

func (c *Cache) Get(key string) string {
    c.mu.RLock()         // 读锁(多个goroutine可同时读)
    defer c.mu.RUnlock()
    return c.data[key]
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()          // 写锁(独占)
    defer c.mu.Unlock()
    c.data[key] = value
}

Once(单次执行)

var once sync.Once
var instance *Singleton

func GetInstance() *Singleton {
    once.Do(func() {
        instance = &Singleton{}  // 仅执行一次
    })
    return instance
}

Pool(对象池)

var bufferPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer)  // 创建新对象
    },
}

func process() {
    buf := bufferPool.Get().(*bytes.Buffer)  // 获取
    defer bufferPool.Put(buf)                // 归还
    
    buf.Reset()  // 重置
    buf.WriteString("data")
    // 使用buf
}

Map(并发安全map)

var m sync.Map

// 操作
m.Store("key", "value")          // 存储
value, ok := m.Load("key")       // 读取
m.Delete("key")                  // 删除
value, loaded := m.LoadOrStore("key", "value")  // 读取或存储
m.Range(func(key, value interface{}) bool {
    fmt.Println(key, value)
    return true  // 继续遍历
})

Cond(条件变量)

var (
    mu    sync.Mutex
    cond  = sync.NewCond(&mu)
    ready bool
)

// 等待
go func() {
    cond.L.Lock()
    for !ready {  // 循环检查(防止虚假唤醒)
        cond.Wait()
    }
    cond.L.Unlock()
    // 开始工作
}()

// 通知
cond.L.Lock()
ready = true
cond.Signal()     // 唤醒一个
// cond.Broadcast()  // 唤醒所有
cond.L.Unlock()

context包

传递截止时间、取消信号、请求作用域值。

import "context"

// 创建context
ctx := context.Background()           // 根context
ctx := context.TODO()                 // 临时context

// WithCancel:取消信号
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
    <-ctx.Done()
    fmt.Println("cancelled")
}()

cancel()  // 取消

// WithTimeout:超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

select {
case <-time.After(10 * time.Second):
    fmt.Println("work done")
case <-ctx.Done():
    fmt.Println("timeout:", ctx.Err())
}

// WithDeadline:截止时间
deadline := time.Now().Add(5 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)

// WithValue:传递值(少用,仅请求作用域数据)
ctx := context.WithValue(context.Background(), "userID", 123)
userID := ctx.Value("userID").(int)

context使用模式

// HTTP服务器
func handler(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()  // 请求的context
    
    // 调用其他服务
    result, err := callService(ctx, data)
    if err != nil {
        // 处理错误
    }
}

func callService(ctx context.Context, data Data) (Result, error) {
    select {
    case result := <-doWork(data):
        return result, nil
    case <-ctx.Done():
        return Result{}, ctx.Err()  // 请求取消或超时
    }
}

并发模式

Worker Pool

func workerPool(jobs <-chan int, results chan<- int) {
    const numWorkers = 5
    
    for i := 0; i < numWorkers; i++ {
        go worker(i, jobs, results)
    }
}

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(time.Second)
        results <- job * 2
    }
}

// 使用
jobs := make(chan int, 100)
results := make(chan int, 100)

go workerPool(jobs, results)

// 发送任务
for i := 1; i <= 10; i++ {
    jobs <- i
}
close(jobs)

// 接收结果
for i := 1; i <= 10; i++ {
    <-results
}

Fan-out/Fan-in

// Fan-out:分发任务到多个goroutine
func fanOut(input <-chan int) []<-chan int {
    const numWorkers = 3
    channels := make([]<-chan int, numWorkers)
    
    for i := 0; i < numWorkers; i++ {
        channels[i] = worker(input)
    }
    
    return channels
}

// Fan-in:合并多个channel
func fanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for val := range c {
                out <- val
            }
        }(ch)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

Pipeline

// 生成器
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// 组合pipeline
nums := gen(1, 2, 3, 4, 5)
squared := square(nums)

for n := range squared {
    fmt.Println(n)  // 1, 4, 9, 16, 25
}

超时和取消

// 超时模式
func doWithTimeout(timeout time.Duration) error {
    done := make(chan struct{})
    
    go func() {
        work()
        done <- struct{}{}
    }()
    
    select {
    case <-done:
        return nil
    case <-time.After(timeout):
        return errors.New("timeout")
    }
}

// 取消模式
func doWithCancel(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // 执行工作
            if finished {
                return nil
            }
        }
    }
}

原子操作

import "sync/atomic"

var count int64

// 原子操作
atomic.AddInt64(&count, 1)       // 原子加
atomic.LoadInt64(&count)         // 原子读
atomic.StoreInt64(&count, 100)   // 原子写
atomic.SwapInt64(&count, 200)    // 原子交换
atomic.CompareAndSwapInt64(&count, 200, 300)  // CAS

// Value(原子值)
var config atomic.Value

// 存储
config.Store(Config{Timeout: 5})

// 读取
cfg := config.Load().(Config)

并发安全数据结构

线程安全Map

type SafeMap struct {
    mu sync.RWMutex
    data map[string]interface{}
}

func NewSafeMap() *SafeMap {
    return &SafeMap{
        data: make(map[string]interface{}),
    }
}

func (m *SafeMap) Set(key string, value interface{}) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.data[key] = value
}

func (m *SafeMap) Get(key string) (interface{}, bool) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    value, ok := m.data[key]
    return value, ok
}

// 或使用sync.Map(高并发读多写少)
var m sync.Map
m.Store("key", "value")
value, ok := m.Load("key")

线程安全Slice

type SafeSlice struct {
    mu   sync.Mutex
    data []int
}

func (s *SafeSlice) Append(val int) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.data = append(s.data, val)
}

func (s *SafeSlice) Get(i int) int {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.data[i]
}

常见并发问题

竞态条件

// ❌ 竞态条件
var count int

func increment() {
    count++  // 非原子:读-改-写
}

for i := 0; i < 1000; i++ {
    go increment()
}
// count结果不确定(<1000)

// ✅ 使用互斥锁
var (
    count int
    mu    sync.Mutex
)

func increment() {
    mu.Lock()
    count++
    mu.Unlock()
}

// ✅ 使用原子操作
var count int64

func increment() {
    atomic.AddInt64(&count, 1)
}

// ✅ 使用channel
counter := make(chan int)
go func() {
    count := 0
    for range counter {
        count++
    }
}()

for i := 0; i < 1000; i++ {
    counter <- 1
}

死锁

// ❌ 死锁:互相等待
ch := make(chan int)
ch <- 42  // 阻塞(无接收者)
<-ch

// ✅ 使用goroutine
ch := make(chan int)
go func() {
    ch <- 42
}()
<-ch

// ❌ 所有goroutine都sleep
func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        wg.Wait()  // 等待自己(死锁)
    }()
    wg.Wait()
}

goroutine泄漏

// ❌ 泄漏:channel永远不关闭
func leak() {
    ch := make(chan int)
    go func() {
        for val := range ch {  // 永远阻塞
            process(val)
        }
    }()
    // ch从未关闭,goroutine永远不退出
}

// ✅ 使用done channel
func noLeak() {
    ch := make(chan int)
    done := make(chan struct{})
    
    go func() {
        for {
            select {
            case val := <-ch:
                process(val)
            case <-done:
                return  // 退出goroutine
            }
        }
    }()
    
    // 清理
    close(done)
}

// ✅ 使用context
func noLeak2(ctx context.Context) {
    ch := make(chan int)
    
    go func() {
        for {
            select {
            case val := <-ch:
                process(val)
            case <-ctx.Done():
                return
            }
        }
    }()
}

并发工具

errgroup

import "golang.org/x/sync/errgroup"

func process Files(files []string) error {
    g, ctx := errgroup.WithContext(context.Background())
    
    for _, file := range files {
        file := file  // 捕获循环变量
        g.Go(func() error {
            return processFile(ctx, file)
        })
    }
    
    // 等待所有goroutine,返回第一个错误
    return g.Wait()
}

singleflight

防止缓存击穿,同时只执行一次。

import "golang.org/x/sync/singleflight"

var g singleflight.Group

func getValue(key string) (string, error) {
    value, err, _ := g.Do(key, func() (interface{}, error) {
        // 耗时操作(如数据库查询)
        return fetchFromDB(key)
    })
    return value.(string), err
}

// 多个goroutine同时调用getValue("同一key")
// 只有第一个真正执行,其他等待并共享结果

限流(rate limiting)

import "golang.org/x/time/rate"

// 令牌桶算法
limiter := rate.NewLimiter(10, 100)  // 每秒10个,桶容量100

// 等待许可
if err := limiter.Wait(ctx); err != nil {
    return err
}

// 尝试获取(非阻塞)
if !limiter.Allow() {
    return errors.New("rate limit exceeded")
}

性能优化

减少锁竞争

// ❌ 粗粒度锁
type Counter struct {
    mu    sync.Mutex
    count map[string]int
}

func (c *Counter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count[key]++
}

// ✅ 细粒度锁(分段锁)
type ShardedCounter struct {
    shards [16]struct {
        mu    sync.Mutex
        count map[string]int
    }
}

func (c *ShardedCounter) Inc(key string) {
    shard := &c.shards[hash(key)%16]
    shard.mu.Lock()
    shard.count[key]++
    shard.mu.Unlock()
}

channel vs mutex

// channel:通信和同步
// mutex:保护共享数据

// CPU密集型:mutex(开销小)
// IO密集型:channel(清晰)

// 简单计数:atomic
// 复杂状态:mutex
// 消息传递:channel

并发测试

竞态检测

# 编译时启用竞态检测器
go build -race

# 测试时检测
go test -race

# 运行时检测
go run -race main.go

压力测试

func TestConcurrent(t *testing.T) {
    const numGoroutines = 100
    const numIterations = 1000
    
    var wg sync.WaitGroup
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < numIterations; j++ {
                operation()
            }
        }()
    }
    wg.Wait()
}

最佳实践

  1. channel传递所有权:发送后不再访问

  2. 关闭channel的发送方关闭:接收方不关闭

  3. 用select避免阻塞:加default或timeout

  4. context传递取消信号:优于done channel

  5. defer解锁:确保锁释放

  6. 读多写少用RWMutex:提升性能

  7. 原子操作简单计数:比mutex轻量

  8. 避免goroutine泄漏:确保退出路径

  9. 竞态检测常开:测试和开发环境

  10. 小心闭包捕获变量:循环中传参

核心: Don’t communicate by sharing memory; share memory by communicating.