Go JSON 流式处理入门:不用一次把大文件读进内存

本文讲解 Go encoding/json 的流式 Decoder 和 Encoder,适合处理大 JSON 数组、日志导出和 HTTP 请求体。

大 JSON 不应该总是 ReadAll

很多入门代码会这样解析 JSON:

data, err := io.ReadAll(r)
if err != nil {
	return err
}

var users []User
if err := json.Unmarshal(data, &users); err != nil {
	return err
}

小数据没问题,但如果请求体或文件很大,这会一次性把所有内容读进内存。导入几万行用户、处理日志导出、读取大数组时,更好的方式是使用 json.Decoder 流式处理。它可以从 io.Reader 一边读一边解码,不必先把全部数据变成 []byte

这篇文章讲大 JSON 数组的流式读取,以及如何逐条写出 JSON。

逐个读取数组元素

假设 JSON 文件是一个数组:

[
  {"email":"a@example.com","name":"小林"},
  {"email":"b@example.com","name":"阿周"}
]

结构体:

type User struct {
	Email string `json:"email"`
	Name  string `json:"name"`
}

流式读取:

func ImportUsers(r io.Reader, handle func(User) error) error {
	decoder := json.NewDecoder(r)

	token, err := decoder.Token()
	if err != nil {
		return fmt.Errorf("read start token: %w", err)
	}
	if token != json.Delim('[') {
		return fmt.Errorf("expected json array")
	}

	for decoder.More() {
		var user User
		if err := decoder.Decode(&user); err != nil {
			return fmt.Errorf("decode user: %w", err)
		}
		if err := handle(user); err != nil {
			return err
		}
	}

	token, err = decoder.Token()
	if err != nil {
		return fmt.Errorf("read end token: %w", err)
	}
	if token != json.Delim(']') {
		return fmt.Errorf("expected array end")
	}
	return nil
}

调用:

file, err := os.Open("users.json")
if err != nil {
	return err
}
defer file.Close()

err = ImportUsers(file, func(user User) error {
	fmt.Println(user.Email)
	return nil
})

这样处理大文件时,内存压力会小很多。

边读边批量写数据库

真实导入时,你可能不想每条都写一次数据库,而是批量写:

func ImportUsersBatch(r io.Reader, save func([]User) error) error {
	const batchSize = 500
	batch := make([]User, 0, batchSize)

	return ImportUsers(r, func(user User) error {
		batch = append(batch, user)
		if len(batch) < batchSize {
			return nil
		}

		if err := save(batch); err != nil {
			return err
		}
		batch = batch[:0]
		return nil
	})
}

上面代码还缺最后一批保存。可以稍微调整:

func ImportUsersBatch(r io.Reader, save func([]User) error) error {
	const batchSize = 500
	batch := make([]User, 0, batchSize)

	err := ImportUsers(r, func(user User) error {
		batch = append(batch, user)
		if len(batch) == batchSize {
			if err := save(batch); err != nil {
				return err
			}
			batch = batch[:0]
		}
		return nil
	})
	if err != nil {
		return err
	}
	if len(batch) > 0 {
		return save(batch)
	}
	return nil
}

这类结构很适合导入任务。

流式写出 JSON

如果要导出数据,也可以逐条编码:

func ExportUsers(w io.Writer, users []User) error {
	encoder := json.NewEncoder(w)

	if _, err := w.Write([]byte("[\n")); err != nil {
		return err
	}
	for i, user := range users {
		if i > 0 {
			if _, err := w.Write([]byte(",\n")); err != nil {
				return err
			}
		}
		if err := encoder.Encode(user); err != nil {
			return err
		}
	}
	if _, err := w.Write([]byte("]\n")); err != nil {
		return err
	}
	return nil
}

如果数据来自数据库分页,你可以一页页查询,一条条写到 HTTP 响应,避免把全部结果放进内存。导出接口还要注意超时、客户端断开和权限控制。

请求体大小仍然要限制

流式解码不等于可以接受无限输入。HTTP handler 里仍然应该限制大小:

r.Body = http.MaxBytesReader(w, r.Body, 20<<20) // 20 MB
defer r.Body.Close()

然后再把 r.Body 交给 Decoder。大文件导入最好有明确大小上限和异步处理流程,不要让一个 HTTP 请求无限跑。

错误行号和部分成功

导入类接口还会遇到一个产品问题:第 738 条数据失败时,前 737 条要不要保存?如果保存了,用户重新上传会不会重复?这些问题和 JSON 解码方式无关,但流式处理会让它们更早暴露。比较常见的做法是按批次校验,整批成功后再写入;或者给每条记录设计幂等键,让重复导入不会产生重复数据。

如果希望错误更友好,可以在循环里维护行号或序号:

index := 0
for dec.More() {
	index++
	var item ImportUser
	if err := dec.Decode(&item); err != nil {
		return fmt.Errorf("decode item %d: %w", index, err)
	}
	if err := validate(item); err != nil {
		return fmt.Errorf("validate item %d: %w", index, err)
	}
}

这样用户看到的是“第 738 条邮箱为空”,而不是一段底层 JSON 错误。工程上也更容易排查,因为日志里有明确位置。流式处理不只是省内存,还要求你把错误处理、事务边界和幂等策略想清楚。

小结

json.Decoder 可以从 io.Reader 流式读取 JSON,适合大数组、导入任务和 HTTP 请求体。你可以用 TokenMore 逐个处理数组元素,边读边写数据库。导出时也可以用 json.Encoder 逐条写出。

流式处理能减少内存压力,但仍然要限制输入大小、处理错误、考虑超时。它不是复杂技巧,而是面对大数据时更稳的基本功。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页