Channel 模式:掌握 Go 并发编程的精髓

深入学习 Go 的各种 Channel 使用模式,包括 Pipeline、Fan-in/Fan-out、Worker Pool 等经典并发模式

Channel 模式:掌握 Go 并发编程的精髓

Go 语言的 Channel 是并发编程的核心,但仅仅知道如何创建和使用 Channel 是不够的。掌握各种 Channel 模式(Channel Patterns)才能让你写出优雅、高效、可维护的并发程序。

本文将深入介绍 Go 中最常用的 Channel 模式,通过实战案例帮你掌握并发编程的精髓。

基础回顾

Channel 的基本操作

// 创建 channel
ch := make(chan int)        // 无缓冲
ch := make(chan int, 10)    // 缓冲容量为 10

// 发送和接收
ch <- 42        // 发送
value := <-ch   // 接收

// 关闭 channel
close(ch)

// 检查 channel 是否关闭
value, ok := <-ch
if !ok {
    // channel 已关闭
}

黄金法则

不要通过共享内存来通信,而要通过通信来共享内存。

模式 1:Pipeline(管道)

Pipeline 模式将处理流程分解为多个阶段,每个阶段通过 Channel 连接。

基本 Pipeline

package main

import (
    "fmt"
)

// 阶段 1:生成数字
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// 阶段 2:平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// 阶段 3:过滤偶数
func filterEven(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
    }()
    return out
}

func main() {
    // 构建 pipeline:生成 -> 平方 -> 过滤偶数
    nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squares := square(nums)
    evens := filterEven(squares)
    
    // 消费结果
    for result := range evens {
        fmt.Println(result)  // 4, 16, 36, 64, 100
    }
}

带错误处理的 Pipeline

package main

import (
    "errors"
    "fmt"
)

type Result struct {
    Value int
    Err   error
}

// 阶段 1:解析
func parse(inputs ...string) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for _, input := range inputs {
            var n int
            _, err := fmt.Sscanf(input, "%d", &n)
            if err != nil {
                out <- Result{Err: fmt.Errorf("parse %q: %w", input, err)}
                continue
            }
            out <- Result{Value: n}
        }
    }()
    return out
}

// 阶段 2:验证
func validate(in <-chan Result) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for r := range in {
            if r.Err != nil {
                out <- r
                continue
            }
            if r.Value < 0 {
                out <- Result{Err: errors.New("negative number")}
                continue
            }
            out <- r
        }
    }()
    return out
}

// 阶段 3:处理
func process(in <-chan Result) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for r := range in {
            if r.Err != nil {
                out <- r
                continue
            }
            out <- Result{Value: r.Value * 2}
        }
    }()
    return out
}

func main() {
    inputs := []string{"1", "2", "abc", "-3", "4"}
    
    results := process(validate(parse(inputs...)))
    
    for r := range results {
        if r.Err != nil {
            fmt.Printf("Error: %v\n", r.Err)
        } else {
            fmt.Printf("Result: %d\n", r.Value)
        }
    }
}

模式 2:Fan-out/Fan-in(扇出/扇入)

Fan-out 将工作分发给多个 worker,Fan-in 将多个 worker 的结果合并。

Fan-out:分发工作

package main

import (
    "fmt"
    "sync"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        result := job * 2
        results <- result
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    
    // Fan-out:启动 5 个 worker
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, jobs, results)
        }(i)
    }
    
    // 发送工作
    for i := 1; i <= 20; i++ {
        jobs <- i
    }
    close(jobs)
    
    // 等待所有 worker 完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    for result := range results {
        fmt.Printf("Result: %d\n", result)
    }
}

Fan-in:合并结果

package main

import (
    "fmt"
    "sync"
)

// Fan-in:合并多个 channel
func merge(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    // 为每个输入 channel 启动一个 goroutine
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(ch)
    }
    
    // 等待所有 goroutine 完成后关闭输出 channel
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func producer(id int, count int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 0; i < count; i++ {
            out <- id*100 + i
        }
    }()
    return out
}

