Go io.Pipe 入门:边生成边上传的流式处理

用导出 CSV 并上传的例子讲 io.Pipe 的基本用法、错误传播、关闭顺序和适合流式处理的场景。

很多导出功能会先把全部数据生成到内存或临时文件,再上传到对象存储。数据少时没问题,数据大时就会占用很多内存或磁盘。Go 的 io.Pipe 可以把写入端和读取端连接起来:一边生成,一边被另一边读取,适合流式处理。

本文用“生成 CSV 并上传”的例子讲 io.Pipe。它不神秘,但要注意错误传播和关闭顺序,否则很容易卡住。

io.Pipe 的直觉

io.Pipe 返回一个 reader 和 writer:

reader, writer := io.Pipe()

写入 writer 的数据,可以从 reader 读出来。它没有内部大缓冲,读写双方会互相等待。这让它适合连接两个组件:一个只会写到 io.Writer,另一个只会从 io.Reader 读取。

先写一个 CSV 生成函数

type User struct {
	ID    int64
	Name  string
	Email string
}

func WriteUsersCSV(w io.Writer, users <-chan User) error {
	cw := csv.NewWriter(w)
	if err := cw.Write([]string{"id", "name", "email"}); err != nil {
		return err
	}
	for user := range users {
		record := []string{
			strconv.FormatInt(user.ID, 10),
			user.Name,
			user.Email,
		}
		if err := cw.Write(record); err != nil {
			return err
		}
	}
	cw.Flush()
	return cw.Error()
}

这个函数只依赖 io.Writer,不关心写到文件、HTTP 响应还是 pipe。这样的函数更容易复用。

上传函数只需要 Reader

假设上传接口:

type Uploader interface {
	Upload(ctx context.Context, key string, r io.Reader) error
}

它从 reader 读取数据并上传。现在可以用 io.Pipe 把生成和上传接起来:

func ExportAndUpload(ctx context.Context, uploader Uploader, key string, users <-chan User) error {
	pr, pw := io.Pipe()

	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)
		err := WriteUsersCSV(pw, users)
		if err != nil {
			_ = pw.CloseWithError(err)
			errCh <- err
			return
		}
		errCh <- pw.Close()
	}()

	uploadErr := uploader.Upload(ctx, key, pr)
	writeErr := <-errCh
	if uploadErr != nil {
		return uploadErr
	}
	return writeErr
}

生成 goroutine 写入 pipe writer,上传函数从 pipe reader 读取。这样不需要把完整 CSV 放进内存。

CloseWithError 很重要

如果生成 CSV 时失败,应该用 CloseWithError

_ = pw.CloseWithError(err)

这样 reader 那边能读到错误。否则上传方可能只看到 EOF,误以为文件正常结束。流式处理里,错误传播是第一等问题。

同理,如果上传方失败,生成方可能还在写。更完整的实现可以在上传失败时关闭 reader,或者用 context 通知生成方停止。示例保持简单,但你要知道两边都可能出错。

什么时候适合 Pipe

适合:

  • 导出大 CSV 后直接上传
  • 压缩数据后写 HTTP 响应
  • 边生成边计算 hash
  • 把只支持 Writer 的组件接到只支持 Reader 的组件

不适合:

  • 数据很小,直接 bytes.Buffer 更简单
  • 需要反复读取同一份数据
  • 需要随机访问内容
  • 读写双方生命周期很难管理

不要为了“流式”而让简单代码复杂化。几 KB 的数据用 buffer 更清楚;几百 MB 的导出才值得考虑 pipe。

避免 goroutine 卡住

io.Pipe 没有大缓冲。如果 reader 不读,writer 会卡住;如果 writer 不写,reader 会等。一定要保证两端都能正常结束。

一个常见错误是上传前先等写入完成:

// 错误思路:WriteUsersCSV 会因为没人读而卡住
err := WriteUsersCSV(pw, users)
uploadErr := uploader.Upload(ctx, key, pr)

写和读必须并发进行。通常写端放 goroutine,读端在当前 goroutine 交给上传函数。

测试流式函数

可以写一个 fake uploader:

type fakeUploader struct {
	data bytes.Buffer
}

func (u *fakeUploader) Upload(ctx context.Context, key string, r io.Reader) error {
	_, err := io.Copy(&u.data, r)
	return err
}

测试:

func TestExportAndUpload(t *testing.T) {
	users := make(chan User, 1)
	users <- User{ID: 1, Name: "A", Email: "a@example.com"}
	close(users)

	uploader := &fakeUploader{}
	err := ExportAndUpload(context.Background(), uploader, "users.csv", users)
	if err != nil {
		t.Fatal(err)
	}
	if !strings.Contains(uploader.data.String(), "a@example.com") {
		t.Fatalf("csv = %s", uploader.data.String())
	}
}

测试不需要真的连对象存储。只要验证 reader 收到的内容正确,pipe 连接就基本可信。

加上 context 取消

如果上传请求被取消,生成端也应该尽快停止。可以让生成数据的来源监听 context,比如用户 channel 由上游关闭,或者写入函数在循环中检查:

func WriteUsersCSVContext(ctx context.Context, w io.Writer, users <-chan User) error {
	cw := csv.NewWriter(w)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case user, ok := <-users:
			if !ok {
				cw.Flush()
				return cw.Error()
			}
			if err := cw.Write([]string{user.Name, user.Email}); err != nil {
				return err
			}
		}
	}
}

流式任务最怕一端已经失败,另一端还在努力工作。context、CloseWithError 和明确的 goroutine 退出路径要一起设计。只要涉及 pipe,就要问一句:读端失败时写端怎么停?写端失败时读端怎么知道?

小结

io.Pipe 可以把一个写入流和一个读取流连接起来,适合边生成边上传、边压缩边响应这类场景。它能减少内存和临时文件使用,但要求你认真处理并发、关闭和错误传播。

入门时先记住三点:读写要并发,写失败用 CloseWithError,简单小数据用 bytes.Buffer 更直接。用在合适场景里,io.Pipe 是 Go IO 模型里很实用的一块拼图。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页