Channel 模式:掌握 Go 并发编程的精髓
Go 语言的 Channel 是并发编程的核心,但仅仅知道如何创建和使用 Channel 是不够的。掌握各种 Channel 模式(Channel Patterns)才能让你写出优雅、高效、可维护的并发程序。
本文将深入介绍 Go 中最常用的 Channel 模式,通过实战案例帮你掌握并发编程的精髓。
基础回顾
Channel 的基本操作
// 创建 channel
ch := make(chan int) // 无缓冲
ch := make(chan int, 10) // 缓冲容量为 10
// 发送和接收
ch <- 42 // 发送
value := <-ch // 接收
// 关闭 channel
close(ch)
// 检查 channel 是否关闭
value, ok := <-ch
if !ok {
// channel 已关闭
}
黄金法则
不要通过共享内存来通信,而要通过通信来共享内存。
模式 1:Pipeline(管道)
Pipeline 模式将处理流程分解为多个阶段,每个阶段通过 Channel 连接。
基本 Pipeline
package main
import (
"fmt"
)
// 阶段 1:生成数字
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// 阶段 2:平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// 阶段 3:过滤偶数
func filterEven(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n
}
}
}()
return out
}
func main() {
// 构建 pipeline:生成 -> 平方 -> 过滤偶数
nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squares := square(nums)
evens := filterEven(squares)
// 消费结果
for result := range evens {
fmt.Println(result) // 4, 16, 36, 64, 100
}
}
带错误处理的 Pipeline
package main
import (
"errors"
"fmt"
)
type Result struct {
Value int
Err error
}
// 阶段 1:解析
func parse(inputs ...string) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for _, input := range inputs {
var n int
_, err := fmt.Sscanf(input, "%d", &n)
if err != nil {
out <- Result{Err: fmt.Errorf("parse %q: %w", input, err)}
continue
}
out <- Result{Value: n}
}
}()
return out
}
// 阶段 2:验证
func validate(in <-chan Result) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for r := range in {
if r.Err != nil {
out <- r
continue
}
if r.Value < 0 {
out <- Result{Err: errors.New("negative number")}
continue
}
out <- r
}
}()
return out
}
// 阶段 3:处理
func process(in <-chan Result) <-chan Result {
out := make(chan Result)
go func() {
defer close(out)
for r := range in {
if r.Err != nil {
out <- r
continue
}
out <- Result{Value: r.Value * 2}
}
}()
return out
}
func main() {
inputs := []string{"1", "2", "abc", "-3", "4"}
results := process(validate(parse(inputs...)))
for r := range results {
if r.Err != nil {
fmt.Printf("Error: %v\n", r.Err)
} else {
fmt.Printf("Result: %d\n", r.Value)
}
}
}
模式 2:Fan-out/Fan-in(扇出/扇入)
Fan-out 将工作分发给多个 worker,Fan-in 将多个 worker 的结果合并。
Fan-out:分发工作
package main
import (
"fmt"
"sync"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
result := job * 2
results <- result
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// Fan-out:启动 5 个 worker
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, jobs, results)
}(i)
}
// 发送工作
for i := 1; i <= 20; i++ {
jobs <- i
}
close(jobs)
// 等待所有 worker 完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Printf("Result: %d\n", result)
}
}
Fan-in:合并结果
package main
import (
"fmt"
"sync"
)
// Fan-in:合并多个 channel
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个输入 channel 启动一个 goroutine
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
// 等待所有 goroutine 完成后关闭输出 channel
go func() {
wg.Wait()
close(out)
}()
return out
}
func producer(id int, count int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; i < count; i++ {
out <- id*100 + i
}
}()
return out
}
func main() {
// 创建 3 个 producer
p1 := producer(1, 5)
p2 := producer(2, 5)
p3 := producer(3, 5)
// Fan-in:合并所有 producer 的输出
merged := merge(p1, p2, p3)
// 消费合并后的结果
for v := range merged {
fmt.Println(v)
}
}
完整的 Fan-out/Fan-in 示例
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Payload string
}
type Result struct {
JobID int
Output string
Err error
}
func worker(id int, jobs <-chan Job, results chan<- Result) {
for job := range jobs {
// 模拟处理时间
time.Sleep(100 * time.Millisecond)
result := Result{
JobID: job.ID,
Output: fmt.Sprintf("Processed: %s", job.Payload),
}
results <- result
}
}
func fanOut(jobs <-chan Job, workerCount int) []<-chan Result {
channels := make([]<-chan Result, workerCount)
for i := 0; i < workerCount; i++ {
ch := make(chan Result)
channels[i] = ch
go worker(i, jobs, ch)
}
return channels
}
func fanIn(channels ...<-chan Result) <-chan Result {
var wg sync.WaitGroup
out := make(chan Result)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan Result) {
defer wg.Done()
for r := range c {
out <- r
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
jobs := make(chan Job, 100)
// 发送工作
go func() {
defer close(jobs)
for i := 1; i <= 20; i++ {
jobs <- Job{ID: i, Payload: fmt.Sprintf("Task %d", i)}
}
}()
// Fan-out:分发给 5 个 worker
resultChannels := fanOut(jobs, 5)
// Fan-in:合并结果
results := fanIn(resultChannels...)
// 消费结果
for result := range results {
if result.Err != nil {
fmt.Printf("Job %d failed: %v\n", result.JobID, result.Err)
} else {
fmt.Printf("Job %d: %s\n", result.JobID, result.Output)
}
}
}
模式 3:Worker Pool(工作池)
Worker Pool 模式创建固定数量的 worker 来处理任务。
基本 Worker Pool
package main
import (
"fmt"
"sync"
"time"
)
type WorkerPool struct {
workerCount int
jobs chan Job
results chan Result
wg sync.WaitGroup
}
func NewWorkerPool(workerCount int) *WorkerPool {
return &WorkerPool{
workerCount: workerCount,
jobs: make(chan Job, 100),
results: make(chan Result, 100),
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workerCount; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
for job := range wp.jobs {
fmt.Printf("Worker %d started job %d\n", id, job.ID)
time.Sleep(time.Second) // 模拟工作
result := Result{
JobID: job.ID,
Output: fmt.Sprintf("Result of job %d", job.ID),
}
wp.results <- result
fmt.Printf("Worker %d finished job %d\n", id, job.ID)
}
}
func (wp *WorkerPool) Submit(job Job) {
wp.jobs <- job
}
func (wp *WorkerPool) Results() <-chan Result {
return wp.results
}
func (wp *WorkerPool) Shutdown() {
close(wp.jobs)
wp.wg.Wait()
close(wp.results)
}
func main() {
pool := NewWorkerPool(3)
pool.Start()
// 提交任务
for i := 1; i <= 10; i++ {
pool.Submit(Job{ID: i, Payload: fmt.Sprintf("Task %d", i)})
}
// 收集结果
go func() {
for result := range pool.Results() {
fmt.Printf("Received: %s\n", result.Output)
}
}()
// 等待完成
pool.Shutdown()
}
带超时的 Worker Pool
package main
import (
"context"
"fmt"
"sync"
"time"
)
type TimeoutWorkerPool struct {
workerCount int
timeout time.Duration
jobs chan Job
results chan Result
wg sync.WaitGroup
}
func (wp *TimeoutWorkerPool) worker(id int) {
defer wp.wg.Done()
for job := range wp.jobs {
ctx, cancel := context.WithTimeout(context.Background(), wp.timeout)
result := make(chan Result, 1)
go func() {
// 模拟工作
time.Sleep(time.Duration(job.ID) * 500 * time.Millisecond)
result <- Result{
JobID: job.ID,
Output: fmt.Sprintf("Completed job %d", job.ID),
}
}()
select {
case r := <-result:
wp.results <- r
case <-ctx.Done():
wp.results <- Result{
JobID: job.ID,
Err: fmt.Errorf("timeout"),
}
}
cancel()
}
}
模式 4:Generator(生成器)
Generator 模式使用 Channel 实现惰性求值。
基本 Generator
package main
import (
"fmt"
)
// 生成斐波那契数列
func fibonacci() <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
a, b := 0, 1
for {
ch <- a
a, b = b, a+b
}
}()
return ch
}
func main() {
fib := fibonacci()
// 只取前 10 个
for i := 0; i < 10; i++ {
fmt.Println(<-fib)
}
}
带停止信号的 Generator
package main
import (
"fmt"
)
func counter(stop <-chan struct{}) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
i := 0
for {
select {
case <-stop:
return
case ch <- i:
i++
}
}
}()
return ch
}
func main() {
stop := make(chan struct{})
count := counter(stop)
// 取前 5 个
for i := 0; i < 5; i++ {
fmt.Println(<-count)
}
// 停止生成器
close(stop)
}
模式 5:Or-Done Channel
Or-Done 模式允许在多个 channel 中任意一个关闭时触发操作。
package main
import (
"fmt"
"time"
)
// orDone 返回一个 channel,当任意输入 channel 关闭时关闭
func orDone(channels ...<-chan struct{}) <-chan struct{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan struct{})
go func() {
defer close(orDone)
select {
case <-channels[0]:
case <-channels[1]:
case <-orDone(channels[2:]...):
}
}()
return orDone
}
func main() {
sig := func(after time.Duration) <-chan struct{} {
c := make(chan struct{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}
start := time.Now()
<-orDone(
sig(2*time.Hour),
sig(5*time.Minute),
sig(1*time.Second), // 这个会最先触发
sig(1*time.Hour),
)
fmt.Printf("Done after: %v\n", time.Since(start))
}
模式 6:Tee Channel(分流)
Tee 模式将一个 channel 的数据分流到两个 channel。
package main
import (
"fmt"
)
func tee(in <-chan int) (<-chan int, <-chan int) {
out1 := make(chan int)
out2 := make(chan int)
go func() {
defer close(out1)
defer close(out2)
for val := range in {
// 使用局部变量避免闭包问题
val1, val2 := val, val
// 并发发送到两个 channel
select {
case out1 <- val1:
}
select {
case out2 <- val2:
}
}
}()
return out1, out2
}
func main() {
// 创建数据源
source := make(chan int)
go func() {
defer close(source)
for i := 1; i <= 5; i++ {
source <- i
}
}()
// 分流
ch1, ch2 := tee(source)
// 消费两个 channel
for i := 1; i <= 5; i++ {
v1 := <-ch1
v2 := <-ch2
fmt.Printf("ch1: %d, ch2: %d\n", v1, v2)
}
}
模式 7:Bridge Channel(桥接)
Bridge 模式将多个 channel 串联成一个。
package main
import (
"fmt"
)
func bridge(chanStream <-chan <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for ch := range chanStream {
for val := range ch {
out <- val
}
}
}()
return out
}
func main() {
// 创建 channel 的 stream
chanStream := make(chan (<-chan int))
go func() {
defer close(chanStream)
// 发送多个 channel
for i := 0; i < 3; i++ {
ch := make(chan int, 3)
for j := 0; j < 3; j++ {
ch <- i*10 + j
}
close(ch)
chanStream <- ch
}
}()
// 桥接所有 channel
for val := range bridge(chanStream) {
fmt.Println(val)
}
}
模式 8:Rate Limiting(限流)
使用 Channel 实现速率限制。
简单限流
package main
import (
"fmt"
"time"
)
func main() {
// 每秒最多处理 3 个请求
limiter := time.Tick(333 * time.Millisecond)
requests := make(chan int, 10)
for i := 1; i <= 10; i++ {
requests <- i
}
close(requests)
for req := range requests {
<-limiter // 等待限流器
fmt.Printf("Processing request %d at %v\n", req, time.Now())
}
}
令牌桶限流
package main
import (
"fmt"
"time"
)
type RateLimiter struct {
tokens chan struct{}
}
func NewRateLimiter(rate int, burst int) *RateLimiter {
rl := &RateLimiter{
tokens: make(chan struct{}, burst),
}
// 以固定速率添加令牌
go func() {
ticker := time.NewTicker(time.Second / time.Duration(rate))
defer ticker.Stop()
for range ticker.C {
select {
case rl.tokens <- struct{}{}:
default:
// 桶已满,丢弃令牌
}
}
}()
return rl
}
func (rl *RateLimiter) Wait() {
<-rl.tokens
}
func main() {
limiter := NewRateLimiter(5, 10) // 每秒 5 个请求,突发 10 个
for i := 1; i <= 20; i++ {
limiter.Wait()
fmt.Printf("Request %d at %v\n", i, time.Now())
}
}
模式 9:Cancellation(取消)
使用 Channel 实现优雅的取消机制。
package main
import (
"fmt"
"time"
)
func longRunningTask(cancel <-chan struct{}) {
for i := 0; i < 10; i++ {
select {
case <-cancel:
fmt.Println("Task cancelled")
return
default:
fmt.Printf("Working... %d\n", i)
time.Sleep(500 * time.Millisecond)
}
}
fmt.Println("Task completed")
}
func main() {
cancel := make(chan struct{})
go longRunningTask(cancel)
// 2 秒后取消
time.Sleep(2 * time.Second)
close(cancel)
time.Sleep(1 * time.Second)
}
总结
Channel 模式是 Go 并发编程的精髓:
核心模式:
- Pipeline:将处理流程分解为多个阶段
- Fan-out/Fan-in:分发和合并工作
- Worker Pool:固定数量的 worker 处理任务
- Generator:惰性求值
- Or-Done:多路选择
- Tee:分流数据
- Bridge:桥接多个 channel
- Rate Limiting:速率限制
- Cancellation:优雅取消
最佳实践:
- 总是关闭发送端的 channel
- 使用
select处理多个 channel - 使用
context进行取消和超时控制 - 避免 goroutine 泄漏
- 使用缓冲 channel 提高性能
记住:并发不是目的,而是手段。选择正确的模式,让并发程序既高效又易于理解。
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。