func main() {
    // 创建 3 个 producer
    p1 := producer(1, 5)
    p2 := producer(2, 5)
    p3 := producer(3, 5)
    
    // Fan-in:合并所有 producer 的输出
    merged := merge(p1, p2, p3)
    
    // 消费合并后的结果
    for v := range merged {
        fmt.Println(v)
    }
}

完整的 Fan-out/Fan-in 示例

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID      int
    Payload string
}

type Result struct {
    JobID  int
    Output string
    Err    error
}

func worker(id int, jobs <-chan Job, results chan<- Result) {
    for job := range jobs {
        // 模拟处理时间
        time.Sleep(100 * time.Millisecond)
        
        result := Result{
            JobID:  job.ID,
            Output: fmt.Sprintf("Processed: %s", job.Payload),
        }
        results <- result
    }
}

func fanOut(jobs <-chan Job, workerCount int) []<-chan Result {
    channels := make([]<-chan Result, workerCount)
    
    for i := 0; i < workerCount; i++ {
        ch := make(chan Result)
        channels[i] = ch
        go worker(i, jobs, ch)
    }
    
    return channels
}

func fanIn(channels ...<-chan Result) <-chan Result {
    var wg sync.WaitGroup
    out := make(chan Result)
    
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan Result) {
            defer wg.Done()
            for r := range c {
                out <- r
            }
        }(ch)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func main() {
    jobs := make(chan Job, 100)
    
    // 发送工作
    go func() {
        defer close(jobs)
        for i := 1; i <= 20; i++ {
            jobs <- Job{ID: i, Payload: fmt.Sprintf("Task %d", i)}
        }
    }()
    
    // Fan-out:分发给 5 个 worker
    resultChannels := fanOut(jobs, 5)
    
    // Fan-in:合并结果
    results := fanIn(resultChannels...)
    
    // 消费结果
    for result := range results {
        if result.Err != nil {
            fmt.Printf("Job %d failed: %v\n", result.JobID, result.Err)
        } else {
            fmt.Printf("Job %d: %s\n", result.JobID, result.Output)
        }
    }
}

模式 3:Worker Pool(工作池)

Worker Pool 模式创建固定数量的 worker 来处理任务。

基本 Worker Pool

package main

import (
    "fmt"
    "sync"
    "time"
)

type WorkerPool struct {
    workerCount int
    jobs        chan Job
    results     chan Result
    wg          sync.WaitGroup
}

func NewWorkerPool(workerCount int) *WorkerPool {
    return &WorkerPool{
        workerCount: workerCount,
        jobs:        make(chan Job, 100),
        results:     make(chan Result, 100),
    }
}

func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workerCount; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()
    
    for job := range wp.jobs {
        fmt.Printf("Worker %d started job %d\n", id, job.ID)
        time.Sleep(time.Second)  // 模拟工作
        
        result := Result{
            JobID:  job.ID,
            Output: fmt.Sprintf("Result of job %d", job.ID),
        }
        wp.results <- result
        fmt.Printf("Worker %d finished job %d\n", id, job.ID)
    }
}

func (wp *WorkerPool) Submit(job Job) {
    wp.jobs <- job
}

func (wp *WorkerPool) Results() <-chan Result {
    return wp.results
}

func (wp *WorkerPool) Shutdown() {
    close(wp.jobs)
    wp.wg.Wait()
    close(wp.results)
}

func main() {
    pool := NewWorkerPool(3)
    pool.Start()
    
    // 提交任务
    for i := 1; i <= 10; i++ {
        pool.Submit(Job{ID: i, Payload: fmt.Sprintf("Task %d", i)})
    }
    
    // 收集结果
    go func() {
        for result := range pool.Results() {
            fmt.Printf("Received: %s\n", result.Output)
        }
    }()
    
    // 等待完成
    pool.Shutdown()
}

