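Go and Microservices: Service Discovery and Registration in Practice
Picture this: your e-commerce system has been split from a monolith into 20 microservices: a user service, an order service, a payment service, an inventory service, and so on. They run on different servers, and their IP addresses can change at any time (container restarts, autoscaling). So how does the order service find the payment service's address?
Hard-code the IP addresses? Not a serious option. In a cloud-native environment, instance IPs are dynamic: 10.0.1.5 today, 10.0.2.8 after tomorrow's container restart. You need a "phone book", and that is the problem service discovery solves.
Today we take a close look at service discovery and registration for Go microservices, from theory to practice, and build up a working service discovery setup step by step.
Core Concepts of Service Discovery
Before writing any code, let's get a few key concepts straight.
How Service Registration and Discovery Work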
┌─────────────┐
│  Provider   │──register──>┌──────────────────┐
│ (Order Svc) │             │                  │
└─────────────┘             │ Service registry │<──query───┌───────────────┐
                            │  (Consul/etcd)   │           │   Consumer    │
┌─────────────┐             │                  │──return──>│ (Payment Svc) │
│  Provider   │──register──>│                  │           └───────────────┘
│ (User Svc)  │             └──────────────────┘
└─────────────┘
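The flow has three steps:
- Service registration: on startup, a service reports its address, port, version, and other metadata to the registry
- Service discovery: when a consumer needs to call another service, it asks the registry for that service's list of addresses
- Health checking: the registry periodically checks whether each service is alive and removes unhealthy instances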
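The three steps above can be sketched with a toy in-memory registry. This is only an illustration of the idea, not how Consul or etcd work internally: `Registry`, `Instance`, and the TTL-based eviction are hypothetical names and choices made for this sketch, and timestamps are passed in explicitly so the behavior is easy to follow.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// Instance is one registered copy of a service (hypothetical type for this sketch).
type Instance struct {
	Addr     string
	LastSeen time.Time // refreshed on every Register call, which doubles as a heartbeat
}

// Registry is a toy in-memory registry: service name -> instance ID -> instance.
type Registry struct {
	mu        sync.Mutex
	ttl       time.Duration
	instances map[string]map[string]Instance
}

func NewRegistry(ttl time.Duration) *Registry {
	return &Registry{ttl: ttl, instances: make(map[string]map[string]Instance)}
}

// Register records an instance or refreshes its heartbeat (step 1).
func (r *Registry) Register(service, id, addr string, now time.Time) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.instances[service] == nil {
		r.instances[service] = make(map[string]Instance)
	}
	r.instances[service][id] = Instance{Addr: addr, LastSeen: now}
}

// Discover returns the addresses of live instances (step 2) and evicts
// instances whose last heartbeat is older than the TTL (step 3).
func (r *Registry) Discover(service string, now time.Time) []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	var addrs []string
	for id, inst := range r.instances[service] {
		if now.Sub(inst.LastSeen) > r.ttl {
			delete(r.instances[service], id)
			continue
		}
		addrs = append(addrs, inst.Addr)
	}
	sort.Strings(addrs) // map iteration order is random; sort for stable output
	return addrs
}

// demo registers two instances, lets one go silent, and returns what a
// consumer sees before and after the TTL passes.
func demo() (before, after []string) {
	reg := NewRegistry(10 * time.Second)
	start := time.Now()
	reg.Register("user-service", "user-1", "10.0.1.5:8081", start)
	reg.Register("user-service", "user-2", "10.0.2.8:8081", start)
	before = reg.Discover("user-service", start.Add(5*time.Second))
	// user-1 sends a heartbeat at t=8s; user-2 never does.
	reg.Register("user-service", "user-1", "10.0.1.5:8081", start.Add(8*time.Second))
	after = reg.Discover("user-service", start.Add(15*time.Second))
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println(before) // both instances
	fmt.Println(after)  // only user-1
}
```

Real registries differ mainly in who drives the health check: some probe the service actively, others expect the service to keep renewing a heartbeat as in this sketch.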
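Two Service Discovery Patterns
Client-side discovery:
Consumer ──query──> Registry ──address list──> Consumer ──load balance──> pick an instance ──call──> Provider
The consumer itself queries the registry, load-balances, and retries on failure. Netflix Eureka with Ribbon follows this pattern.
Server-side discovery:
Consumer ──call──> Load balancer ──query──> Registry ──address──> Load balancer ──forward──> Provider
The consumer only needs to know the load balancer's address; the load balancer handles service discovery and routing. AWS ELB with Route 53 follows this pattern.
We focus on client-side discovery today, since it is common in Go microservices and is what the SDK-based examples in this article implement.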
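To make the client-side flow concrete, here is a minimal sketch of what the consumer does on each call: query, pick an instance, and fail over to the next one on error. The function `callWithFailover` and its `lookup` and `call` parameters are hypothetical stand-ins for a registry query and a real HTTP or RPC call.

```go
package main

import (
	"errors"
	"fmt"
)

// callWithFailover queries the registry, starts at a rotating position in the
// address list, and tries each instance once until a call succeeds.
func callWithFailover(
	service string,
	lookup func(service string) ([]string, error), // stands in for a registry query
	call func(addr string) error, // stands in for the real request
	start uint, // rotating counter used to spread load across instances
) (string, error) {
	addrs, err := lookup(service)
	if err != nil {
		return "", fmt.Errorf("lookup %s: %w", service, err)
	}
	n := len(addrs)
	if n == 0 {
		return "", fmt.Errorf("no instances of %s", service)
	}
	first := int(start % uint(n))
	var lastErr error
	for i := 0; i < n; i++ {
		addr := addrs[(first+i)%n]
		if lastErr = call(addr); lastErr == nil {
			return addr, nil
		}
	}
	return "", fmt.Errorf("all %d instances of %s failed: %w", n, service, lastErr)
}

// demo simulates two instances where the first one refuses connections.
func demo() (string, error) {
	lookup := func(string) ([]string, error) {
		return []string{"10.0.1.5:8081", "10.0.2.8:8081"}, nil
	}
	call := func(addr string) error {
		if addr == "10.0.1.5:8081" {
			return errors.New("connection refused")
		}
		return nil
	}
	return callWithFailover("payment-service", lookup, call, 0)
}

func main() {
	addr, err := demo()
	fmt.Println(addr, err) // the second instance served the call
}
```

In server-side discovery, this same logic lives inside the load balancer instead of the consumer.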
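Consul: HashiCorp's Service Discovery Tool
Consul is an open-source distributed service discovery tool from HashiCorp. It provides service registration, health checks, a KV store, and multi-datacenter support.
Starting a Consul Development Environment
# Install Consul (macOS)
brew install consul
# Start in dev mode (single node, data is not persisted)
consul agent -dev -ui
# Open the web UI
# http://localhost:8500
Registering a Go Service with Consul
Let's write a user service that registers itself with Consul on startup: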
go get github.com/hashicorp/consul/api
package main
import (
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/hashicorp/consul/api"
)
type Service struct {
Name string
ID string
Address string
Port int
Tags []string
}
func (s *Service) Register() error {
config := api.DefaultConfig()
config.Address = "http://localhost:8500"
client, err := api.NewClient(config)
if err != nil {
return fmt.Errorf("failed to create consul client: %w", err)
}
registration := &api.AgentServiceRegistration{
ID: s.ID,
Name: s.Name,
Address: s.Address,
Port: s.Port,
Tags: s.Tags,
Check: &api.AgentServiceCheck{
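// HTTP health check
HTTP: fmt.Sprintf("http://%s:%d/health", s.Address, s.Port),
Interval: "10s",
Timeout: "5s",
// Deregister the service if it stays critical this long. Consul enforces a
// minimum of 1 minute, so a smaller value such as "30s" is raised to "1m".
DeregisterCriticalServiceAfter: "1m",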
},
}
if err := client.Agent().ServiceRegister(registration); err != nil {
return fmt.Errorf("failed to register service: %w", err)
}
log.Printf("✅ Service %s (%s) registered to Consul", s.Name, s.ID)
return nil
}
func (s *Service) Deregister() error {
config := api.DefaultConfig()
client, err := api.NewClient(config)
if err != nil {
return err
}
if err := client.Agent().ServiceDeregister(s.ID); err != nil {
return fmt.Errorf("failed to deregister service: %w", err)
}
log.Printf("❌ Service %s (%s) deregistered from Consul", s.Name, s.ID)
return nil
}
func main() {
hostname, _ := os.Hostname()
service := &Service{
Name: "user-service",
ID: fmt.Sprintf("user-service-%s", hostname),
Address: "127.0.0.1",
Port: 8081,
Tags: []string{"v1", "production"},
}
// Register the service
if err := service.Register(); err != nil {
log.Fatal(err)
}
// Deregister the service on graceful shutdown
go func() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
log.Println("Shutting down...")
if err := service.Deregister(); err != nil {
log.Printf("Deregister failed: %v", err)
}
os.Exit(0)
}()
// Health check endpoint
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status": "healthy"}`))
})
// Business endpoint
http.HandleFunc("/api/users", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"users": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}`))
})
log.Printf("User service listening on :%d", service.Port)
if err := http.ListenAndServe(fmt.Sprintf(":%d", service.Port), nil); err != nil {
log.Fatal(err)
}
}
Discovering Services from Consul
Now let's write an order service that needs to call the user service:
package main
import (
"fmt"
"io"
"log"
"math/rand"
"net/http"
"github.com/hashicorp/consul/api"
)
// ServiceDiscovery is a service discovery client
type ServiceDiscovery struct {
client *api.Client
}
func NewServiceDiscovery() (*ServiceDiscovery, error) {
config := api.DefaultConfig()
client, err := api.NewClient(config)
if err != nil {
return nil, err
}
return &ServiceDiscovery{client: client}, nil
}
// Discover finds the instances of a service
func (sd *ServiceDiscovery) Discover(serviceName string) ([]*api.ServiceEntry, error) {
// Return only healthy instances (passingOnly = true)
entries, _, err := sd.client.Health().Service(serviceName, "", true, nil)
if err != nil {
return nil, fmt.Errorf("failed to discover service %s: %w", serviceName, err)
}
if len(entries) == 0 {
return nil, fmt.Errorf("no healthy instances found for service %s", serviceName)
}
return entries, nil
}
// GetServiceAddress returns the address of one instance (random load balancing)
func (sd *ServiceDiscovery) GetServiceAddress(serviceName string) (string, error) {
entries, err := sd.Discover(serviceName)
if err != nil {
return "", err
}
// Simple random load balancing
entry := entries[rand.Intn(len(entries))]
address := fmt.Sprintf("http://%s:%d", entry.Service.Address, entry.Service.Port)
return address, nil
}
func main() {
sd, err := NewServiceDiscovery()
if err != nil {
log.Fatal(err)
}
http.HandleFunc("/api/orders", func(w http.ResponseWriter, r *http.Request) {
// Call the user service
userSvcAddr, err := sd.GetServiceAddress("user-service")
if err != nil {
http.Error(w, "User service unavailable", http.StatusServiceUnavailable)
return
}
resp, err := http.Get(userSvcAddr + "/api/users")
if err != nil {
http.Error(w, "Failed to call user service", http.StatusInternalServerError)
return
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
http.Error(w, "Failed to read user service response", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
// "[...]" is a placeholder for the real order data, so this body is not valid JSON as written.
fmt.Fprintf(w, `{"orders": [...], "users": %s}`, body)
})
log.Println("Order service listening on :8082")
log.Fatal(http.ListenAndServe(":8082", nil))
}
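The order service above queries Consul on every incoming request. A common refinement is to cache the instance list for a short time so the registry is not on the hot path. The following is a minimal sketch under that assumption; `cachedResolver`, its fields, and the stale-on-error fallback are hypothetical and not part of the Consul API. The `lookup` function stands in for a call such as `Discover`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type cacheEntry struct {
	addrs   []string
	expires time.Time
}

// cachedResolver wraps a registry lookup with a short-lived cache (hypothetical helper).
type cachedResolver struct {
	mu      sync.Mutex
	ttl     time.Duration
	lookup  func(service string) ([]string, error) // stands in for a registry query
	now     func() time.Time                       // injectable clock
	entries map[string]cacheEntry
}

func newCachedResolver(ttl time.Duration, lookup func(string) ([]string, error)) *cachedResolver {
	return &cachedResolver{ttl: ttl, lookup: lookup, now: time.Now, entries: make(map[string]cacheEntry)}
}

// Resolve returns cached addresses while they are fresh and refreshes them otherwise.
// The lock is held across the lookup, which keeps the sketch simple but
// serializes callers during a refresh.
func (c *cachedResolver) Resolve(service string) ([]string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	now := c.now()
	entry, cached := c.entries[service]
	if cached && now.Before(entry.expires) {
		return entry.addrs, nil
	}
	addrs, err := c.lookup(service)
	if err != nil {
		if cached {
			return entry.addrs, nil // registry unreachable: fall back to the stale list
		}
		return nil, err
	}
	c.entries[service] = cacheEntry{addrs: addrs, expires: now.Add(c.ttl)}
	return addrs, nil
}

// demo resolves the same service four times and returns how many lookups
// actually reached the (fake) registry.
func demo() int {
	lookups := 0
	clock := time.Unix(0, 0)
	r := newCachedResolver(5*time.Second, func(string) ([]string, error) {
		lookups++
		return []string{"127.0.0.1:8081"}, nil
	})
	r.now = func() time.Time { return clock }
	for i := 0; i < 3; i++ {
		r.Resolve("user-service") // first call misses, the next two hit the cache
	}
	clock = clock.Add(6 * time.Second) // move past the TTL
	r.Resolve("user-service")          // cache expired: one more lookup
	return lookups
}

func main() {
	fmt.Println("registry lookups:", demo())
}
```

The trade-off is staleness: for up to one TTL the cached list may still contain an instance that has just died, so a cache like this is best paired with retries against another instance.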
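etcd: The Choice of Kubernetes
etcd is a distributed key-value store originally developed by CoreOS; Kubernetes uses it to store cluster state. etcd can also serve as a service registry, and its Watch mechanism is a good fit for service discovery.
Starting etcd
# Install etcd (macOS)
brew install etcd
# Start a single node
etcd
# Listens on http://localhost:2379 by default
Registering Services with etcd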
go get go.etcd.io/etcd/client/v3
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
type ServiceInfo struct {
Name string `json:"name"`
Address string `json:"address"`
Port int `json:"port"`
Version string `json:"version"`
}
type EtcdRegistry struct {
client *clientv3.Client
ttl int64 // lease TTL in seconds
}
func NewEtcdRegistry(endpoints []string, ttl int64) (*EtcdRegistry, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
return &EtcdRegistry{client: client, ttl: ttl}, nil
}
// Register registers the service and keeps its lease alive (heartbeat)
func (r *EtcdRegistry) Register(ctx context.Context, info ServiceInfo) error {
// Create a lease
leaseResp, err := r.client.Grant(ctx, r.ttl)
if err != nil {
return fmt.Errorf("failed to create lease: %w", err)
}
// Serialize the service info
data, err := json.Marshal(info)
if err != nil {
return fmt.Errorf("failed to marshal service info: %w", err)
}
key := fmt.Sprintf("/services/%s/%s:%d", info.Name, info.Address, info.Port)
// Bind the service info to the lease
_, err = r.client.Put(ctx, key, string(data), clientv3.WithLease(leaseResp.ID))
if err != nil {
return fmt.Errorf("failed to register service: %w", err)
}
log.Printf("✅ Service registered: %s", key)
// Start a goroutine that keeps the lease alive (heartbeat)
go func() {
ch, err := r.client.KeepAlive(ctx, leaseResp.ID)
if err != nil {
log.Printf("Failed to keep alive: %v", err)
return
}
for {
select {
case <-ctx.Done():
return
case _, ok := <-ch:
if !ok {
log.Println("Lease expired, service deregistered")
return
}
}
}
}()
return nil
}
// Discover returns the registered instances of a service
func (r *EtcdRegistry) Discover(ctx context.Context, serviceName string) ([]ServiceInfo, error) {
prefix := fmt.Sprintf("/services/%s/", serviceName)
resp, err := r.client.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
var services []ServiceInfo
for _, kv := range resp.Kvs {
var info ServiceInfo
if err := json.Unmarshal(kv.Value, &info); err != nil {
continue
}
services = append(services, info)
}
return services, nil
}
// Watch watches for changes to a service's instances
func (r *EtcdRegistry) Watch(ctx context.Context, serviceName string) {
prefix := fmt.Sprintf("/services/%s/", serviceName)
watchCh := r.client.Watch(ctx, prefix, clientv3.WithPrefix())
// The watch channel is closed when ctx is canceled or the watch fails,
// so ranging over it avoids spinning on a closed channel.
for resp := range watchCh {
if err := resp.Err(); err != nil {
log.Printf("Watch error: %v", err)
return
}
for _, event := range resp.Events {
switch event.Type {
case clientv3.EventTypePut:
log.Printf("📝 Service updated: %s", event.Kv.Key)
case clientv3.EventTypeDelete:
log.Printf("❌ Service removed: %s", event.Kv.Key)
}
}
}
}
func main() {
ctx := context.Background()
registry, err := NewEtcdRegistry([]string{"localhost:2379"}, 10)
if err != nil {
log.Fatal(err)
}
defer registry.client.Close()
// Register the service
err = registry.Register(ctx, ServiceInfo{
Name: "payment-service",
Address: "192.168.1.100",
Port: 8083,
Version: "v1.2.0",
})
if err != nil {
log.Fatal(err)
}
// Watch for service changes
go registry.Watch(ctx, "user-service")
// Discover services periodically
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
services, err := registry.Discover(ctx, "user-service")
if err != nil {
log.Printf("Discovery failed: %v", err)
continue
}
log.Printf("Found %d instances of user-service", len(services))
for _, svc := range services {
log.Printf(" - %s:%d (version: %s)", svc.Address, svc.Port, svc.Version)
}
}
}
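The `Watch` method above only logs events. In practice a consumer usually applies them to a local instance list, so it does not need to poll with `Discover`. Here is a minimal sketch of that idea using only the standard library; `event`, `eventType`, and `instanceCache` are hypothetical types that mirror the put/delete events a watch delivers, and the value is simplified to a plain address instead of the JSON-encoded `ServiceInfo`.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// eventType and event mirror the put/delete events a watch delivers
// (hypothetical types for this sketch; the real ones live in the etcd client).
type eventType int

const (
	eventPut eventType = iota
	eventDelete
)

type event struct {
	Type  eventType
	Key   string // e.g. /services/user-service/10.0.1.5:8081
	Value string // simplified to the instance address
}

// instanceCache is a local view of one service's instances, keyed by registry key.
type instanceCache struct {
	mu    sync.RWMutex
	byKey map[string]string
}

func newInstanceCache() *instanceCache {
	return &instanceCache{byKey: make(map[string]string)}
}

// apply updates the local view from a single watch event.
func (c *instanceCache) apply(ev event) {
	c.mu.Lock()
	defer c.mu.Unlock()
	switch ev.Type {
	case eventPut:
		c.byKey[ev.Key] = ev.Value
	case eventDelete:
		delete(c.byKey, ev.Key)
	}
}

// addrs returns the current instance addresses in sorted order.
func (c *instanceCache) addrs() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make([]string, 0, len(c.byKey))
	for _, addr := range c.byKey {
		out = append(out, addr)
	}
	sort.Strings(out)
	return out
}

// demo replays two registrations followed by one lease expiry.
func demo() []string {
	cache := newInstanceCache()
	events := []event{
		{eventPut, "/services/user-service/10.0.1.5:8081", "10.0.1.5:8081"},
		{eventPut, "/services/user-service/10.0.2.8:8081", "10.0.2.8:8081"},
		{eventDelete, "/services/user-service/10.0.1.5:8081", ""},
	}
	for _, ev := range events {
		cache.apply(ev)
	}
	return cache.addrs()
}

func main() {
	fmt.Println(demo()) // only the instance that is still registered
}
```

With a real etcd client, the cache would typically be seeded with one prefix `Get` and then kept current by the watch, so that no change between the two is missed.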
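Nacos: Alibaba's Service Discovery and Configuration Management
Nacos is Alibaba's open-source platform for dynamic service discovery, configuration management, and service management. It is very popular in the Spring Cloud Alibaba ecosystem and also supports Go.
Using the Nacos SDK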
go get github.com/nacos-group/nacos-sdk-go/v2
package main
import (
"log"
"time"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
)
func main() {
// Nacos server configuration
sc := []constant.ServerConfig{
*constant.NewServerConfig("127.0.0.1", 8848),
}
// Client configuration
cc := *constant.NewClientConfig(
constant.WithNamespaceId(""), // per the SDK README, the public namespace uses an empty ID
constant.WithTimeoutMs(5000),
constant.WithNotLoadCacheAtStart(true),
constant.WithLogLevel("debug"),
)
// Create the naming client
namingClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &cc,
ServerConfigs: sc,
},
)
if err != nil {
log.Fatal(err)
}
// Register a service instance
success, err := namingClient.RegisterInstance(vo.RegisterInstanceParam{
Ip: "192.168.1.101",
Port: 8084,
ServiceName: "inventory-service",
Weight: 10,
Enable: true,
Healthy: true,
Ephemeral: true, // ephemeral (non-persistent) instance
Metadata: map[string]string{"version": "v2.0"},
})
if err != nil || !success {
log.Fatalf("Failed to register: %v", err)
}
log.Println("✅ Service registered to Nacos")
// Subscribe to service changes
err = namingClient.Subscribe(&vo.SubscribeParam{
ServiceName: "user-service",
SubscribeCallback: func(services []model.Instance, err error) {
if err != nil {
log.Printf("Subscribe error: %v", err)
return
}
log.Printf("Service instances changed, now %d instances:", len(services))
for _, inst := range services {
// Weight is a float64, so format it with %.1f rather than %d
log.Printf(" - %s:%d (healthy: %v, weight: %.1f)",
inst.Ip, inst.Port, inst.Healthy, inst.Weight)
}
},
})
if err != nil {
log.Printf("Subscribe failed: %v", err)
}
// Discover services
time.Sleep(2 * time.Second)
instances, err := namingClient.SelectInstances(vo.SelectInstancesParam{
ServiceName: "user-service",
HealthyOnly: true,
})
if err != nil {
log.Printf("Discovery failed: %v", err)
} else {
log.Printf("Found %d healthy instances", len(instances))
}
// Keep the process running
select {}
}
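Health Checks: Making Sure Services Are Usable
Health checking is a core part of service discovery. A service's process may still be running while its database connection pool is exhausted or a dependency is unavailable; in that case it should be marked unhealthy.
Implementing Multi-Level Health Checks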
package health
import (
"context"
"database/sql"
"net/http"
"sync"
"time"
)
type Status string
const (
StatusHealthy Status = "healthy"
StatusDegraded Status = "degraded"
StatusUnhealthy Status = "unhealthy"
)
type CheckResult struct {
Name string `json:"name"`
Status Status `json:"status"`
Message string `json:"message,omitempty"`
Latency time.Duration `json:"latency"`
}
type Checker struct {
checks []HealthCheck
mu sync.RWMutex
}
type HealthCheck interface {
Name() string
Check(ctx context.Context) CheckResult
}
func NewChecker() *Checker {
return &Checker{}
}
func (c *Checker) Add(check HealthCheck) {
c.mu.Lock()
defer c.mu.Unlock()
c.checks = append(c.checks, check)
}
func (c *Checker) CheckAll(ctx context.Context) []CheckResult {
c.mu.RLock()
defer c.mu.RUnlock()
var wg sync.WaitGroup
results := make([]CheckResult, len(c.checks))
for i, check := range c.checks {
wg.Add(1)
go func(idx int, hc HealthCheck) {
defer wg.Done()
results[idx] = hc.Check(ctx)
}(i, check)
}
wg.Wait()
return results
}
// DatabaseCheck checks database connectivity
type DatabaseCheck struct {
db *sql.DB
}
func (d *DatabaseCheck) Name() string {
return "database"
}
func (d *DatabaseCheck) Check(ctx context.Context) CheckResult {
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
if err := d.db.PingContext(ctx); err != nil {
return CheckResult{
Name: d.Name(),
Status: StatusUnhealthy,
Message: err.Error(),
Latency: time.Since(start),
}
}
return CheckResult{
Name: d.Name(),
Status: StatusHealthy,
Latency: time.Since(start),
}
}
// HTTPCheck checks an HTTP dependency
type HTTPCheck struct {
name string
url string
}
func (h *HTTPCheck) Name() string {
return h.name
}
func (h *HTTPCheck) Check(ctx context.Context) CheckResult {
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
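req, err := http.NewRequestWithContext(ctx, http.MethodGet, h.url, nil)
if err != nil {
// An invalid URL would otherwise leave req nil and make Do panic
return CheckResult{Name: h.Name(), Status: StatusUnhealthy, Message: err.Error(), Latency: time.Since(start)}
}
resp, err := http.DefaultClient.Do(req)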
latency := time.Since(start)
if err != nil {
return CheckResult{
Name: h.Name(),
Status: StatusUnhealthy,
Message: err.Error(),
Latency: latency,
}
}
defer resp.Body.Close()
if resp.StatusCode >= 500 {
return CheckResult{
Name: h.Name(),
Status: StatusUnhealthy,
Message: "5xx error",
Latency: latency,
}
}
return CheckResult{
Name: h.Name(),
Status: StatusHealthy,
Latency: latency,
}
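}
// NewDatabaseCheck and NewHTTPCheck let other packages build checks. The struct
// fields are unexported, so composite literals only compile inside package health.
func NewDatabaseCheck(db *sql.DB) *DatabaseCheck {
return &DatabaseCheck{db: db}
}
func NewHTTPCheck(name, url string) *HTTPCheck {
return &HTTPCheck{name: name, url: url}
}
Using this health check framework:
func main() {
db, err := sql.Open("postgres", "postgres://...")
if err != nil {
log.Fatal(err)
}
checker := health.NewChecker()
checker.Add(health.NewDatabaseCheck(db))
checker.Add(health.NewHTTPCheck("user-service", "http://user-svc:8080/health"))
checker.Add(health.NewHTTPCheck("payment-service", "http://payment-svc:8080/health"))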
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
results := checker.CheckAll(r.Context())
overallStatus := health.StatusHealthy
for _, result := range results {
if result.Status == health.StatusUnhealthy {
overallStatus = health.StatusUnhealthy
break
}
}
w.Header().Set("Content-Type", "application/json")
if overallStatus == health.StatusUnhealthy {
w.WriteHeader(http.StatusServiceUnavailable)
}
json.NewEncoder(w).Encode(map[string]interface{}{
"status": overallStatus,
"checks": results,
})
})
http.ListenAndServe(":8080", nil)
}
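The handler above only distinguishes healthy from unhealthy, while the `health` package also declares `StatusDegraded`. One possible way to fold all three levels into an overall result is "worst status wins". The sketch below re-declares `Status` so it is self-contained; the `severity` ranking, the `aggregate` and `httpCode` helpers, and the choice to answer 200 for a degraded service are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"net/http"
)

// Status mirrors the type in the health package above (re-declared so the
// sketch compiles on its own).
type Status string

const (
	StatusHealthy   Status = "healthy"
	StatusDegraded  Status = "degraded"
	StatusUnhealthy Status = "unhealthy"
)

// severity ranks statuses so the worst one can be picked (hypothetical helper).
var severity = map[Status]int{
	StatusHealthy:   0,
	StatusDegraded:  1,
	StatusUnhealthy: 2,
}

// aggregate returns the worst status among the individual check results.
func aggregate(statuses []Status) Status {
	worst := StatusHealthy
	for _, s := range statuses {
		if severity[s] > severity[worst] {
			worst = s
		}
	}
	return worst
}

// httpCode maps the overall status to a response code. A degraded service
// still answers 200 here so the registry keeps it in rotation; that policy
// is an assumption of this sketch.
func httpCode(s Status) int {
	if s == StatusUnhealthy {
		return http.StatusServiceUnavailable
	}
	return http.StatusOK
}

func main() {
	overall := aggregate([]Status{StatusHealthy, StatusDegraded, StatusHealthy})
	fmt.Println(overall, httpCode(overall))
}
```

Which code to return for a degraded service depends on the registry. Consul's HTTP check, for example, treats any 2xx response as passing, 429 as a warning, and other codes as critical.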
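Load Balancing: Choosing the Right Strategy
Service discovery returns multiple instances. Which one should you call? That is the job of a load balancing strategy.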
package balancer
import (
"errors"
"hash/crc32"
"sync/atomic"
)
type Instance struct {
Address string
Weight int
}
type LoadBalancer interface {
Select(instances []Instance, key string) (Instance, error)
}
// RoundRobinBalancer cycles through the instances in order
type RoundRobinBalancer struct {
counter uint64
}
func (r *RoundRobinBalancer) Select(instances []Instance, _ string) (Instance, error) {
if len(instances) == 0 {
return Instance{}, errors.New("no instances available")
}
idx := atomic.AddUint64(&r.counter, 1) - 1
return instances[idx%uint64(len(instances))], nil
}
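// WeightedRoundRobinBalancer picks instances in proportion to their weights
type WeightedRoundRobinBalancer struct {
counter uint64
}
func (w *WeightedRoundRobinBalancer) Select(instances []Instance, _ string) (Instance, error) {
if len(instances) == 0 {
return Instance{}, errors.New("no instances available")
}
totalWeight := 0
for _, inst := range instances {
if inst.Weight > 0 {
totalWeight += inst.Weight
}
}
idx := atomic.AddUint64(&w.counter, 1) - 1
if totalWeight == 0 {
// No positive weights: fall back to plain round robin to avoid a modulo-by-zero panic
return instances[idx%uint64(len(instances))], nil
}
pos := idx % uint64(totalWeight)
currentWeight := 0
for _, inst := range instances {
if inst.Weight <= 0 {
continue // instances with non-positive weight receive no traffic
}
currentWeight += inst.Weight
if pos < uint64(currentWeight) {
return inst, nil
}
}
return instances[0], nil
}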
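// ConsistentHashBalancer selects an instance by hashing the key modulo the instance count.
// Note: this is plain hash-mod selection, not a consistent-hash ring; when the
// instance count changes, most keys map to a different instance.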
type ConsistentHashBalancer struct{}
func (c *ConsistentHashBalancer) Select(instances []Instance, key string) (Instance, error) {
if len(instances) == 0 {
return Instance{}, errors.New("no instances available")
}
hash := crc32.ChecksumIEEE([]byte(key))
idx := hash % uint32(len(instances))
return instances[idx], nil
}
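The `ConsistentHashBalancer` above picks `hash % len(instances)`, so adding or removing an instance remaps most keys. A consistent-hash ring avoids that: each node is placed on a ring at several points, and a key belongs to the first point at or after its own hash. The following is a minimal sketch of such a ring; `Ring`, `NewRing`, the replica count of 100, and the `node#i` naming of virtual nodes are choices made for this illustration.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// Ring is a minimal consistent-hash ring with virtual nodes.
type Ring struct {
	hashes []uint32          // sorted positions on the ring
	owner  map[uint32]string // ring position -> node address
}

// NewRing places each node on the ring `replicas` times.
func NewRing(nodes []string, replicas int) *Ring {
	r := &Ring{owner: make(map[uint32]string)}
	for _, node := range nodes {
		for i := 0; i < replicas; i++ {
			h := crc32.ChecksumIEEE([]byte(node + "#" + strconv.Itoa(i)))
			if _, taken := r.owner[h]; taken {
				continue // hash collision: the earlier node keeps the position
			}
			r.owner[h] = node
			r.hashes = append(r.hashes, h)
		}
	}
	sort.Slice(r.hashes, func(a, b int) bool { return r.hashes[a] < r.hashes[b] })
	return r
}

// Get returns the node that owns key: the first ring position at or after
// the key's hash, wrapping around to the start of the ring.
func (r *Ring) Get(key string) (string, bool) {
	if len(r.hashes) == 0 {
		return "", false
	}
	h := crc32.ChecksumIEEE([]byte(key))
	i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
	if i == len(r.hashes) {
		i = 0
	}
	return r.owner[r.hashes[i]], true
}

// movedKeys removes one node and counts keys that changed owner even though
// they did not live on the removed node. On a consistent-hash ring this is 0.
func movedKeys() int {
	full := NewRing([]string{"10.0.1.5:8081", "10.0.1.6:8081", "10.0.1.7:8081"}, 100)
	shrunk := NewRing([]string{"10.0.1.5:8081", "10.0.1.6:8081"}, 100)
	moved := 0
	for i := 0; i < 1000; i++ {
		key := "user-" + strconv.Itoa(i)
		before, _ := full.Get(key)
		after, _ := shrunk.Get(key)
		if before != "10.0.1.7:8081" && before != after {
			moved++
		}
	}
	return moved
}

func main() {
	fmt.Println("keys moved unnecessarily:", movedKeys())
}
```

Only the keys that lived on the removed node are reassigned, which is what makes this approach attractive for sticky sessions or per-instance caches.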
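Circuit Breaker: Preventing Cascading Failures
When a downstream service is unavailable and nothing protects the caller, the caller keeps retrying until it exhausts its own resources, and the failure cascades. A circuit breaker exists to stop this avalanche effect.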
package circuitbreaker
import (
"errors"
"sync"
"time"
)
type State int
const (
StateClosed State = iota // closed: requests pass through normally
StateOpen // open: fail fast
StateHalfOpen // half-open: probe for recovery
)
type CircuitBreaker struct {
mu sync.Mutex
state State
failureCount int
successCount int
lastFailureTime time.Time
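failureThreshold int // consecutive failures before the breaker opens
successThreshold int // successes required in half-open state to close again
timeout time.Duration // how long to stay open before moving to half-open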
}
func NewCircuitBreaker(failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{
state: StateClosed,
failureThreshold: failureThreshold,
successThreshold: successThreshold,
timeout: timeout,
}
}
var ErrCircuitOpen = errors.New("circuit breaker is open")
func (cb *CircuitBreaker) Execute(fn func() error) error {
cb.mu.Lock()
state := cb.state
// State transition: move from open to half-open once the timeout has elapsed
if state == StateOpen {
if time.Since(cb.lastFailureTime) > cb.timeout {
cb.state = StateHalfOpen
cb.successCount = 0
state = StateHalfOpen
} else {
cb.mu.Unlock()
return ErrCircuitOpen
}
}
cb.mu.Unlock()
// Run the actual call
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failureCount++
cb.lastFailureTime = time.Now()
if state == StateHalfOpen || cb.failureCount >= cb.failureThreshold {
cb.state = StateOpen
}
return err
}
// Success
if state == StateHalfOpen {
cb.successCount++
if cb.successCount >= cb.successThreshold {
cb.state = StateClosed
cb.failureCount = 0
}
} else {
cb.failureCount = 0
}
return nil
}
Using the circuit breaker:
func main() {
// Open after 5 consecutive failures; require 3 successes in half-open state to recover; stay open for 30 seconds
breaker := circuitbreaker.NewCircuitBreaker(5, 3, 30*time.Second)
http.HandleFunc("/api/pay", func(w http.ResponseWriter, r *http.Request) {
err := breaker.Execute(func() error {
// Call the payment service
resp, err := http.Get("http://payment-service:8083/pay")
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 500 {
return errors.New("payment service error")
}
return nil
})
if err != nil {
if errors.Is(err, circuitbreaker.ErrCircuitOpen) {
http.Error(w, "Payment service temporarily unavailable", http.StatusServiceUnavailable)
} else {
http.Error(w, "Payment failed", http.StatusInternalServerError)
}
return
}
w.Write([]byte("Payment successful"))
})
http.ListenAndServe(":8082", nil)
}
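Service Mesh: The Next Generation of Service Discovery
Traditional client-side discovery requires integrating an SDK into every service. A service mesh moves service discovery, load balancing, circuit breaking, and similar concerns down into the infrastructure layer, so application code is largely unaware of them.
Istio with Envoy is one of the most popular service mesh stacks. On Kubernetes, you only declare a Service and leave the rest to the mesh: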
# user-service.yaml
apiVersion: v1
kind: Service
metadata:
name: user-service
spec:
selector:
app: user-service
ports:
- port: 80
targetPort: 8080
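The Go service only needs to listen on a port; it contains no service discovery code. The Envoy sidecar handles service discovery, load balancing, retries, and circuit breaking.
Summary
Today we took a close look at service discovery and registration for Go microservices:
- Service discovery concepts: the full flow of registration, discovery, and health checking
- Consul: HashiCorp's mature, full-featured solution
- etcd: the store behind Kubernetes, with a powerful Watch mechanism
- Nacos: Alibaba's solution, with configuration management built in
- Health checks: multi-level checks that confirm a service is actually usable
- Load balancing: round robin, weighted round robin, hash-based selection, and other strategies
- Circuit breaker: a safeguard against cascading failures
- Service mesh: pushes service discovery down into the infrastructure layer
Which option to choose depends on your tech stack and scale. Small teams can start with Consul; on Kubernetes, use the built-in Service abstraction (backed by etcd); for large-scale systems, consider a service mesh.
Whichever you choose, the core idea stays the same: service instances change dynamically, and you need a "phone book" to keep track of them.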