gRPC 基础:构建高性能微服务通信
在微服务架构中,服务间通信是关键。RESTful API 虽然流行,但在性能、类型安全和代码生成方面存在局限。gRPC 作为 Google 开源的高性能 RPC 框架,提供了更优的解决方案。
本文将介绍如何使用 gRPC 和 Protocol Buffers 构建高效的微服务通信。
什么是 gRPC?
gRPC 是一个现代开源的高性能 RPC 框架,具有以下特点:
- 基于 HTTP/2:支持多路复用、头部压缩、双向流
- Protocol Buffers:高效的二进制序列化格式
- 强类型:通过 .proto 文件定义接口
- 代码生成:自动生成客户端和服务端代码
- 跨语言:支持多种编程语言
安装工具
# 安装 Protocol Buffers 编译器
brew install protobuf # macOS
# 或从 https://github.com/protocolbuffers/protobuf/releases 下载
# 安装 Go 插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
定义服务(.proto 文件)
syntax = "proto3";
package user;
option go_package = "example.com/user";
// 用户服务
service UserService {
// 一元 RPC
rpc GetUser (GetUserRequest) returns (User);
// 服务器流 RPC
rpc ListUsers (ListUsersRequest) returns (stream User);
// 客户端流 RPC
rpc CreateUser (stream CreateUserRequest) returns (CreateUserResponse);
// 双向流 RPC
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}
// 用户消息
message User {
int32 id = 1;
string name = 2;
string email = 3;
int32 age = 4;
}
// 请求消息
message GetUserRequest {
int32 id = 1;
}
message ListUsersRequest {
int32 page = 1;
int32 page_size = 2;
}
message CreateUserRequest {
string name = 1;
string email = 2;
int32 age = 3;
}
message CreateUserResponse {
int32 created_count = 1;
}
message ChatMessage {
string sender = 1;
string content = 2;
}
生成代码
protoc --go_out=. --go-grpc_out=. user.proto
这会生成两个文件:
user.pb.go:消息类型定义user_grpc.pb.go:服务接口和客户端
实现服务端
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "example.com/user"
)
type userServer struct {
pb.UnimplementedUserServiceServer
mu sync.RWMutex
users map[int32]*pb.User
}
func newServer() *userServer {
return &userServer{
users: make(map[int32]*pb.User),
}
}
// GetUser 一元 RPC
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
s.mu.RLock()
defer s.mu.RUnlock()
user, exists := s.users[req.Id]
if !exists {
return nil, status.Errorf(codes.NotFound, "user not found: %d", req.Id)
}
return user, nil
}
// ListUsers 服务器流 RPC
func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
s.mu.RLock()
defer s.mu.RUnlock()
start := (req.Page - 1) * req.PageSize
end := start + req.PageSize
count := int32(0)
for _, user := range s.users {
if count >= start && count < end {
if err := stream.Send(user); err != nil {
return err
}
}
count++
}
return nil
}
// CreateUser 客户端流 RPC
func (s *userServer) CreateUser(stream pb.UserService_CreateUserServer) error {
s.mu.Lock()
defer s.mu.Unlock()
createdCount := int32(0)
for {
req, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.CreateUserResponse{
CreatedCount: createdCount,
})
}
if err != nil {
return err
}
id := int32(len(s.users) + 1)
user := &pb.User{
Id: id,
Name: req.Name,
Email: req.Email,
Age: req.Age,
}
s.users[id] = user
createdCount++
log.Printf("Created user: %d", id)
}
}
// Chat 双向流 RPC
func (s *userServer) Chat(stream pb.UserService_ChatServer) error {
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Printf("Received from %s: %s", msg.Sender, msg.Content)
// Echo 回复
reply := &pb.ChatMessage{
Sender: "Server",
Content: fmt.Sprintf("Echo: %s", msg.Content),
}
if err := stream.Send(reply); err != nil {
return err
}
}
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterUserServiceServer(s, newServer())
log.Println("gRPC server listening on :50051")
if err := s.Serve(lis); err != nil {
log.Fatalf("Failed to serve: %v", err)
}
}
实现客户端
package main
import (
"context"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "example.com/user"
)
func main() {
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
client := pb.NewUserServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 一元 RPC
log.Println("=== Unary RPC ===")
user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
if err != nil {
log.Printf("GetUser error: %v", err)
} else {
log.Printf("User: %v", user)
}
// 服务器流 RPC
log.Println("\n=== Server Streaming RPC ===")
stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{
Page: 1,
PageSize: 10,
})
if err != nil {
log.Fatalf("ListUsers error: %v", err)
}
for {
user, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Stream error: %v", err)
}
log.Printf("User: %v", user)
}
// 客户端流 RPC
log.Println("\n=== Client Streaming RPC ===")
stream2, err := client.CreateUser(ctx)
if err != nil {
log.Fatalf("CreateUser error: %v", err)
}
users := []*pb.CreateUserRequest{
{Name: "Alice", Email: "alice@example.com", Age: 30},
{Name: "Bob", Email: "bob@example.com", Age: 25},
{Name: "Charlie", Email: "charlie@example.com", Age: 35},
}
for _, req := range users {
if err := stream2.Send(req); err != nil {
log.Fatalf("Send error: %v", err)
}
log.Printf("Sent: %v", req)
}
resp, err := stream2.CloseAndRecv()
if err != nil {
log.Fatalf("CloseAndRecv error: %v", err)
}
log.Printf("Created %d users", resp.CreatedCount)
// 双向流 RPC
log.Println("\n=== Bidirectional Streaming RPC ===")
stream3, err := client.Chat(ctx)
if err != nil {
log.Fatalf("Chat error: %v", err)
}
// 发送消息
go func() {
messages := []string{"Hello", "How are you?", "Goodbye"}
for _, msg := range messages {
if err := stream3.Send(&pb.ChatMessage{
Sender: "Client",
Content: msg,
}); err != nil {
log.Printf("Send error: %v", err)
return
}
log.Printf("Sent: %s", msg)
time.Sleep(1 * time.Second)
}
stream3.CloseSend()
}()
// 接收消息
for {
reply, err := stream3.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Recv error: %v", err)
}
log.Printf("Received from %s: %s", reply.Sender, reply.Content)
}
}
拦截器(中间件)
服务端拦截器
// 日志拦截器
func loggingInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
resp, err := handler(ctx, req)
log.Printf("Method: %s, Duration: %v, Error: %v",
info.FullMethod, time.Since(start), err)
return resp, err
}
// 认证拦截器
func authInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// 从 metadata 获取 token
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "missing metadata")
}
tokens := md["authorization"]
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "missing token")
}
// 验证 token
token := tokens[0]
if !validateToken(token) {
return nil, status.Error(codes.Unauthenticated, "invalid token")
}
return handler(ctx, req)
}
func validateToken(token string) bool {
return token == "Bearer valid-token"
}
// 使用拦截器
func main() {
s := grpc.NewServer(
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(
loggingInterceptor,
authInterceptor,
),
),
)
// ...
}
客户端拦截器
// 重试拦截器
func retryInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
maxRetries := 3
for i := 0; i < maxRetries; i++ {
err := invoker(ctx, method, req, reply, cc, opts...)
if err == nil {
return nil
}
st, ok := status.FromError(err)
if !ok || st.Code() != codes.Unavailable {
return err
}
log.Printf("Retry %d/%d for %s", i+1, maxRetries, method)
time.Sleep(time.Duration(i+1) * time.Second)
}
return status.Error(codes.Unavailable, "max retries exceeded")
}
// 使用拦截器
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(retryInterceptor),
)
错误处理
// 服务端:返回错误
func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
user, exists := s.users[req.Id]
if !exists {
return nil, status.Errorf(codes.NotFound, "user %d not found", req.Id)
}
return user, nil
}
// 客户端:处理错误
user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 1})
if err != nil {
st, ok := status.FromError(err)
if ok {
switch st.Code() {
case codes.NotFound:
log.Println("User not found")
case codes.Unauthenticated:
log.Println("Authentication failed")
case codes.Unavailable:
log.Println("Service unavailable")
default:
log.Printf("Error: %v", err)
}
}
return
}
总结
gRPC 为微服务通信提供了强大的支持:
- 一元 RPC:简单的请求-响应模式
- 服务器流:服务器发送多个响应
- 客户端流:客户端发送多个请求
- 双向流:双方都可以随时发送消息
主要优势:
- 高性能:基于 HTTP/2 和 Protocol Buffers
- 强类型:编译时检查
- 代码生成:减少样板代码
- 跨语言:支持多种语言
常见应用场景:
- 微服务间通信
- 移动端与后端通信
- 实时数据流
- 多语言系统集成
记住:gRPC 适合内部服务通信,对于面向外部的 API,REST 可能更合适。
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。