带超时的 Worker Pool

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type TimeoutWorkerPool struct {
    workerCount int
    timeout     time.Duration
    jobs        chan Job
    results     chan Result
    wg          sync.WaitGroup
}

func (wp *TimeoutWorkerPool) worker(id int) {
    defer wp.wg.Done()
    
    for job := range wp.jobs {
        ctx, cancel := context.WithTimeout(context.Background(), wp.timeout)
        
        result := make(chan Result, 1)
        go func() {
            // 模拟工作
            time.Sleep(time.Duration(job.ID) * 500 * time.Millisecond)
            result <- Result{
                JobID:  job.ID,
                Output: fmt.Sprintf("Completed job %d", job.ID),
            }
        }()
        
        select {
        case r := <-result:
            wp.results <- r
        case <-ctx.Done():
            wp.results <- Result{
                JobID: job.ID,
                Err:   fmt.Errorf("timeout"),
            }
        }
        
        cancel()
    }
}

模式 4:Generator(生成器)

Generator 模式使用 Channel 实现惰性求值。

基本 Generator

package main

import (
    "fmt"
)

// 生成斐波那契数列
func fibonacci() <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        a, b := 0, 1
        for {
            ch <- a
            a, b = b, a+b
        }
    }()
    return ch
}

func main() {
    fib := fibonacci()
    
    // 只取前 10 个
    for i := 0; i < 10; i++ {
        fmt.Println(<-fib)
    }
}

带停止信号的 Generator

package main

import (
    "fmt"
)

func counter(stop <-chan struct{}) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        i := 0
        for {
            select {
            case <-stop:
                return
            case ch <- i:
                i++
            }
        }
    }()
    return ch
}

func main() {
    stop := make(chan struct{})
    count := counter(stop)
    
    // 取前 5 个
    for i := 0; i < 5; i++ {
        fmt.Println(<-count)
    }
    
    // 停止生成器
    close(stop)
}

模式 5:Or-Done Channel

Or-Done 模式允许在多个 channel 中任意一个关闭时触发操作。

package main

import (
    "fmt"
    "time"
)

// orDone 返回一个 channel,当任意输入 channel 关闭时关闭
func orDone(channels ...<-chan struct{}) <-chan struct{} {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }
    
    orDone := make(chan struct{})
    go func() {
        defer close(orDone)
        
        select {
        case <-channels[0]:
        case <-channels[1]:
        case <-orDone(channels[2:]...):
        }
    }()
    
    return orDone
}

func main() {
    sig := func(after time.Duration) <-chan struct{} {
        c := make(chan struct{})
        go func() {
            defer close(c)
            time.Sleep(after)
        }()
        return c
    }
    
    start := time.Now()
    
    <-orDone(
        sig(2*time.Hour),
        sig(5*time.Minute),
        sig(1*time.Second),  // 这个会最先触发
        sig(1*time.Hour),
    )
    
    fmt.Printf("Done after: %v\n", time.Since(start))
}

模式 6:Tee Channel(分流)

Tee 模式将一个 channel 的数据分流到两个 channel。

package main

import (
    "fmt"
)

func tee(in <-chan int) (<-chan int, <-chan int) {
    out1 := make(chan int)
    out2 := make(chan int)
    
    go func() {
        defer close(out1)
        defer close(out2)
        
        for val := range in {
            // 使用局部变量避免闭包问题
            val1, val2 := val, val
            
            // 并发发送到两个 channel
            select {
            case out1 <- val1:
            }
            select {
            case out2 <- val2:
            }
        }
    }()
    
    return out1, out2
}

func main() {
    // 创建数据源
    source := make(chan int)
    go func() {
        defer close(source)
        for i := 1; i <= 5; i++ {
            source <- i
        }
    }()
    
    // 分流
    ch1, ch2 := tee(source)
    
    // 消费两个 channel
    for i := 1; i <= 5; i++ {
        v1 := <-ch1
        v2 := <-ch2
        fmt.Printf("ch1: %d, ch2: %d\n", v1, v2)
    }
}

