gRPC:构建高性能微服务
在微服务架构中,服务之间的通信至关重要。传统的 RESTful API 虽然简单易用,但在性能、类型安全和代码生成方面存在不足。gRPC 是 Google 开源的高性能 RPC 框架,它使用 Protocol Buffers 作为接口定义语言和序列化格式,为微服务通信提供了更好的解决方案。
今天我们就来学习如何在 Go 中使用 gRPC。
什么是 gRPC?
gRPC 是一个现代的、开源的、高性能的远程过程调用(RPC)框架。它的核心特性包括:
- Protocol Buffers:高效的二进制序列化格式
- HTTP/2:支持多路复用、流控制、头部压缩
- 强类型:通过 .proto 文件定义接口,自动生成代码
- 双向流:支持客户端流、服务器流、双向流
- 跨语言:支持 Go、Java、Python、C++ 等多种语言
安装 gRPC 工具
# 安装 Go gRPC 插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# 安装 Protocol Buffers 编译器
# macOS
brew install protobuf
# Ubuntu
sudo apt install protobuf-compiler
# 验证安装
protoc --version
定义服务接口
创建一个 user.proto 文件:
syntax = "proto3";
package user;
option go_package = "./user";
// 用户服务
service UserService {
// 获取用户
rpc GetUser (GetUserRequest) returns (User);
// 创建用户
rpc CreateUser (CreateUserRequest) returns (User);
// 列出用户
rpc ListUsers (ListUsersRequest) returns (stream User);
// 更新用户信息(双向流)
rpc UpdateUsers (stream UpdateUserRequest) returns (stream User);
}
// 用户消息
message User {
int64 id = 1;
string name = 2;
string email = 3;
int32 age = 4;
int64 created_at = 5;
}
// 请求消息
message GetUserRequest {
int64 id = 1;
}
message CreateUserRequest {
string name = 1;
string email = 2;
int32 age = 3;
}
message ListUsersRequest {
int32 page = 1;
int32 page_size = 2;
}
message UpdateUserRequest {
int64 id = 1;
string name = 2;
string email = 3;
int32 age = 4;
}
生成 Go 代码
protoc --go_out=. --go-grpc_out=. user.proto
这会生成两个文件:
user.pb.go:Protocol Buffers 消息定义user_grpc.pb.go:gRPC 服务代码
实现 gRPC 服务器
package main
import (
"context"
"log"
"net"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "example/user"
)
type User struct {
ID int64
Name string
Email string
Age int32
CreatedAt int64
}
type UserServiceServer struct {
pb.UnimplementedUserServiceServer
mu sync.RWMutex
users map[int64]*User
nextID int64
}
func NewUserServiceServer() *UserServiceServer {
return &UserServiceServer{
users: make(map[int64]*User),
nextID: 1,
}
}
// GetUser 实现获取用户
func (s *UserServiceServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
s.mu.RLock()
defer s.mu.RUnlock()
user, ok := s.users[req.Id]
if !ok {
return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id)
}
return &pb.User{
Id: user.ID,
Name: user.Name,
Email: user.Email,
Age: user.Age,
CreatedAt: user.CreatedAt,
}, nil
}
// CreateUser 实现创建用户
func (s *UserServiceServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
s.mu.Lock()
defer s.mu.Unlock()
user := &User{
ID: s.nextID,
Name: req.Name,
Email: req.Email,
Age: req.Age,
CreatedAt: time.Now().Unix(),
}
s.users[user.ID] = user
s.nextID++
log.Printf("创建用户: %d (%s)", user.ID, user.Name)
return &pb.User{
Id: user.ID,
Name: user.Name,
Email: user.Email,
Age: user.Age,
CreatedAt: user.CreatedAt,
}, nil
}
// ListUsers 实现列出用户(服务器流)
func (s *UserServiceServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
s.mu.RLock()
defer s.mu.RUnlock()
count := 0
for _, user := range s.users {
if count >= int(req.PageSize) {
break
}
err := stream.Send(&pb.User{
Id: user.ID,
Name: user.Name,
Email: user.Email,
Age: user.Age,
CreatedAt: user.CreatedAt,
})
if err != nil {
return err
}
count++
}
return nil
}
// UpdateUsers 实现批量更新(双向流)
func (s *UserServiceServer) UpdateUsers(stream pb.UserService_UpdateUsersServer) error {
for {
req, err := stream.Recv()
if err != nil {
return err
}
s.mu.Lock()
user, ok := s.users[req.Id]
if !ok {
s.mu.Unlock()
stream.Send(&pb.User{})
continue
}
// 更新字段
if req.Name != "" {
user.Name = req.Name
}
if req.Email != "" {
user.Email = req.Email
}
if req.Age > 0 {
user.Age = req.Age
}
s.mu.Unlock()
// 发送更新后的用户
err = stream.Send(&pb.User{
Id: user.ID,
Name: user.Name,
Email: user.Email,
Age: user.Age,
CreatedAt: user.CreatedAt,
})
if err != nil {
return err
}
}
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("监听失败: %v", err)
}
s := grpc.NewServer()
pb.RegisterUserServiceServer(s, NewUserServiceServer())
log.Println("gRPC 服务器启动在 :50051")
if err := s.Serve(lis); err != nil {
log.Fatalf("服务失败: %v", err)
}
}
实现 gRPC 客户端
package main
import (
"context"
"io"
"log"
"time"
"google.golang.org/grpc"
pb "example/user"
)
func main() {
// 连接服务器
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
client := pb.NewUserServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 1. 创建用户
log.Println("=== 创建用户 ===")
user1, err := client.CreateUser(ctx, &pb.CreateUserRequest{
Name: "张三",
Email: "zhangsan@example.com",
Age: 25,
})
if err != nil {
log.Fatalf("创建用户失败: %v", err)
}
log.Printf("创建成功: ID=%d, Name=%s", user1.Id, user1.Name)
user2, err := client.CreateUser(ctx, &pb.CreateUserRequest{
Name: "李四",
Email: "lisi@example.com",
Age: 30,
})
if err != nil {
log.Fatalf("创建用户失败: %v", err)
}
log.Printf("创建成功: ID=%d, Name=%s", user2.Id, user2.Name)
// 2. 获取用户
log.Println("\n=== 获取用户 ===")
user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: user1.Id})
if err != nil {
log.Fatalf("获取用户失败: %v", err)
}
log.Printf("用户信息: %+v", user)
// 3. 列出用户(服务器流)
log.Println("\n=== 列出用户 ===")
stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{
Page: 1,
PageSize: 10,
})
if err != nil {
log.Fatalf("列出用户失败: %v", err)
}
for {
user, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("接收失败: %v", err)
}
log.Printf("用户: ID=%d, Name=%s", user.Id, user.Name)
}
// 4. 批量更新(双向流)
log.Println("\n=== 批量更新 ===")
updateStream, err := client.UpdateUsers(ctx)
if err != nil {
log.Fatalf("更新用户失败: %v", err)
}
// 发送更新请求
updates := []*pb.UpdateUserRequest{
{Id: user1.Id, Name: "张三(已更新)", Age: 26},
{Id: user2.Id, Email: "lisi_new@example.com"},
}
for _, update := range updates {
err := updateStream.Send(update)
if err != nil {
log.Fatalf("发送更新失败: %v", err)
}
// 接收更新后的用户
user, err := updateStream.Recv()
if err != nil {
log.Fatalf("接收更新失败: %v", err)
}
log.Printf("更新成功: ID=%d, Name=%s, Email=%s", user.Id, user.Name, user.Email)
}
updateStream.CloseSend()
}
拦截器(中间件)
gRPC 支持拦截器,类似于 HTTP 的中间件:
// 服务器拦截器
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
start := time.Now()
log.Printf("开始处理: %s", info.FullMethod)
resp, err := handler(ctx, req)
duration := time.Since(start)
log.Printf("完成处理: %s (%v)", info.FullMethod, duration)
return resp, err
}
// 客户端拦截器
func clientLoggingInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
log.Printf("客户端调用: %s", method)
err := invoker(ctx, method, req, reply, cc, opts...)
duration := time.Since(start)
log.Printf("客户端完成: %s (%v)", method, duration)
return err
}
// 使用拦截器
func main() {
// 服务器
s := grpc.NewServer(
grpc.UnaryInterceptor(loggingInterceptor),
)
// 客户端
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(clientLoggingInterceptor),
)
}
错误处理
gRPC 使用状态码来表示错误:
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// 服务器端返回错误
func (s *Server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
user, ok := s.users[req.Id]
if !ok {
return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id)
}
return user, nil
}
// 客户端处理错误
user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 123})
if err != nil {
st, ok := status.FromError(err)
if ok {
switch st.Code() {
case codes.NotFound:
log.Println("用户不存在")
case codes.InvalidArgument:
log.Println("参数错误")
case codes.Unauthenticated:
log.Println("未认证")
default:
log.Printf("错误: %v", err)
}
}
}
认证和授权
// 服务器拦截器:验证 Token
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 从 metadata 获取 token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "缺少 metadata")
}
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "缺少 token")
}
// 验证 token
token := tokens[0]
userID, err := validateToken(token)
if err != nil {
return nil, status.Error(codes.Unauthenticated, "无效的 token")
}
// 将用户信息放入 context
ctx = context.WithValue(ctx, "userID", userID)
return handler(ctx, req)
}
// 客户端:发送 Token
type tokenAuth struct {
token string
}
func (t *tokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
return map[string]string{
"authorization": t.token,
}, nil
}
func (t *tokenAuth) RequireTransportSecurity() bool {
return false
}
// 使用
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithInsecure(),
grpc.WithPerRPCCredentials(&tokenAuth{token: "your-token"}),
)
小结
今天我们学习了 gRPC:
- 基础概念:gRPC vs REST,Protocol Buffers
- 服务定义:编写 .proto 文件,生成代码
- 服务器实现:Unary RPC、服务器流、双向流
- 客户端调用:同步调用、流式调用
- 拦截器:日志、认证等中间件
- 错误处理:状态码和错误信息
gRPC 为微服务通信提供了高性能、强类型的解决方案。在构建复杂的分布式系统时,gRPC 是一个值得考虑的选择。
练习时间
- 实现一个 gRPC 网关,将 gRPC 服务暴露为 REST API
- 创建一个带有认证和授权的 gRPC 服务
- 实现 gRPC 服务的服务发现和负载均衡
- 使用 gRPC 的反射功能实现动态客户端
我们下篇见!
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。