很多导出功能会先把全部数据生成到内存或临时文件,再上传到对象存储。数据少时没问题,数据大时就会占用很多内存或磁盘。Go 的 io.Pipe 可以把写入端和读取端连接起来:一边生成,一边被另一边读取,适合流式处理。
本文用“生成 CSV 并上传”的例子讲 io.Pipe。它不神秘,但要注意错误传播和关闭顺序,否则很容易卡住。
io.Pipe 的直觉
io.Pipe 返回一个 reader 和 writer:
reader, writer := io.Pipe()
写入 writer 的数据,可以从 reader 读出来。它没有内部大缓冲,读写双方会互相等待。这让它适合连接两个组件:一个只会写到 io.Writer,另一个只会从 io.Reader 读取。
先写一个 CSV 生成函数
type User struct {
ID int64
Name string
Email string
}
func WriteUsersCSV(w io.Writer, users <-chan User) error {
cw := csv.NewWriter(w)
if err := cw.Write([]string{"id", "name", "email"}); err != nil {
return err
}
for user := range users {
record := []string{
strconv.FormatInt(user.ID, 10),
user.Name,
user.Email,
}
if err := cw.Write(record); err != nil {
return err
}
}
cw.Flush()
return cw.Error()
}
这个函数只依赖 io.Writer,不关心写到文件、HTTP 响应还是 pipe。这样的函数更容易复用。
上传函数只需要 Reader
假设上传接口:
type Uploader interface {
Upload(ctx context.Context, key string, r io.Reader) error
}
它从 reader 读取数据并上传。现在可以用 io.Pipe 把生成和上传接起来:
func ExportAndUpload(ctx context.Context, uploader Uploader, key string, users <-chan User) error {
pr, pw := io.Pipe()
errCh := make(chan error, 1)
go func() {
defer close(errCh)
err := WriteUsersCSV(pw, users)
if err != nil {
_ = pw.CloseWithError(err)
errCh <- err
return
}
errCh <- pw.Close()
}()
uploadErr := uploader.Upload(ctx, key, pr)
writeErr := <-errCh
if uploadErr != nil {
return uploadErr
}
return writeErr
}
生成 goroutine 写入 pipe writer,上传函数从 pipe reader 读取。这样不需要把完整 CSV 放进内存。
CloseWithError 很重要
如果生成 CSV 时失败,应该用 CloseWithError:
_ = pw.CloseWithError(err)
这样 reader 那边能读到错误。否则上传方可能只看到 EOF,误以为文件正常结束。流式处理里,错误传播是第一等问题。
同理,如果上传方失败,生成方可能还在写。更完整的实现可以在上传失败时关闭 reader,或者用 context 通知生成方停止。示例保持简单,但你要知道两边都可能出错。
什么时候适合 Pipe
适合:
- 导出大 CSV 后直接上传
- 压缩数据后写 HTTP 响应
- 边生成边计算 hash
- 把只支持 Writer 的组件接到只支持 Reader 的组件
不适合:
- 数据很小,直接
bytes.Buffer更简单 - 需要反复读取同一份数据
- 需要随机访问内容
- 读写双方生命周期很难管理
不要为了“流式”而让简单代码复杂化。几 KB 的数据用 buffer 更清楚;几百 MB 的导出才值得考虑 pipe。
避免 goroutine 卡住
io.Pipe 没有大缓冲。如果 reader 不读,writer 会卡住;如果 writer 不写,reader 会等。一定要保证两端都能正常结束。
一个常见错误是上传前先等写入完成:
// 错误思路:WriteUsersCSV 会因为没人读而卡住
err := WriteUsersCSV(pw, users)
uploadErr := uploader.Upload(ctx, key, pr)
写和读必须并发进行。通常写端放 goroutine,读端在当前 goroutine 交给上传函数。
测试流式函数
可以写一个 fake uploader:
type fakeUploader struct {
data bytes.Buffer
}
func (u *fakeUploader) Upload(ctx context.Context, key string, r io.Reader) error {
_, err := io.Copy(&u.data, r)
return err
}
测试:
func TestExportAndUpload(t *testing.T) {
users := make(chan User, 1)
users <- User{ID: 1, Name: "A", Email: "a@example.com"}
close(users)
uploader := &fakeUploader{}
err := ExportAndUpload(context.Background(), uploader, "users.csv", users)
if err != nil {
t.Fatal(err)
}
if !strings.Contains(uploader.data.String(), "a@example.com") {
t.Fatalf("csv = %s", uploader.data.String())
}
}
测试不需要真的连对象存储。只要验证 reader 收到的内容正确,pipe 连接就基本可信。
加上 context 取消
如果上传请求被取消,生成端也应该尽快停止。可以让生成数据的来源监听 context,比如用户 channel 由上游关闭,或者写入函数在循环中检查:
func WriteUsersCSVContext(ctx context.Context, w io.Writer, users <-chan User) error {
cw := csv.NewWriter(w)
for {
select {
case <-ctx.Done():
return ctx.Err()
case user, ok := <-users:
if !ok {
cw.Flush()
return cw.Error()
}
if err := cw.Write([]string{user.Name, user.Email}); err != nil {
return err
}
}
}
}
流式任务最怕一端已经失败,另一端还在努力工作。context、CloseWithError 和明确的 goroutine 退出路径要一起设计。只要涉及 pipe,就要问一句:读端失败时写端怎么停?写端失败时读端怎么知道?
小结
io.Pipe 可以把一个写入流和一个读取流连接起来,适合边生成边上传、边压缩边响应这类场景。它能减少内存和临时文件使用,但要求你认真处理并发、关闭和错误传播。
入门时先记住三点:读写要并发,写失败用 CloseWithError,简单小数据用 bytes.Buffer 更直接。用在合适场景里,io.Pipe 是 Go IO 模型里很实用的一块拼图。
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。