模式 7:Bridge Channel(桥接)

Bridge 模式将多个 channel 串联成一个。

package main

import (
    "fmt"
)

func bridge(chanStream <-chan <-chan int) <-chan int {
    out := make(chan int)
    
    go func() {
        defer close(out)
        
        for ch := range chanStream {
            for val := range ch {
                out <- val
            }
        }
    }()
    
    return out
}

func main() {
    // 创建 channel 的 stream
    chanStream := make(chan (<-chan int))
    
    go func() {
        defer close(chanStream)
        
        // 发送多个 channel
        for i := 0; i < 3; i++ {
            ch := make(chan int, 3)
            for j := 0; j < 3; j++ {
                ch <- i*10 + j
            }
            close(ch)
            chanStream <- ch
        }
    }()
    
    // 桥接所有 channel
    for val := range bridge(chanStream) {
        fmt.Println(val)
    }
}

模式 8:Rate Limiting(限流)

使用 Channel 实现速率限制。

简单限流

package main

import (
    "fmt"
    "time"
)

func main() {
    // 每秒最多处理 3 个请求
    limiter := time.Tick(333 * time.Millisecond)
    
    requests := make(chan int, 10)
    for i := 1; i <= 10; i++ {
        requests <- i
    }
    close(requests)
    
    for req := range requests {
        <-limiter  // 等待限流器
        fmt.Printf("Processing request %d at %v\n", req, time.Now())
    }
}

令牌桶限流

package main

import (
    "fmt"
    "time"
)

type RateLimiter struct {
    tokens chan struct{}
}

func NewRateLimiter(rate int, burst int) *RateLimiter {
    rl := &RateLimiter{
        tokens: make(chan struct{}, burst),
    }
    
    // 以固定速率添加令牌
    go func() {
        ticker := time.NewTicker(time.Second / time.Duration(rate))
        defer ticker.Stop()
        
        for range ticker.C {
            select {
            case rl.tokens <- struct{}{}:
            default:
                // 桶已满,丢弃令牌
            }
        }
    }()
    
    return rl
}

func (rl *RateLimiter) Wait() {
    <-rl.tokens
}

func main() {
    limiter := NewRateLimiter(5, 10)  // 每秒 5 个请求,突发 10 个
    
    for i := 1; i <= 20; i++ {
        limiter.Wait()
        fmt.Printf("Request %d at %v\n", i, time.Now())
    }
}

模式 9:Cancellation(取消)

使用 Channel 实现优雅的取消机制。

package main

import (
    "fmt"
    "time"
)

func longRunningTask(cancel <-chan struct{}) {
    for i := 0; i < 10; i++ {
        select {
        case <-cancel:
            fmt.Println("Task cancelled")
            return
        default:
            fmt.Printf("Working... %d\n", i)
            time.Sleep(500 * time.Millisecond)
        }
    }
    fmt.Println("Task completed")
}

func main() {
    cancel := make(chan struct{})
    
    go longRunningTask(cancel)
    
    // 2 秒后取消
    time.Sleep(2 * time.Second)
    close(cancel)
    
    time.Sleep(1 * time.Second)
}

总结

Channel 模式是 Go 并发编程的精髓:

核心模式:

  1. Pipeline:将处理流程分解为多个阶段
  2. Fan-out/Fan-in:分发和合并工作
  3. Worker Pool:固定数量的 worker 处理任务
  4. Generator:惰性求值
  5. Or-Done:多路选择
  6. Tee:分流数据
  7. Bridge:桥接多个 channel
  8. Rate Limiting:速率限制
  9. Cancellation:优雅取消

最佳实践:

  1. 总是关闭发送端的 channel
  2. 使用 select 处理多个 channel
  3. 使用 context 进行取消和超时控制
  4. 避免 goroutine 泄漏
  5. 使用缓冲 channel 提高性能

记住:并发不是目的,而是手段。选择正确的模式,让并发程序既高效又易于理解。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页