gRPC 基础:构建高性能微服务通信

学习使用 gRPC 和 Protocol Buffers 构建高效的微服务通信,掌握一元、服务器流、客户端流和双向流 RPC

gRPC 基础:构建高性能微服务通信

在微服务架构中,服务间通信是关键。RESTful API 虽然流行,但在性能、类型安全和代码生成方面存在局限。gRPC 作为 Google 开源的高性能 RPC 框架,提供了更优的解决方案。

本文将介绍如何使用 gRPC 和 Protocol Buffers 构建高效的微服务通信。

什么是 gRPC?

gRPC 是一个现代开源的高性能 RPC 框架,具有以下特点:

  • 基于 HTTP/2:支持多路复用、头部压缩、双向流
  • Protocol Buffers:高效的二进制序列化格式
  • 强类型:通过 .proto 文件定义接口
  • 代码生成:自动生成客户端和服务端代码
  • 跨语言:支持多种编程语言

安装工具

# 安装 Protocol Buffers 编译器
brew install protobuf  # macOS
# 或从 https://github.com/protocolbuffers/protobuf/releases 下载

# 安装 Go 插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

定义服务(.proto 文件)

syntax = "proto3";

package user;

option go_package = "example.com/user";

// 用户服务
service UserService {
  // 一元 RPC
  rpc GetUser (GetUserRequest) returns (User);
  
  // 服务器流 RPC
  rpc ListUsers (ListUsersRequest) returns (stream User);
  
  // 客户端流 RPC
  rpc CreateUser (stream CreateUserRequest) returns (CreateUserResponse);
  
  // 双向流 RPC
  rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}

// 用户消息
message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
  int32 age = 4;
}

// 请求消息
message GetUserRequest {
  int32 id = 1;
}

message ListUsersRequest {
  int32 page = 1;
  int32 page_size = 2;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
  int32 age = 3;
}

message CreateUserResponse {
  int32 created_count = 1;
}

message ChatMessage {
  string sender = 1;
  string content = 2;
}

生成代码

protoc --go_out=. --go-grpc_out=. user.proto

这会生成两个文件:

  • user.pb.go:消息类型定义
  • user_grpc.pb.go:服务接口和客户端

实现服务端

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net"
    "sync"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    
    pb "example.com/user"
)

type userServer struct {
    pb.UnimplementedUserServiceServer
    mu    sync.RWMutex
    users map[int32]*pb.User
}

func newServer() *userServer {
    return &userServer{
        users: make(map[int32]*pb.User),
    }
}

// GetUser 一元 RPC
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    
    user, exists := s.users[req.Id]
    if !exists {
        return nil, status.Errorf(codes.NotFound, "user not found: %d", req.Id)
    }
    
    return user, nil
}

// ListUsers 服务器流 RPC
func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
    s.mu.RLock()
    defer s.mu.RUnlock()
    
    start := (req.Page - 1) * req.PageSize
    end := start + req.PageSize
    
    count := int32(0)
    for _, user := range s.users {
        if count >= start && count < end {
            if err := stream.Send(user); err != nil {
                return err
            }
        }
        count++
    }
    
    return nil
}

// CreateUser 客户端流 RPC
func (s *userServer) CreateUser(stream pb.UserService_CreateUserServer) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    createdCount := int32(0)
    
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.CreateUserResponse{
                CreatedCount: createdCount,
            })
        }
        if err != nil {
            return err
        }
        
        id := int32(len(s.users) + 1)
        user := &pb.User{
            Id:    id,
            Name:  req.Name,
            Email: req.Email,
            Age:   req.Age,
        }
        
        s.users[id] = user
        createdCount++
        
        log.Printf("Created user: %d", id)
    }
}

// Chat 双向流 RPC
func (s *userServer) Chat(stream pb.UserService_ChatServer) error {
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        
        log.Printf("Received from %s: %s", msg.Sender, msg.Content)
        
        // Echo 回复
        reply := &pb.ChatMessage{
            Sender:  "Server",
            Content: fmt.Sprintf("Echo: %s", msg.Content),
        }
        
        if err := stream.Send(reply); err != nil {
            return err
        }
    }
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    
    s := grpc.NewServer()
    pb.RegisterUserServiceServer(s, newServer())
    
    log.Println("gRPC server listening on :50051")
    if err := s.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)
    }
}

实现客户端

package main

import (
    "context"
    "io"
    "log"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    
    pb "example.com/user"
)

