Go 并发入门:goroutine、channel 和 context 怎么一起用

本文用并发抓取、超时取消和 worker 池示例讲解 Go 的 goroutine、channel、select、WaitGroup 和 context。

Go 的并发为什么容易上手也容易误用

Go 最吸引人的特性之一是并发。启动一个 goroutine 只需要在函数调用前加 go,channel 可以在 goroutine 之间传递数据,select 可以等待多个通信事件,context 可以传递取消信号和超时。相比手动管理线程,这套模型轻很多。

但轻不代表没有成本。goroutine 如果没人退出,会泄漏;channel 如果没人接收,发送方会阻塞;多个 goroutine 同时写共享变量,会产生数据竞争;没有超时的外部调用,会拖住整个请求。入门阶段必须同时学习“怎么启动并发”和“怎么收回来”。

这篇文章从最小 goroutine 讲起,再介绍 channel、WaitGroup、select 和 context。示例会围绕并发抓取页面和处理任务展开。重点不是炫技,而是建立安全直觉:每个 goroutine 都应该有退出路径,每个阻塞等待都应该考虑取消和超时。

启动一个 goroutine

最简单的 goroutine:

package main

import (
	"fmt"
	"time"
)

func say(message string) {
	fmt.Println(message)
}

func main() {
	go say("hello from goroutine")
	time.Sleep(100 * time.Millisecond)
}

go say(...) 会启动一个新的 goroutine 执行函数。主 goroutine 不会等待它完成,所以示例里用了 time.Sleep。这只是演示,不是推荐做法。真实代码里不要靠睡眠等待并发完成。

更好的方式是使用 sync.WaitGroup

var wg sync.WaitGroup

wg.Add(1)
go func() {
	defer wg.Done()
	fmt.Println("work done")
}()

wg.Wait()

Add(1) 表示有一个任务要等,goroutine 结束时调用 Done(),主流程用 Wait() 等全部完成。defer wg.Done() 常放在 goroutine 开头,避免中途 return 时忘记通知。

channel 用来传递结果

goroutine 之间可以通过 channel 通信:

ch := make(chan string)

go func() {
	ch <- "hello"
}()

message := <-ch
fmt.Println(message)

未缓冲 channel 的发送和接收会互相等待。发送方执行 ch <- "hello" 时,如果没有接收者,会阻塞;接收方执行 <-ch 时,如果没有发送者,也会阻塞。

可以创建带缓冲的 channel:

ch := make(chan string, 2)
ch <- "a"
ch <- "b"
fmt.Println(<-ch)
fmt.Println(<-ch)

缓冲 channel 在容量未满时发送不会阻塞。它适合削峰或任务队列,但不要把缓冲当成解决并发问题的万能办法。容量设置不合理,只会把阻塞推迟。

关闭 channel:

close(ch)

关闭表示不会再发送新值。接收方可以用 range 读取直到关闭:

for value := range ch {
	fmt.Println(value)
}

通常由发送方关闭 channel。接收方不要随便关闭,因为它不知道是否还有其他发送者。

并发抓取多个地址

写一个并发请求示例:

type FetchResult struct {
	URL    string
	Status int
	Err    error
}

func fetch(url string) FetchResult {
	resp, err := http.Get(url)
	if err != nil {
		return FetchResult{URL: url, Err: err}
	}
	defer resp.Body.Close()

	return FetchResult{URL: url, Status: resp.StatusCode}
}

并发执行:

func fetchAll(urls []string) []FetchResult {
	results := make(chan FetchResult)

	for _, url := range urls {
		url := url
		go func() {
			results <- fetch(url)
		}()
	}

	collected := make([]FetchResult, 0, len(urls))
	for range urls {
		collected = append(collected, <-results)
	}

	return collected
}

注意 url := url。在循环里启动 goroutine 时,要避免闭包捕获循环变量导致所有 goroutine 看到同一个变量。较新 Go 版本对 range 变量做了改进,但写成局部变量仍然能让意图更清楚,也兼容旧代码。

这个版本有一个问题:http.Get 没有超时。如果某个请求一直卡住,整个函数也会一直等。

使用 context 控制超时

context.Context 用来传递取消信号、超时和请求范围数据。HTTP 请求可以绑定 context:

func fetchWithContext(ctx context.Context, url string) FetchResult {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return FetchResult{URL: url, Err: err}
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return FetchResult{URL: url, Err: err}
	}
	defer resp.Body.Close()

	return FetchResult{URL: url, Status: resp.StatusCode}
}

