引言
gRPC不仅支持传统的请求-响应模式,还提供了强大的流式通信能力。流式RPC在实时数据推送、大文件传输、双向通信等场景中具有显著优势。
gRPC四种通信模式
gRPC通信模式对比:
┌─────────────────────────────────────────┐
│ 1. Unary RPC(一元RPC) │
│ Client ──── Request ────▶ Server │
│ Client ◀─── Response ──── Server │
│ 最常用,类似HTTP请求 │
│ │
│ 2. Server Streaming(服务端流) │
│ Client ──── Request ────▶ Server │
│ Client ◀─── Stream ────── Server │
│ 服务端持续推送数据 │
│ │
│ 3. Client Streaming(客户端流) │
│ Client ──── Stream ─────▶ Server │
│ Client ◀─── Response ──── Server │
│ 客户端持续上传数据 │
│ │
│ 4. Bidirectional Streaming(双向流) │
│ Client ◀──▶ Stream ◀──▶ Server │
│ 双方独立地读写流 │
│ 最灵活,最复杂 │
└─────────────────────────────────────────┘
Protocol Buffer定义
// Proto3 definitions for the streaming examples: chat, log and file services.
syntax = "proto3";
package streamservice;
// NOTE(review): the Go examples import the generated code as
// "github.com/example/streamservice/proto" — confirm that go_package matches
// the import path actually used.
option go_package = "github.com/example/streamservice";
// ChatService exposes chat messaging through all four gRPC call styles.
service ChatService {
// Unary RPC: send one message and receive an acknowledgement.
rpc SendMessage(Message) returns (MessageAck);
// Server streaming: subscribe to the messages of a channel.
rpc SubscribeChannel(ChannelRequest) returns (stream Message);
// Client streaming: send messages in bulk, acknowledged once at the end.
rpc SendMessages(stream Message) returns (BatchAck);
// Bidirectional streaming: real-time chat.
rpc Chat(stream Message) returns (stream Message);
}
// LogService streams log entries to clients and accepts bulk log uploads.
service LogService {
// Server streaming: live log entries matching the request filter.
rpc StreamLogs(LogRequest) returns (stream LogEntry);
// Client streaming: report log entries in bulk.
rpc UploadLogs(stream LogEntry) returns (UploadResult);
}
// FileService transfers files as a sequence of chunks.
service FileService {
// Client streaming: upload a file chunk by chunk.
rpc UploadFile(stream FileChunk) returns (UploadResult);
// Server streaming: download a file chunk by chunk.
rpc DownloadFile(FileRequest) returns (stream FileChunk);
// Bidirectional streaming: resumable transfer.
rpc TransferFile(stream FileTransferMsg) returns (stream FileTransferMsg);
}
// Message is a single chat message.
message Message {
// Message identifier; the Go server examples overwrite it with a generated ID.
string id = 1;
// Channel the message belongs to.
string channel_id = 2;
// Author of the message; the Chat handler overwrites it with the caller's
// user_id taken from request metadata.
string user_id = 3;
// Message body.
string content = 4;
// Unix time in milliseconds; the Go server examples set it on receipt.
int64 timestamp = 5;
// Kind of message.
MessageType type = 6;
}
// MessageType classifies a Message. TEXT is the proto3 default (zero value).
enum MessageType {
TEXT = 0;
IMAGE = 1;
FILE = 2;
// Used by the Chat handler for join/leave notifications.
SYSTEM = 3;
}
// MessageAck is the reply to the unary SendMessage call.
message MessageAck {
// ID the server assigned to the message.
string message_id = 1;
// Whether the message was accepted.
bool success = 2;
// Error description; the SendMessage example never populates it.
string error = 3;
}
// BatchAck is the single reply sent when a SendMessages stream ends.
message BatchAck {
// Number of messages received on the stream.
int32 total_received = 1;
// Number of messages processed without error.
int32 total_success = 2;
// IDs of the messages that failed processing.
repeated string failed_ids = 3;
}
// ChannelRequest opens a SubscribeChannel stream.
message ChannelRequest {
// Channel to subscribe to.
string channel_id = 1;
// Subscribing user; the server example only uses it for logging.
string user_id = 2;
// Stored messages with a timestamp strictly greater than this value are
// replayed before live messages.
int64 since_timestamp = 3;
}
// LogRequest describes which log entries a StreamLogs call wants.
message LogRequest {
// Only entries from this service are delivered; empty matches every service.
string service_name = 1;
// Entries below this level are filtered out.
LogLevel min_level = 2;
// Regular expression matched against the entry message; empty disables it.
string pattern = 3;
}
// LogLevel is the severity of a log entry. The numeric order matters: the
// server filter compares levels with "<" against LogRequest.min_level.
enum LogLevel {
DEBUG = 0;
INFO = 1;
WARN = 2;
ERROR = 3;
}
// LogEntry is one log record.
message LogEntry {
// Service that produced the entry.
string service_name = 1;
// Severity of the entry.
LogLevel level = 2;
// Log text; this is what LogRequest.pattern is matched against.
string message = 3;
// NOTE(review): presumably Unix milliseconds like Message.timestamp — confirm.
int64 timestamp = 4;
// Free-form key/value attributes.
map<string, string> metadata = 5;
}
// UploadResult is the single reply of the client-streaming upload calls
// (LogService.UploadLogs and FileService.UploadFile).
message UploadResult {
// Whether the upload was accepted.
bool success = 1;
// UploadLogs: number of log entries received.
// UploadFile: number of bytes written.
int64 total_received = 2;
// ID of the stored file; set by UploadFile only.
string file_id = 3;
}
// FileChunk is one piece of a file in an upload or download stream.
message FileChunk {
// File identifier. The upload example reads it from the first chunk only and
// generates one when it is empty.
string file_id = 1;
// Byte position in the file at which data is written.
int64 offset = 2;
// Chunk payload.
bytes data = 3;
// Marks the final chunk; the upload example finishes after writing it.
bool is_last = 4;
// File metadata; the upload example reads it from the first chunk only.
FileMeta meta = 5;
}
// FileMeta describes the file being transferred.
message FileMeta {
// Original file name.
string filename = 1;
// Expected size in bytes; the upload example compares it with the number of
// bytes written.
int64 total_size = 2;
// MIME type of the content.
string content_type = 3;
// Expected checksum; verified by the upload example when non-empty.
// NOTE(review): the algorithm is not visible here — confirm against
// calculateChecksum.
string checksum = 4;
}
// FileRequest opens a DownloadFile stream.
// NOTE(review): no handler for DownloadFile is shown; the field notes below
// are inferred from the names — confirm against the implementation.
message FileRequest {
// File to download.
string file_id = 1;
// Presumably the byte position to start from.
int64 offset = 2;
// Presumably the preferred size of each returned chunk, in bytes.
int32 chunk_size = 3;
}
// FileTransferMsg is the envelope exchanged in both directions on a
// TransferFile stream; exactly one payload variant is set per message.
message FileTransferMsg {
oneof payload {
TransferRequest request = 1;
FileChunk chunk = 2;
TransferAck ack = 3;
TransferComplete complete = 4;
}
}
// TransferRequest is the "request" variant of FileTransferMsg.
// NOTE(review): no TransferFile handler is shown; presumably this starts or
// resumes a transfer at the given offset — confirm.
message TransferRequest {
string file_id = 1;
int64 offset = 2;
Direction direction = 3;
}
// Direction selects the transfer direction in a TransferRequest.
// UPLOAD is the proto3 default (zero value).
enum Direction {
UPLOAD = 0;
DOWNLOAD = 1;
}
// TransferAck is the "ack" variant of FileTransferMsg.
// NOTE(review): presumably received_offset is the position up to which data
// has been stored, so the sender can resume from it — confirm.
message TransferAck {
int64 received_offset = 1;
bool success = 2;
}
// TransferComplete is the "complete" variant of FileTransferMsg.
// NOTE(review): presumably sent once at the end of a transfer with the
// checksum of the whole file — confirm.
message TransferComplete {
string file_id = 1;
string checksum = 2;
}
服务端流实现
实时消息订阅
package server
import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"sync"
	"time"

	pb "github.com/example/streamservice/proto"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)