func main() {
    conn, err := grpc.Dial("localhost:50051", 
        grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()
    
    client := pb.NewUserServiceClient(conn)
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    // 一元 RPC
    log.Println("=== Unary RPC ===")
    user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
    if err != nil {
        log.Printf("GetUser error: %v", err)
    } else {
        log.Printf("User: %v", user)
    }
    
    // 服务器流 RPC
    log.Println("\n=== Server Streaming RPC ===")
    stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{
        Page:     1,
        PageSize: 10,
    })
    if err != nil {
        log.Fatalf("ListUsers error: %v", err)
    }
    
    for {
        user, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("Stream error: %v", err)
        }
        log.Printf("User: %v", user)
    }
    
    // 客户端流 RPC
    log.Println("\n=== Client Streaming RPC ===")
    stream2, err := client.CreateUser(ctx)
    if err != nil {
        log.Fatalf("CreateUser error: %v", err)
    }
    
    users := []*pb.CreateUserRequest{
        {Name: "Alice", Email: "alice@example.com", Age: 30},
        {Name: "Bob", Email: "bob@example.com", Age: 25},
        {Name: "Charlie", Email: "charlie@example.com", Age: 35},
    }
    
    for _, req := range users {
        if err := stream2.Send(req); err != nil {
            log.Fatalf("Send error: %v", err)
        }
        log.Printf("Sent: %v", req)
    }
    
    resp, err := stream2.CloseAndRecv()
    if err != nil {
        log.Fatalf("CloseAndRecv error: %v", err)
    }
    log.Printf("Created %d users", resp.CreatedCount)
    
    // 双向流 RPC
    log.Println("\n=== Bidirectional Streaming RPC ===")
    stream3, err := client.Chat(ctx)
    if err != nil {
        log.Fatalf("Chat error: %v", err)
    }
    
    // 发送消息
    go func() {
        messages := []string{"Hello", "How are you?", "Goodbye"}
        for _, msg := range messages {
            if err := stream3.Send(&pb.ChatMessage{
                Sender:  "Client",
                Content: msg,
            }); err != nil {
                log.Printf("Send error: %v", err)
                return
            }
            log.Printf("Sent: %s", msg)
            time.Sleep(1 * time.Second)
        }
        stream3.CloseSend()
    }()
    
    // 接收消息
    for {
        reply, err := stream3.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("Recv error: %v", err)
        }
        log.Printf("Received from %s: %s", reply.Sender, reply.Content)
    }
}

拦截器(中间件)

服务端拦截器

// 日志拦截器
func loggingInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    start := time.Now()
    
    resp, err := handler(ctx, req)
    
    log.Printf("Method: %s, Duration: %v, Error: %v", 
        info.FullMethod, time.Since(start), err)
    
    return resp, err
}

// 认证拦截器
func authInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    // 从 metadata 获取 token
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, status.Error(codes.Unauthenticated, "missing metadata")
    }
    
    tokens := md["authorization"]
    if len(tokens) == 0 {
        return nil, status.Error(codes.Unauthenticated, "missing token")
    }
    
    // 验证 token
    token := tokens[0]
    if !validateToken(token) {
        return nil, status.Error(codes.Unauthenticated, "invalid token")
    }
    
    return handler(ctx, req)
}

func validateToken(token string) bool {
    return token == "Bearer valid-token"
}

// 使用拦截器
func main() {
    s := grpc.NewServer(
        grpc.UnaryInterceptor(
            grpc_middleware.ChainUnaryServer(
                loggingInterceptor,
                authInterceptor,
            ),
        ),
    )
    // ...
}

客户端拦截器

// 重试拦截器
func retryInterceptor(
    ctx context.Context,
    method string,
    req, reply interface{},
    cc *grpc.ClientConn,
    invoker grpc.UnaryInvoker,
    opts ...grpc.CallOption,
) error {
    maxRetries := 3
    for i := 0; i < maxRetries; i++ {
        err := invoker(ctx, method, req, reply, cc, opts...)
        if err == nil {
            return nil
        }
        
        st, ok := status.FromError(err)
        if !ok || st.Code() != codes.Unavailable {
            return err
        }
        
        log.Printf("Retry %d/%d for %s", i+1, maxRetries, method)
        time.Sleep(time.Duration(i+1) * time.Second)
    }
    
    return status.Error(codes.Unavailable, "max retries exceeded")
}

// 使用拦截器
conn, err := grpc.Dial("localhost:50051",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithUnaryInterceptor(retryInterceptor),
)

错误处理

// 服务端:返回错误
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
    user, exists := s.users[req.Id]
    if !exists {
        return nil, status.Errorf(codes.NotFound, "user %d not found", req.Id)
    }
    return user, nil
}

// 客户端:处理错误
user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
if err != nil {
    st, ok := status.FromError(err)
    if ok {
        switch st.Code() {
        case codes.NotFound:
            log.Println("User not found")
        case codes.Unauthenticated:
            log.Println("Authentication failed")
        case codes.Unavailable:
            log.Println("Service unavailable")
        default:
            log.Printf("Error: %v", err)
        }
    }
    return
}

总结

gRPC 为微服务通信提供了强大的支持:

  1. 一元 RPC:简单的请求-响应模式
  2. 服务器流:服务器发送多个响应
  3. 客户端流:客户端发送多个请求
  4. 双向流:双方都可以随时发送消息

主要优势:

  • 高性能:基于 HTTP/2 和 Protocol Buffers
  • 强类型:编译时检查
  • 代码生成:减少样板代码
  • 跨语言:支持多种语言

常见应用场景:

  • 微服务间通信
  • 移动端与后端通信
  • 实时数据流
  • 多语言系统集成

记住:gRPC 适合内部服务通信,对于面向外部的 API,REST 可能更合适。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页