调用时设置总超时:

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

result := fetchWithContext(ctx, "https://example.com")
fmt.Println(result)

defer cancel() 很重要。即使超时没有发生,也应该释放 context 相关资源。

把它放进并发版本:

func fetchAllWithContext(ctx context.Context, urls []string) []FetchResult {
	results := make(chan FetchResult, len(urls))

	for _, url := range urls {
		url := url
		go func() {
			results <- fetchWithContext(ctx, url)
		}()
	}

	collected := make([]FetchResult, 0, len(urls))
	for range urls {
		collected = append(collected, <-results)
	}

	return collected
}

这里 channel 使用了缓冲,容量等于 URL 数量。即使调用方开始接收稍晚,goroutine 发送结果也不会因为没有接收者而卡住。

select 等待多个事件

select 可以同时等待多个 channel:

select {
case result := <-results:
	fmt.Println(result)
case <-ctx.Done():
	fmt.Println("cancelled:", ctx.Err())
}

如果要在收集结果时响应取消:

func collect(ctx context.Context, results <-chan FetchResult, count int) []FetchResult {
	collected := make([]FetchResult, 0, count)

	for len(collected) < count {
		select {
		case result := <-results:
			collected = append(collected, result)
		case <-ctx.Done():
			collected = append(collected, FetchResult{Err: ctx.Err()})
			return collected
		}
	}

	return collected
}

ctx.Done() 返回一个 channel,当 context 被取消或超时时关闭。ctx.Err() 可以拿到原因,比如 context deadline exceeded

在服务端代码里,HTTP 请求的 r.Context() 会在客户端断开、请求超时或服务端取消时结束。你应该把它传给数据库、HTTP 客户端和业务函数,而不是在深层代码里随便创建 context.Background()

worker 池:限制并发数量

并发不是越多越好。如果有 10000 个 URL,一次启动 10000 个 goroutine 可能压垮目标服务或本机资源。可以用 worker 池限制并发:

func fetchWithWorkers(ctx context.Context, urls []string, workerCount int) []FetchResult {
	jobs := make(chan string)
	results := make(chan FetchResult, len(urls))

	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for url := range jobs {
				results <- fetchWithContext(ctx, url)
			}
		}()
	}

	go func() {
		defer close(jobs)
		for _, url := range urls {
			select {
			case jobs <- url:
			case <-ctx.Done():
				return
			}
		}
	}()

	go func() {
		wg.Wait()
		close(results)
	}()

	var collected []FetchResult
	for result := range results {
		collected = append(collected, result)
	}
	return collected
}

这个例子有三个 channel 相关动作:主 goroutine 把 URL 发进 jobs;固定数量 worker 从 jobs 读取并发送结果;所有 worker 结束后关闭 results,收集方 range 到 channel 关闭。

真实项目里,worker 池要考虑更多细节,比如错误是否立刻取消全部任务,结果是否需要保持输入顺序,单个任务是否有独立超时。但入门阶段先理解这个骨架就够了。

共享变量需要同步

不要这样统计成功数量:

success := 0

for _, url := range urls {
	go func(url string) {
		result := fetch(url)
		if result.Err == nil {
			success++
		}
	}(url)
}

多个 goroutine 同时修改 success,会产生数据竞争。可以用 sync.Mutex

var mu sync.Mutex
success := 0

mu.Lock()
success++
mu.Unlock()

但更 Go 风格的方式是让 goroutine 发送结果,单个收集者统计:

success := 0
for _, result := range results {
	if result.Err == nil {
		success++
	}
}

“通过通信共享内存,而不是通过共享内存通信”是 Go 并发常被提到的一句话。它不是绝对规则,但提醒你优先考虑数据流,而不是让很多 goroutine 同时抢同一个变量。

小结

Go 并发的入门语法很简单:go 启动 goroutine,channel 传递数据,WaitGroup 等待完成,select 同时监听多个事件,context 控制取消和超时。真正需要练的是退出路径和资源边界。

写并发代码时,先问几个问题:这个 goroutine 什么时候结束?发送到 channel 时有没有接收者?如果外部请求超时,内部任务能否停止?是否有多个 goroutine 同时写同一个变量?并发数量是否需要限制?

只要这些问题能回答清楚,Go 的并发会非常实用。它不是为了把代码写得神秘,而是让你用较低成本处理网络请求、后台任务和流水线处理这类真实工程问题。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页