// ChatServer is the in-memory implementation of the ChatService gRPC service.
type ChatServer struct {
pb.UnimplementedChatServiceServer
// mu guards channels and messageStore.
mu sync.RWMutex
// channels maps a channel ID to the delivery channels of its current
// subscribers (one per open stream).
channels map[string][]chan *pb.Message
// messageStore maps a channel ID to the messages stored by SendMessage, in
// arrival order.
messageStore map[string][]*pb.Message
}
// NewChatServer returns a ChatServer whose subscriber and message-history
// maps are allocated and empty.
func NewChatServer() *ChatServer {
	srv := new(ChatServer)
	srv.channels = map[string][]chan *pb.Message{}
	srv.messageStore = map[string][]*pb.Message{}
	return srv
}
// SubscribeChannel is the server-streaming handler for channel subscriptions.
//
// It first replays the stored messages of req.ChannelId whose timestamp is
// greater than req.SinceTimestamp, then forwards every new message until the
// client goes away or a send fails. It returns the send error, or the stream
// context's error on cancellation.
func (s *ChatServer) SubscribeChannel(req *pb.ChannelRequest, stream pb.ChatService_SubscribeChannelServer) error {
	channelID := req.ChannelId
	userID := req.UserId
	log.Printf("User %s subscribing to channel %s", userID, channelID)

	// Buffered delivery channel for this subscriber.
	msgChan := make(chan *pb.Message, 100)

	// Take the history snapshot and register the subscriber under ONE lock
	// acquisition. SendMessage stores and broadcasts under the same lock, so a
	// message ends up either in the snapshot or in msgChan, never in both.
	s.mu.Lock()
	var history []*pb.Message
	for _, msg := range s.messageStore[channelID] {
		if msg.Timestamp > req.SinceTimestamp {
			history = append(history, msg)
		}
	}
	s.channels[channelID] = append(s.channels[channelID], msgChan)
	s.mu.Unlock()

	// Always deregister on exit.
	defer func() {
		s.mu.Lock()
		chans := s.channels[channelID]
		for i, ch := range chans {
			if ch == msgChan {
				s.channels[channelID] = append(chans[:i], chans[i+1:]...)
				break
			}
		}
		// Drop the map entry once the last subscriber is gone so that unused
		// channel IDs do not accumulate.
		if len(s.channels[channelID]) == 0 {
			delete(s.channels, channelID)
		}
		s.mu.Unlock()
		// Closing here is safe: SendMessage only sends while holding s.mu, and
		// msgChan is no longer reachable through s.channels.
		close(msgChan)
		log.Printf("User %s unsubscribed from channel %s", userID, channelID)
	}()

	// Replay history outside the lock: stream.Send may block on a slow client
	// and must not stall writers.
	for _, msg := range history {
		if err := stream.Send(msg); err != nil {
			return err
		}
	}

	// Forward live messages.
	ctx := stream.Context()
	for {
		select {
		case msg, ok := <-msgChan:
			if !ok {
				return nil
			}
			if err := stream.Send(msg); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// SendMessage is the unary handler that accepts one chat message.
//
// The message gets a freshly generated ID and the current time in Unix
// milliseconds, is appended to the channel's history, and is offered to every
// current subscriber of the channel. Delivery is non-blocking: a subscriber
// whose buffer is full misses the message and a warning is logged. The
// returned ack always reports success and carries the assigned ID.
func (s *ChatServer) SendMessage(ctx context.Context, msg *pb.Message) (*pb.MessageAck, error) {
	msg.Id = generateID()
	msg.Timestamp = time.Now().UnixMilli()

	channelID := msg.ChannelId

	s.mu.Lock()
	// Persist first, then fan out, both under the same lock.
	s.messageStore[channelID] = append(s.messageStore[channelID], msg)
	for _, subscriber := range s.channels[channelID] {
		select {
		case subscriber <- msg:
		default:
			log.Printf("Warning: subscriber channel full, dropping message")
		}
	}
	s.mu.Unlock()

	ack := &pb.MessageAck{
		Success:   true,
		MessageId: msg.Id,
	}
	return ack, nil
}
实时日志流
// LogStorage is the persistence backend used by LogServer.flushBatch.
type LogStorage interface {
	// BulkInsert stores a batch of log entries.
	BulkInsert(batch []*pb.LogEntry) error
}

// LogServer implements the LogService gRPC service.
type LogServer struct {
	pb.UnimplementedLogServiceServer
	// logCollector fans collected entries out to StreamLogs subscribers.
	logCollector *LogCollector
	// logStorage receives the batches flushed by UploadLogs. flushBatch reads
	// this field, but the struct did not declare it.
	logStorage LogStorage
}
// StreamLogs is the server-streaming handler that pushes live log entries
// matching the request's service name, minimum level and regex pattern.
//
// It returns InvalidArgument when req.Pattern is not a valid regular
// expression, the send error when a send fails, and the stream context's
// error when the client cancels or the deadline expires.
func (s *LogServer) StreamLogs(req *pb.LogRequest, stream pb.LogService_StreamLogsServer) error {
	serviceName := req.ServiceName
	minLevel := req.MinLevel
	pattern := req.Pattern

	// Reject a malformed pattern up front. Without this check the filter
	// silently matches nothing and the client waits on a stream that never
	// yields an entry.
	if pattern != "" {
		if _, err := regexp.Compile(pattern); err != nil {
			return status.Errorf(codes.InvalidArgument, "invalid log pattern %q: %v", pattern, err)
		}
	}

	log.Printf("Streaming logs for service=%s, minLevel=%v, pattern=%s",
		serviceName, minLevel, pattern)

	// Build the filter for this subscription.
	filter := &LogFilter{
		ServiceName: serviceName,
		MinLevel:    minLevel,
		Pattern:     pattern,
	}

	// Subscribe, and always unsubscribe on exit.
	logChan := s.logCollector.Subscribe(filter)
	defer s.logCollector.Unsubscribe(filter, logChan)

	ctx := stream.Context()
	for {
		select {
		case entry, ok := <-logChan:
			if !ok {
				// The subscription was closed elsewhere; do not send nil
				// entries from a closed channel.
				return nil
			}
			if err := stream.Send(entry); err != nil {
				return err
			}
		case <-ctx.Done():
			// Report cancellation the same way SubscribeChannel does.
			return ctx.Err()
		}
	}
}
// LogCollector fans collected log entries out to the registered subscribers.
type LogCollector struct {
// mu guards subscribers.
mu sync.RWMutex
// subscribers maps each registered filter (by pointer identity) to the
// channel on which matching entries are delivered.
subscribers map[*LogFilter]chan *pb.LogEntry
}
// LogFilter selects which log entries a subscriber receives.
type LogFilter struct {
// ServiceName restricts entries to one service; empty matches every service.
ServiceName string
// MinLevel is the lowest level delivered; entries below it are skipped.
MinLevel pb.LogLevel
// Pattern is a regular expression matched against the entry message; empty
// disables pattern matching.
Pattern string
}
// Subscribe registers filter and returns the buffered channel (capacity 256)
// on which entries matching it are delivered. Pair it with Unsubscribe.
func (c *LogCollector) Subscribe(filter *LogFilter) chan *pb.LogEntry {
	c.mu.Lock()
	defer c.mu.Unlock()

	// No constructor initializes the map, so a zero-value LogCollector would
	// panic on the assignment below without this lazy initialization.
	if c.subscribers == nil {
		c.subscribers = make(map[*LogFilter]chan *pb.LogEntry)
	}

	ch := make(chan *pb.LogEntry, 256)
	c.subscribers[filter] = ch
	return ch
}
// Unsubscribe removes the subscription registered for filter and closes its
// channel. It is a no-op when filter is not registered or is registered with
// a different channel, so calling it twice does not close a closed channel.
func (c *LogCollector) Unsubscribe(filter *LogFilter, ch chan *pb.LogEntry) {
	c.mu.Lock()
	defer c.mu.Unlock()

	registered, ok := c.subscribers[filter]
	if !ok || registered != ch {
		return
	}
	delete(c.subscribers, filter)
	// Collect only sends while holding the read lock, so closing under the
	// write lock cannot race with a send.
	close(ch)
}
// Collect offers entry to every subscriber whose filter accepts it. Delivery
// never blocks: when a subscriber's buffer is full the entry is dropped for
// that subscriber.
func (c *LogCollector) Collect(entry *pb.LogEntry) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	for f, sink := range c.subscribers {
		if !matchesFilter(entry, f) {
			continue
		}
		select {
		case sink <- entry:
		default:
			// Buffer full: drop the entry.
		}
	}
}
// matchesFilter reports whether entry passes filter: the service name must
// match when the filter names one, the level must be at least MinLevel, and
// the message must match Pattern when one is set.
func matchesFilter(entry *pb.LogEntry, filter *LogFilter) bool {
	switch {
	case filter.ServiceName != "" && entry.ServiceName != filter.ServiceName:
		return false
	case entry.Level < filter.MinLevel:
		return false
	case filter.Pattern == "":
		return true
	}
	// The compile error is deliberately discarded: for an invalid pattern
	// MatchString reports false, so such a filter matches nothing.
	found, _ := regexp.MatchString(filter.Pattern, entry.Message)
	return found
}
客户端流实现
批量消息发送
// SendMessages is the client-streaming handler for bulk message submission.
//
// Each received message gets a generated ID and the current time in Unix
// milliseconds and is passed to processMessage. When the client finishes
// sending, one BatchAck summarises how many messages arrived, how many were
// processed, and which ones failed. A receive error aborts the call.
func (s *ChatServer) SendMessages(stream pb.ChatService_SendMessagesServer) error {
	var totalReceived int32
	var totalSuccess int32
	var failedIDs []string

	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.BatchAck{
				TotalReceived: totalReceived,
				TotalSuccess:  totalSuccess,
				FailedIds:     failedIDs,
			})
		}
		if err != nil {
			return err
		}
		totalReceived++

		// Remember the ID the client sent before it is overwritten. It is the
		// only identifier the client can correlate a failure with.
		clientID := msg.Id
		msg.Id = generateID()
		msg.Timestamp = time.Now().UnixMilli()

		if err := s.processMessage(msg); err != nil {
			// Report the client's ID when it supplied one; fall back to the
			// server-generated ID otherwise.
			reportID := clientID
			if reportID == "" {
				reportID = msg.Id
			}
			failedIDs = append(failedIDs, reportID)
			log.Printf("Failed to process message: %v", err)
			continue
		}
		totalSuccess++
	}
}
日志批量上传
// UploadLogs is the client-streaming handler for bulk log ingestion.
//
// Entries are buffered and flushed to storage in batches of 100; the
// remainder is flushed when the client finishes sending. The reply carries
// the number of entries received, and Success is true only when every flush
// succeeded. A receive error aborts the call.
func (s *LogServer) UploadLogs(stream pb.LogService_UploadLogsServer) error {
	const batchSize = 100

	var totalReceived int64
	// allFlushed turns false as soon as one batch fails to persist, so the
	// client is not told that lost entries were stored.
	allFlushed := true
	batch := make([]*pb.LogEntry, 0, batchSize)

	for {
		entry, err := stream.Recv()
		if err == io.EOF {
			// Flush whatever is left in the last, partial batch.
			if len(batch) > 0 {
				if err := s.flushBatch(batch); err != nil {
					allFlushed = false
					log.Printf("Failed to flush final batch: %v", err)
				}
			}
			return stream.SendAndClose(&pb.UploadResult{
				Success:       allFlushed,
				TotalReceived: totalReceived,
			})
		}
		if err != nil {
			return err
		}

		totalReceived++
		batch = append(batch, entry)

		// Batch is full: flush and start over, reusing the backing array.
		if len(batch) >= batchSize {
			if err := s.flushBatch(batch); err != nil {
				allFlushed = false
				log.Printf("Failed to flush batch: %v", err)
			}
			batch = batch[:0]
		}
	}
}
// flushBatch hands batch to the log storage backend in a single bulk insert
// (for example a database or Elasticsearch) and returns the backend's error.
func (s *LogServer) flushBatch(batch []*pb.LogEntry) error {
	storage := s.logStorage
	return storage.BulkInsert(batch)
}
文件上传
// isSafeFileID reports whether id can be used as a single file name inside
// the storage directory: non-empty, not "." or "..", and free of path
// separators and NUL bytes.
func isSafeFileID(id string) bool {
	if id == "" || id == "." || id == ".." {
		return false
	}
	for _, r := range id {
		if r == '/' || r == '\\' || r == 0 {
			return false
		}
	}
	return true
}

