Go worker 取消入门:后台循环如何听懂 context

用消息消费 worker 示例讲 context 取消、select 循环、任务超时、资源关闭和测试后台 goroutine 退出。

后台 worker 是 Go 服务里很常见的结构:消费队列、定时清理、同步外部数据、处理导入任务。很多 worker 写起来很简单,但停不下来。服务收到关闭信号后,HTTP server 已经退出,worker 还卡在 sleep、网络请求或 channel receive 上,进程迟迟不结束。

本文讲 worker 如何使用 context 取消。核心原则是:循环要监听 ctx.Done(),任务处理要传递 context,阻塞等待要能被取消。

一个基础 worker

type Job struct {
	ID string
}

func RunWorker(ctx context.Context, jobs <-chan Job, handle func(context.Context, Job) error) {
	for {
		select {
		case <-ctx.Done():
			return
		case job, ok := <-jobs:
			if !ok {
				return
			}
			if err := handle(ctx, job); err != nil {
				log.Printf("handle job %s: %v", job.ID, err)
			}
		}
	}
}

这个 worker 会在 context 取消或 jobs channel 关闭时退出。处理函数也接收 context,便于内部数据库查询和 HTTP 调用停止。

给单个任务设置超时

整个 worker 的 ctx 表示服务生命周期;单个任务还可以有自己的超时:

func handleWithTimeout(parent context.Context, job Job) error {
	ctx, cancel := context.WithTimeout(parent, 30*time.Second)
	defer cancel()
	return processJob(ctx, job)
}

这样某个任务卡住不会无限占用 worker。注意从 parent 派生,这样服务关闭时任务也会立即取消。

不要用 time.Sleep 阻塞退出

坏写法:

for {
	process()
	time.Sleep(time.Minute)
}

取消时可能要等一分钟。用 timer 或 ticker 配合 select:

timer := time.NewTimer(time.Minute)
select {
case <-timer.C:
case <-ctx.Done():
	timer.Stop()
	return
}

定时循环用 time.Ticker 也可以,但记得 defer ticker.Stop()

处理中的任务怎么办

服务关闭时,worker 有两种策略:尽快取消当前任务,或者给当前任务一段时间收尾。选择取决于业务。发送通知可以取消后下次重试;写关键数据可能希望完成当前事务。

可以在 shutdown 时给外层 context 一个宽限期:

shutdownCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
RunWorker(shutdownCtx, jobs, handle)

更常见的是 worker 在程序启动时运行,收到信号后取消 root context,然后等待 WaitGroup。

用 WaitGroup 等待退出

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup

for i := 0; i < 4; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		RunWorker(ctx, jobs, handle)
	}()
}

// 收到信号
cancel()
wg.Wait()

不要启动 goroutine 后就不管。长期运行服务应该知道自己启动了哪些后台任务,并在退出时等待它们收尾。

测试 worker 能退出

func TestWorkerStopsOnCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	jobs := make(chan Job)

	done := make(chan struct{})
	go func() {
		RunWorker(ctx, jobs, func(ctx context.Context, job Job) error {
			return nil
		})
		close(done)
	}()

	cancel()
	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("worker did not stop")
	}
}

这类测试很有价值。后台 goroutine 泄漏通常不会马上报错,但会让测试进程、服务退出和资源使用变得不稳定。

channel 关闭由谁负责

并发代码里一个常见约定是:谁发送,谁关闭。worker 只是从 jobs 里读任务,不应该关闭 jobs。如果多个地方都可能关闭同一个 channel,程序很容易 panic。

func produce(ctx context.Context, jobs chan<- Job) {
	defer close(jobs)

	for i := 0; i < 100; i++ {
		select {
		case <-ctx.Done():
			return
		case jobs <- Job{ID: i}:
		}
	}
}

worker 看到 jobs 被关闭后退出循环。这样职责很清楚:生产者决定什么时候没有新任务,worker 只负责消费。若任务来自消息队列或数据库,通常不关闭 channel,而是让 context 控制退出。

失败任务如何处理

示例里 worker 遇到错误只是记录日志,真实项目要决定失败任务的去向。一般有三种方式:直接丢弃、重试、写入失败表。入门项目可以先实现有限次数重试,避免无限循环。

func handleWithRetry(ctx context.Context, job Job, max int) error {
	var last error
	for attempt := 1; attempt <= max; attempt++ {
		if err := handle(ctx, job); err != nil {
			last = err
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Duration(attempt) * 200 * time.Millisecond):
			}
			continue
		}
		return nil
	}
	return fmt.Errorf("job %d failed after %d attempts: %w", job.ID, max, last)
}

重试间隔不要写成固定的零等待,否则下游服务抖动时 worker 会快速打满数据库或第三方 API。即使只是 200ms、400ms、600ms 这样的简单退避,也比立刻重试友好很多。

限制并发

worker 数量不是越多越好。CPU 密集任务可以接近 CPU 核数,IO 密集任务可以多一些,但最终要看数据库连接池、外部服务限流和机器资源。一个简单的经验是:先用小并发上线,再通过指标调大。

func runPool(ctx context.Context, n int, jobs <-chan Job) {
	var wg sync.WaitGroup
	wg.Add(n)

	for i := 0; i < n; i++ {
		id := i + 1
		go func() {
			defer wg.Done()
			worker(ctx, id, jobs)
		}()
	}

	wg.Wait()
}

如果 worker 内部还会访问数据库,记得让 SetMaxOpenConns 和 worker 数量匹配。二十个 worker 配五个数据库连接,很多任务会卡在等待连接;五十个 worker 配五十个连接,又可能把数据库压垮。并发控制要从整条链路看。

退出时的可观测性

服务关闭时,只打印“退出了”信息不够。最好记录收到信号、停止取新任务、等待 worker、剩余任务数量、最终耗时。排查发布卡住时,这些日志比猜测有用。

started := time.Now()
log.Println("worker pool stopping")
cancel()
wg.Wait()
log.Printf("worker pool stopped in %s", time.Since(started))

如果使用队列系统,还可以在退出前停止拉取新消息,等正在处理的消息 ack 完成。不要在收到信号后继续抢新任务,否则发布窗口会被拉长。

小结

Go worker 要能体面退出,循环里必须监听 ctx.Done(),任务处理要传递 context,sleep 和 ticker 也要可取消。单个任务可以派生超时,多个 worker 用 WaitGroup 等待退出。

后台任务的正确性不只在“能处理任务”,也在“该停的时候能停”。把取消路径写进设计和测试,worker 才适合长期运行。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页