// UploadFile is the client-streaming handler for file uploads.
//
// The first chunk supplies the file ID (generated when empty) and optional
// metadata. Every chunk's data is written at the chunk's offset. The upload
// ends when the client closes the stream or sends a chunk with IsLast set. In
// both cases the size and checksum are verified against the metadata before
// success is reported.
//
// Errors: InvalidArgument for an unsafe file ID or an empty stream, Internal
// for file-system failures, DataLoss for a size or checksum mismatch, and the
// receive error itself when the stream breaks. The partially written file is
// removed on every failure path.
func (s *FileServer) UploadFile(stream pb.FileService_UploadFileServer) error {
	var (
		fileID     string
		file       *os.File
		totalBytes int64
		meta       *pb.FileMeta
	)

	// abort closes and deletes the partially written file, if any.
	abort := func() {
		if file == nil {
			return
		}
		file.Close()
		os.Remove(file.Name())
	}

	// finish closes the file, verifies it against the metadata and sends the
	// final reply. Both the end-of-stream path and the IsLast path use it, so
	// neither can skip validation.
	finish := func() error {
		if file == nil {
			return status.Error(codes.InvalidArgument, "no file chunks received")
		}
		path := file.Name()
		if err := file.Close(); err != nil {
			os.Remove(path)
			return status.Errorf(codes.Internal, "failed to close file: %v", err)
		}
		if meta != nil {
			// Verify the file size.
			if totalBytes != meta.TotalSize {
				os.Remove(path)
				return status.Errorf(codes.DataLoss,
					"file size mismatch: expected %d, got %d",
					meta.TotalSize, totalBytes)
			}
			// Verify the checksum when the client provided one.
			if meta.Checksum != "" {
				actualChecksum := calculateChecksum(path)
				if actualChecksum != meta.Checksum {
					os.Remove(path)
					return status.Errorf(codes.DataLoss,
						"checksum mismatch: expected %s, got %s",
						meta.Checksum, actualChecksum)
				}
			}
		}
		return stream.SendAndClose(&pb.UploadResult{
			Success:       true,
			TotalReceived: totalBytes,
			FileId:        fileID,
		})
	}

	for {
		chunk, err := stream.Recv()
		if err == io.EOF {
			return finish()
		}
		if err != nil {
			// The stream broke mid-upload: do not leak the descriptor or
			// leave a truncated file behind.
			abort()
			return err
		}

		// The first chunk carries the file ID and metadata.
		if file == nil {
			fileID = chunk.FileId
			if fileID == "" {
				fileID = generateID()
			}
			// The ID comes from the client and becomes part of a path, so
			// reject anything that could escape the storage directory.
			if !isSafeFileID(fileID) {
				return status.Errorf(codes.InvalidArgument, "invalid file id %q", fileID)
			}
			if chunk.Meta != nil {
				meta = chunk.Meta
			}
			filePath := fmt.Sprintf("/storage/files/%s", fileID)
			file, err = os.Create(filePath)
			if err != nil {
				return status.Errorf(codes.Internal, "failed to create file: %v", err)
			}
		}

		// Write the chunk at the position the client asked for.
		n, err := file.WriteAt(chunk.Data, chunk.Offset)
		if err != nil {
			abort()
			return status.Errorf(codes.Internal, "failed to write chunk: %v", err)
		}
		totalBytes += int64(n)

		if chunk.IsLast {
			return finish()
		}
	}
}
双向流实现
实时聊天
// Chat is the bidirectional-streaming handler for real-time chat.
//
// The caller identifies itself through the "user_id" and "channel_id" request
// metadata keys. While the stream is open, every message received from the
// client is stamped (ID, user, time) and broadcast to the channel, and every
// message broadcast to the channel is sent to the client. Join and leave
// notifications are broadcast as SYSTEM messages.
//
// It returns Unauthenticated when metadata or user_id is missing,
// InvalidArgument when channel_id is missing, nil when the client closes its
// send side, and otherwise the first stream or context error.
func (s *ChatServer) Chat(stream pb.ChatService_ChatServer) error {
	ctx := stream.Context()

	// Identify the caller from request metadata. md.Get returns an empty
	// slice for a missing key, so check the length before indexing.
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return status.Error(codes.Unauthenticated, "missing metadata")
	}
	userIDs := md.Get("user_id")
	if len(userIDs) == 0 {
		return status.Error(codes.Unauthenticated, "missing user_id metadata")
	}
	channelIDs := md.Get("channel_id")
	if len(channelIDs) == 0 {
		return status.Error(codes.InvalidArgument, "missing channel_id metadata")
	}
	userID, channelID := userIDs[0], channelIDs[0]
	log.Printf("User %s joined chat in channel %s", userID, channelID)

	// Register this stream as a subscriber of the channel.
	msgChan := make(chan *pb.Message, 100)
	s.mu.Lock()
	s.channels[channelID] = append(s.channels[channelID], msgChan)
	s.mu.Unlock()

	// Deregister and announce the departure on exit. The defer is installed
	// right after registration so no later failure can skip it.
	defer func() {
		s.mu.Lock()
		chans := s.channels[channelID]
		for i, ch := range chans {
			if ch == msgChan {
				s.channels[channelID] = append(chans[:i], chans[i+1:]...)
				break
			}
		}
		s.mu.Unlock()
		close(msgChan)

		leaveMsg := &pb.Message{
			Id:        generateID(),
			ChannelId: channelID,
			UserId:    "system",
			Content:   fmt.Sprintf("User %s left the chat", userID),
			Timestamp: time.Now().UnixMilli(),
			Type:      pb.MessageType_SYSTEM,
		}
		s.broadcastToChannel(channelID, leaveMsg)
		log.Printf("User %s left chat in channel %s", userID, channelID)
	}()

	// done tells the sender goroutine to stop as soon as the handler returns,
	// so it does not keep draining buffered messages into a finished stream.
	// Deferred after the cleanup above, so it runs before it.
	done := make(chan struct{})
	defer close(done)

	// Announce the new participant.
	joinMsg := &pb.Message{
		Id:        generateID(),
		ChannelId: channelID,
		UserId:    "system",
		Content:   fmt.Sprintf("User %s joined the chat", userID),
		Timestamp: time.Now().UnixMilli(),
		Type:      pb.MessageType_SYSTEM,
	}
	s.broadcastToChannel(channelID, joinMsg)

	// Buffered for both goroutines so neither blocks on reporting an error
	// after the handler has returned.
	errChan := make(chan error, 2)

	// Receiver: client -> channel.
	go func() {
		for {
			msg, err := stream.Recv()
			if err != nil {
				errChan <- err
				return
			}
			msg.Id = generateID()
			msg.UserId = userID
			msg.Timestamp = time.Now().UnixMilli()
			s.broadcastToChannel(channelID, msg)
		}
	}()

	// Sender: channel -> client.
	go func() {
		for {
			select {
			case msg, ok := <-msgChan:
				if !ok {
					return
				}
				if err := stream.Send(msg); err != nil {
					errChan <- err
					return
				}
			case <-done:
				return
			}
		}
	}()

	// Wait for the first error or for cancellation.
	select {
	case err := <-errChan:
		if err == io.EOF {
			// The client closed its send side: normal termination.
			return nil
		}
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
客户端调用示例
订阅消息流
package main
import (
"context"
"io"
"log"
pb "github.com/example/streamservice/proto"
"google.golang.org/grpc"
)
// subscribeMessages connects to the chat server on localhost:50051,
// subscribes to the "general" channel as "user123" from timestamp 0, and
// prints each received message as "[HH:MM:SS] user: content" until the stream
// ends or fails.
//
// NOTE(review): grpc.WithInsecure is deprecated in recent grpc-go releases in
// favour of transport credentials from the credentials/insecure package —
// confirm against the grpc-go version in use.
func subscribeMessages() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	request := &pb.ChannelRequest{
		ChannelId:      "general",
		UserId:         "user123",
		SinceTimestamp: 0,
	}
	client := pb.NewChatServiceClient(conn)
	stream, err := client.SubscribeChannel(context.Background(), request)
	if err != nil {
		log.Fatal(err)
	}

	for {
		msg, err := stream.Recv()
		switch {
		case err == io.EOF:
			// The server closed the stream.
			return
		case err != nil:
			log.Printf("Error receiving message: %v", err)
			return
		}

		clock := time.UnixMilli(msg.Timestamp).Format("15:04:05")
		fmt.Printf("[%s] %s: %s\n", clock, msg.UserId, msg.Content)
	}
}
双向聊天客户端
// chatClient opens a bidirectional Chat stream to localhost:50051 as
// "user123" in the "general" channel. A background goroutine prints incoming
// messages while the main loop sends each line read from stdin. When stdin
// ends, or a send fails, the send side is closed and the function waits for
// the server to finish the stream before returning.
func chatClient() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := pb.NewChatServiceClient(conn)
	ctx := metadata.AppendToOutgoingContext(context.Background(),
		"user_id", "user123",
		"channel_id", "general")
	stream, err := client.Chat(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Receiver: print incoming messages until the stream ends.
	recvDone := make(chan struct{})
	go func() {
		defer close(recvDone)
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				// Normal end of stream, not an error.
				return
			}
			if err != nil {
				log.Printf("Error: %v", err)
				return
			}
			fmt.Printf("[%s] %s: %s\n",
				time.UnixMilli(msg.Timestamp).Format("15:04:05"),
				msg.UserId, msg.Content)
		}
	}()

	// Sender: one message per line of stdin.
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		msg := &pb.Message{
			Content: scanner.Text(),
			Type:    pb.MessageType_TEXT,
		}
		if err := stream.Send(msg); err != nil {
			log.Printf("Send error: %v", err)
			break
		}
	}
	// Scan returns false on both EOF and read errors; only Err tells them apart.
	if err := scanner.Err(); err != nil {
		log.Printf("stdin read error: %v", err)
	}

	// Half-close so the server sees end-of-stream and can end the RPC cleanly,
	// then wait for the receiver to drain the remaining messages.
	if err := stream.CloseSend(); err != nil {
		log.Printf("CloseSend error: %v", err)
	}
	<-recvDone
}
流式通信最佳实践
流式通信最佳实践:
┌─────────────────────────────────────────┐
│ 1. 背压控制 │
│ - 使用带缓冲的channel │
│ - 监控发送队列长度 │
│ - 队列满时丢弃或断开连接 │
│ │
│ 2. 心跳保活 │
│ - 定期发送ping/pong │
│ - 检测连接是否存活 │
│ - 超时自动断开 │
│ │
│ 3. 流量控制 │
│ - 使用gRPC窗口大小控制 │
│ - 限制单个消息大小 │
│ - 限制并发流数量 │
│ │
│ 4. 错误处理 │
│ - 优雅处理stream关闭 │
│ - 支持断线重连 │
│ - 记录详细的错误日志 │
│ │
│ 5. 资源清理 │
│ - 使用defer确保资源释放 │
│ - 监控活跃流数量 │
│ - 设置合理的超时时间 │
│ │
│ 6. 消息确认 │
│ - 重要消息需要ACK │
│ - 支持消息重传 │
│ - 记录消息序列号 │
└─────────────────────────────────────────┘
总结
gRPC流式通信选型
| 模式 | 适用场景 | 复杂度 | 性能 |
|---|---|---|---|
| Unary | 简单请求-响应 | 低 | 高 |
| Server Stream | 实时推送、日志流 | 中 | 高 |
| Client Stream | 批量上传、数据采集 | 中 | 高 |
| Bidirectional | 聊天、协作编辑 | 高 | 中 |
关键原则
- 选择合适的模式:不要过度使用双向流
- 实现背压控制:防止慢消费者拖垮系统
- 心跳保活:检测并清理死连接
- 优雅关闭:确保资源正确释放
- 监控指标:跟踪活跃流数量、消息吞吐量
- 错误恢复:支持断线重连和消息重传
延伸阅读
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。