引言
在微服务架构中，服务实例的动态变化使得服务发现成为关键基础设施。本文将深入探讨服务发现的核心机制、实现模式以及健康检查的最佳实践。
服务发现模式
客户端发现模式
┌──────────────┐
│  Service A   │
└──────┬───────┘
       │ 1. 查询注册中心
       ▼
┌──────────────┐
│   Registry   │
└──────┬───────┘
       │ 2. 返回服务实例列表
       ▼
┌──────────────┐      ┌──────────────┐
│  Service A   │─────▶│  Service B   │
│  (负载均衡)  │      │   (实例2)    │
└──────────────┘      └──────────────┘
客户端负责:
- 查询注册中心获取可用实例
- 实现负载均衡算法
- 处理实例故障和重试
服务端发现模式
┌──────────────┐
│  Service A   │
└──────┬───────┘
       │ 请求
       ▼
┌──────────────┐      ┌──────────────┐
│     Load     │─────▶│   Registry   │
│   Balancer   │◀─────│              │
└──────┬───────┘      └──────────────┘
       │
       ▼
┌──────────────┐
│  Service B   │
└──────────────┘
负载均衡器负责:
- 接收所有服务请求
- 查询注册中心
- 路由到健康的实例
Consul实现方案
服务注册
// registry/consul.go
package registry
import (
"fmt"
"log"
"github.com/hashicorp/consul/api"
)
// ConsulRegistry wraps a Consul API client and is the receiver for the
// register, deregister and discover operations in this package.
type ConsulRegistry struct {
	client *api.Client
}

// NewConsulRegistry builds a ConsulRegistry that talks to the Consul agent at
// address. All other client settings come from api.DefaultConfig.
//
// It returns an error if the Consul client cannot be constructed.
func NewConsulRegistry(address string) (*ConsulRegistry, error) {
	cfg := api.DefaultConfig()
	cfg.Address = address

	consulClient, err := api.NewClient(cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create Consul client: %w", err)
	}

	registry := &ConsulRegistry{client: consulClient}
	return registry, nil
}
// ServiceRegistration carries everything Register needs to announce one
// service instance to the registry.
type ServiceRegistration struct {
	// ID, Name and Address identify the instance, its logical service, and
	// the host it is reachable on.
	ID, Name, Address string
	// Port is the TCP port the instance listens on.
	Port int
	// Tags are passed through to the registry unchanged.
	Tags []string
	// Meta is free-form key/value metadata passed through unchanged.
	Meta map[string]string
}
// Register announces service to the local Consul agent together with an HTTP
// health check against http://<Address>:<Port>/health.
//
// The check is polled every 10s with a 5s timeout. If the instance stays
// critical, Consul removes it automatically after the deregistration delay.
//
// It returns a wrapped error if the agent rejects the registration; on
// success it logs the registered name and endpoint.
func (r *ConsulRegistry) Register(service ServiceRegistration) error {
	registration := &api.AgentServiceRegistration{
		ID:      service.ID,
		Name:    service.Name,
		Address: service.Address,
		Port:    service.Port,
		Tags:    service.Tags,
		Meta:    service.Meta,
		Check: &api.AgentServiceCheck{
			HTTP:     fmt.Sprintf("http://%s:%d/health", service.Address, service.Port),
			Interval: "10s",
			Timeout:  "5s",
			// Consul documents a minimum of one minute for this setting, so a
			// shorter value only looks faster than it is. State the effective
			// delay explicitly.
			DeregisterCriticalServiceAfter: "1m",
		},
	}

	if err := r.client.Agent().ServiceRegister(registration); err != nil {
		return fmt.Errorf("failed to register service: %w", err)
	}

	log.Printf("Service registered: %s (%s:%d)", service.Name, service.Address, service.Port)
	return nil
}
// Deregister removes the service instance identified by serviceID from the
// Consul agent and logs the removal. A failure from the agent is returned
// wrapped.
func (r *ConsulRegistry) Deregister(serviceID string) error {
	agent := r.client.Agent()
	if err := agent.ServiceDeregister(serviceID); err != nil {
		return fmt.Errorf("failed to deregister service: %w", err)
	}

	log.Printf("Service deregistered: %s", serviceID)
	return nil
}
// Discover queries Consul's health endpoint for serviceName with no tag
// filter and the passing-only flag set, and returns the resulting entries.
// Query metadata is discarded; a lookup failure is returned wrapped.
func (r *ConsulRegistry) Discover(serviceName string) ([]*api.ServiceEntry, error) {
	const (
		anyTag      = ""
		passingOnly = true
	)

	healthy, _, err := r.client.Health().Service(serviceName, anyTag, passingOnly, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to discover service: %w", err)
	}
	return healthy, nil
}
客户端负载均衡
// discovery/client.go
package discovery
import (
"context"
"fmt"
"math/rand"
"net/http"
"sync"
"time"
"github.com/hashicorp/consul/api"
)
// ServiceInstance is the registry-independent view of one discovered
// instance: its ID and the address/port to connect to.
type ServiceInstance struct {
	ID, Address string
	Port        int
}
// ServiceDiscovery resolves service names to instances through a
// ConsulRegistry and keeps a per-service cache of the last result.
type ServiceDiscovery struct {
	registry    *ConsulRegistry
	cache       map[string][]ServiceInstance // last known instances per service name
	cacheExpiry map[string]time.Time         // when each cache entry stops being fresh
	cacheTTL    time.Duration                // lifetime given to a refreshed entry
	mu          sync.RWMutex                 // guards cache and cacheExpiry
}

// NewServiceDiscovery returns a ServiceDiscovery backed by registry with
// empty caches and a 30-second cache TTL.
func NewServiceDiscovery(registry *ConsulRegistry) *ServiceDiscovery {
	sd := new(ServiceDiscovery)
	sd.registry = registry
	sd.cache = map[string][]ServiceInstance{}
	sd.cacheExpiry = map[string]time.Time{}
	sd.cacheTTL = 30 * time.Second
	return sd
}
// GetInstances returns the instances of serviceName.
//
// A cache entry that has not yet expired is returned as-is. Otherwise the
// registry is queried and the result replaces the cache entry with a new
// expiry of now + cacheTTL. If the registry query fails and an older entry
// exists, that stale entry is returned (and the fact is logged) instead of
// the error; with no entry at all the registry error is returned.
func (sd *ServiceDiscovery) GetInstances(serviceName string) ([]ServiceInstance, error) {
	// Snapshot the cache entry under the read lock only.
	sd.mu.RLock()
	known, hit := sd.cache[serviceName]
	deadline, hasDeadline := sd.cacheExpiry[serviceName]
	sd.mu.RUnlock()

	if hit && hasDeadline && time.Now().Before(deadline) {
		return known, nil
	}

	// Entry is missing or expired: ask the registry.
	entries, err := sd.registry.Discover(serviceName)
	if err != nil {
		if !hit {
			return nil, err
		}
		// Prefer stale data over failing the caller.
		log.Printf("Using stale cache for service %s", serviceName)
		return known, nil
	}

	// Convert registry entries into the local instance type.
	refreshed := make([]ServiceInstance, 0, len(entries))
	for _, e := range entries {
		refreshed = append(refreshed, ServiceInstance{
			ID:      e.Service.ID,
			Address: e.Service.Address,
			Port:    e.Service.Port,
		})
	}

	// Publish the refreshed list together with its new expiry.
	sd.mu.Lock()
	sd.cache[serviceName] = refreshed
	sd.cacheExpiry[serviceName] = time.Now().Add(sd.cacheTTL)
	sd.mu.Unlock()

	return refreshed, nil
}
// LoadBalancer is the load-balancing strategy: it picks one instance out of
// a candidate list.
type LoadBalancer interface {
// Select returns a pointer to the chosen element of instances, or an error
// when no choice can be made. Both implementations in this file return an
// error for an empty list.
Select(instances []ServiceInstance) (*ServiceInstance, error)
}
// RandomLoadBalancer picks an instance uniformly at random using math/rand.
// It holds no state.
type RandomLoadBalancer struct{}

// Select returns a pointer to a randomly chosen element of instances, or an
// error if instances is empty. The pointer aliases the caller's slice.
func (lb *RandomLoadBalancer) Select(instances []ServiceInstance) (*ServiceInstance, error) {
	count := len(instances)
	if count == 0 {
		return nil, fmt.Errorf("no instances available")
	}

	picked := &instances[rand.Intn(count)]
	return picked, nil
}
// RoundRobinLoadBalancer cycles through instances in order. The counter is
// shared by all calls and protected by mu.
type RoundRobinLoadBalancer struct {
	counter uint64
	mu      sync.Mutex
}

// Select returns a pointer to the next element of instances in rotation, or
// an error if instances is empty. An empty list does not advance the
// rotation. The pointer aliases the caller's slice.
func (lb *RoundRobinLoadBalancer) Select(instances []ServiceInstance) (*ServiceInstance, error) {
	total := uint64(len(instances))
	if total == 0 {
		return nil, fmt.Errorf("no instances available")
	}
	return &instances[lb.nextIndex(total)], nil
}

// nextIndex returns the current counter modulo total and then advances the
// counter, all under the mutex.
func (lb *RoundRobinLoadBalancer) nextIndex(total uint64) uint64 {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	slot := lb.counter % total
	lb.counter++
	return slot
}
// DiscoveryHTTPClient is an HTTP client wrapper that resolves the target
// host through service discovery and a load-balancing strategy.
type DiscoveryHTTPClient struct {
	discovery    *ServiceDiscovery
	loadBalancer LoadBalancer
	httpClient   *http.Client
}

// NewDiscoveryHTTPClient returns a client that resolves instances through
// discovery, chooses among them with lb, and sends requests with an
// http.Client whose timeout is 30 seconds.
func NewDiscoveryHTTPClient(discovery *ServiceDiscovery, lb LoadBalancer) *DiscoveryHTTPClient {
	client := new(DiscoveryHTTPClient)
	client.discovery = discovery
	client.loadBalancer = lb
	client.httpClient = &http.Client{Timeout: 30 * time.Second}
	return client
}
// Do sends req to an instance of serviceName chosen by the load balancer,
// trying up to three times.
//
// For every attempt the request is cloned and the clone's URL scheme and
// host are rewritten to http://<instance address>:<instance port>; the
// caller's req is never modified. An attempt is retried when the transport
// returns an error or, if attempts remain, when the response status is 5xx.
// A 5xx response on the final attempt is returned to the caller as-is.
//
// Retries stop early when ctx is done, or when the request has a body that
// cannot be replayed (req.GetBody is nil), because resending a consumed body
// would transmit a truncated request.
//
// It returns the first non-retried response, or an error wrapping the last
// failure. The caller must close the response body.
func (c *DiscoveryHTTPClient) Do(ctx context.Context, serviceName string, req *http.Request) (*http.Response, error) {
	const maxAttempts = 3

	instances, err := c.discovery.GetInstances(serviceName)
	if err != nil {
		return nil, fmt.Errorf("service discovery failed: %w", err)
	}

	hasBody := req.Body != nil && req.Body != http.NoBody

	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		instance, err := c.loadBalancer.Select(instances)
		if err != nil {
			return nil, err
		}

		// Clone deep-copies the URL, so rewriting it here cannot leak into
		// the caller's request or into a later attempt.
		outReq := req.Clone(ctx)
		outReq.URL.Scheme = "http"
		outReq.URL.Host = fmt.Sprintf("%s:%d", instance.Address, instance.Port)

		// The first attempt consumes req.Body; later attempts need a fresh
		// reader for the same payload.
		if attempt > 0 && hasBody {
			if req.GetBody == nil {
				break
			}
			body, err := req.GetBody()
			if err != nil {
				return nil, fmt.Errorf("rewinding request body: %w", err)
			}
			outReq.Body = body
		}

		resp, err := c.httpClient.Do(outReq)
		if err != nil {
			lastErr = err
			log.Printf("Request to %s failed: %v", instance.ID, err)
			if ctx.Err() != nil {
				// The caller gave up; further attempts would fail the same way.
				break
			}
			continue
		}

		// Retry server errors while attempts remain.
		if resp.StatusCode >= 500 && attempt < maxAttempts-1 {
			resp.Body.Close()
			lastErr = fmt.Errorf("server error: %d", resp.StatusCode)
			continue
		}

		return resp, nil
	}

	return nil, fmt.Errorf("all retries failed: %w", lastErr)
}
服务自注册
// main.go
package main
import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
)
// Application is a self-registering service: it owns the registry handle
// and the identity and endpoint it registers under.
type Application struct {
	registry    *ConsulRegistry
	serviceID   string // unique per process: "<name>-<uuid>"
	serviceName string
	address     string
	port        int
}

// NewApplication creates an Application named name that will listen on
// address:port and register with the Consul agent at consulAddr. The service
// ID is the name followed by a freshly generated UUID.
//
// It returns the error from NewConsulRegistry unchanged if the registry
// client cannot be created.
func NewApplication(name, address string, port int, consulAddr string) (*Application, error) {
	reg, err := NewConsulRegistry(consulAddr)
	if err != nil {
		return nil, err
	}

	app := &Application{
		registry:    reg,
		serviceName: name,
		address:     address,
		port:        port,
	}
	app.serviceID = fmt.Sprintf("%s-%s", name, uuid.New().String())
	return app, nil
}
// Start registers the service, serves HTTP on the configured port, and
// blocks until the server has stopped.
//
// On SIGINT or SIGTERM the service is first deregistered from the registry
// and then the HTTP server is shut down gracefully with a 30-second limit.
// Start waits for that shutdown to finish before returning, so in-flight
// requests are not cut off by the process exiting, and returns nil for such
// a clean stop.
//
// If registration fails, its error is returned and nothing is started. If
// the listener fails (for example the port is already in use), the service
// is deregistered again and the listener error is returned.
func (app *Application) Start() error {
	// Register with the registry.
	err := app.registry.Register(ServiceRegistration{
		ID:      app.serviceID,
		Name:    app.serviceName,
		Address: app.address,
		Port:    app.port,
		Tags:    []string{"v1", "production"},
		Meta: map[string]string{
			"version": "1.0.0",
		},
	})
	if err != nil {
		return err
	}

	// Set up the HTTP server.
	mux := http.NewServeMux()
	mux.HandleFunc("/health", app.healthHandler)
	mux.HandleFunc("/api/data", app.dataHandler)
	server := &http.Server{
		Addr:    fmt.Sprintf(":%d", app.port),
		Handler: mux,
	}

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigChan)

	listenFailed := make(chan struct{}) // closed when ListenAndServe fails on its own
	shutdownDone := make(chan struct{}) // closed when the shutdown goroutine exits

	// Graceful shutdown.
	go func() {
		defer close(shutdownDone)

		select {
		case <-sigChan:
		case <-listenFailed:
			// Nothing to shut down; exit so this goroutine is not leaked.
			return
		}
		log.Println("Shutting down...")

		// Leave the registry first so no new traffic is routed here.
		if err := app.registry.Deregister(app.serviceID); err != nil {
			log.Printf("Failed to deregister: %v", err)
		}

		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		if err := server.Shutdown(ctx); err != nil {
			log.Printf("Graceful shutdown failed: %v", err)
		}
	}()

	log.Printf("Service %s listening on %s:%d", app.serviceName, app.address, app.port)
	err = server.ListenAndServe()
	if errors.Is(err, http.ErrServerClosed) {
		// ListenAndServe returns as soon as Shutdown begins; wait until the
		// drain has actually completed.
		<-shutdownDone
		return nil
	}

	// The listener failed without a shutdown request: undo the registration
	// so the registry does not keep advertising a dead endpoint.
	close(listenFailed)
	if derr := app.registry.Deregister(app.serviceID); derr != nil {
		log.Printf("Failed to deregister: %v", derr)
	}
	return err
}
// healthHandler serves the /health endpoint. It answers 200 "OK" when
// checkDependencies reports true and 503 "Dependencies unavailable"
// otherwise.
func (app *Application) healthHandler(w http.ResponseWriter, r *http.Request) {
	status, body := http.StatusOK, "OK"
	if !app.checkDependencies() {
		status, body = http.StatusServiceUnavailable, "Dependencies unavailable"
	}

	w.WriteHeader(status)
	w.Write([]byte(body))
}
// checkDependencies reports whether the service's dependencies are usable.
//
// This is a placeholder that always reports true; the database, Redis and
// other dependency checks are still to be added here.
func (app *Application) checkDependencies() bool {
	const dependenciesOK = true
	return dependenciesOK
}
Kubernetes服务发现
使用CoreDNS
# k8s/service.yaml
# ClusterIP Service that exposes the order-service Pods on port 80 and
# forwards to container port 8080.
apiVersion: v1
kind: Service
metadata:
  name: order-service
  namespace: production
  labels:
    app: order-service
spec:
  selector:
    app: order-service
  ports:
    # Cluster DNS only publishes SRV records for named ports. The name "http"
    # yields _http._tcp.order-service.production.svc.cluster.local, which is
    # what an SRV lookup for service "http" over "tcp" asks for.
    - name: http
      protocol: TCP
      port: 80
      targetPort: 8080
  type: ClusterIP
---
# Deployment running three replicas of order-service. Pods carry the
# app=order-service label that the Service selector matches.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-service
  namespace: production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: order-service
  template:
    metadata:
      labels:
        app: order-service
    spec:
      containers:
        - name: order-service
          image: order-service:1.0.0
          ports:
            - containerPort: 8080
          # Readiness gates traffic: a Pod only receives requests through the
          # Service while this probe succeeds.
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
          # Liveness restarts the container when this probe keeps failing.
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8080
            initialDelaySeconds: 15
            periodSeconds: 20
客户端使用K8s DNS
// k8s/discovery.go
package k8s
import (
"context"
"fmt"
"net"
"strings"
)
// KubernetesDiscovery resolves services through cluster DNS within a single
// namespace.
type KubernetesDiscovery struct {
	namespace string // namespace used to build the service DNS names
}

// NewKubernetesDiscovery returns a KubernetesDiscovery that looks services
// up in namespace.
func NewKubernetesDiscovery(namespace string) *KubernetesDiscovery {
	kd := new(KubernetesDiscovery)
	kd.namespace = namespace
	return kd
}
// DiscoverSRV discovers a service through DNS SRV records.
//
// It looks up the "http"/"tcp" SRV records of
// <serviceName>.<namespace>.svc.cluster.local (i.e. the name
// _http._tcp.<serviceName>.<namespace>.svc.cluster.local) and returns one
// "host:port" string per record, with the trailing dot of the target name
// removed. A lookup failure is returned wrapped.
func (kd *KubernetesDiscovery) DiscoverSRV(serviceName string) ([]string, error) {
	fqdn := fmt.Sprintf("%s.%s.svc.cluster.local", serviceName, kd.namespace)

	// Same lookup as net.LookupSRV, which uses the default resolver with a
	// background context.
	_, records, err := net.DefaultResolver.LookupSRV(context.Background(), "http", "tcp", fqdn)
	if err != nil {
		return nil, fmt.Errorf("SRV lookup failed: %w", err)
	}

	endpoints := make([]string, 0, len(records))
	for _, rec := range records {
		host := strings.TrimSuffix(rec.Target, ".")
		endpoints = append(endpoints, fmt.Sprintf("%s:%d", host, rec.Port))
	}
	return endpoints, nil
}
// DiscoverA discovers a service through its address records.
//
// It resolves <serviceName>.<namespace>.svc.cluster.local and returns the
// addresses the resolver reports. The original note on this function says it
// is meant for a headless Service, where the records are the individual Pod
// IPs. A lookup failure is returned wrapped.
func (kd *KubernetesDiscovery) DiscoverA(serviceName string) ([]string, error) {
	fqdn := fmt.Sprintf("%s.%s.svc.cluster.local", serviceName, kd.namespace)

	// Same lookup as net.LookupHost, which uses the default resolver with a
	// background context.
	addrs, err := net.DefaultResolver.LookupHost(context.Background(), fqdn)
	if err != nil {
		return nil, fmt.Errorf("A record lookup failed: %w", err)
	}
	return addrs, nil
}
// Usage example: discover the order service in the "production" namespace
// through SRV records and log every endpoint found. The process exits via
// log.Fatal if the lookup fails.
func main() {
	kd := NewKubernetesDiscovery("production")

	found, err := kd.DiscoverSRV("order-service")
	if err != nil {
		log.Fatal(err)
	}

	for i := range found {
		log.Printf("Found endpoint: %s", found[i])
	}
}
健康检查最佳实践
多层次健康检查
// health/checker.go
package health
import (
"context"
"database/sql"
"fmt"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
// HealthStatus is the health verdict for a component or for the whole
// service, represented by its string value.
type HealthStatus string

// The three health verdicts, from best to worst.
const (
	StatusHealthy   = HealthStatus("healthy")
	StatusDegraded  = HealthStatus("degraded")
	StatusUnhealthy = HealthStatus("unhealthy")
)
// ComponentHealth is the result of checking one component. It is encoded to
// JSON with the field names given in the struct tags.
type ComponentHealth struct {
// Name identifies the checked component.
Name string `json:"name"`
// Status is the verdict for this component.
Status HealthStatus `json:"status"`
// Message carries detail for a non-healthy verdict; omitted from JSON when empty.
Message string `json:"message,omitempty"`
// Latency is how long the check took; omitted from JSON when zero.
Latency time.Duration `json:"latency,omitempty"`
}
// HealthCheck holds the handles the liveness and readiness checks run
// against.
type HealthCheck struct {
	db       *sql.DB
	redis    *redis.Client
	services []string // not set by NewHealthCheck
}

// NewHealthCheck returns a HealthCheck that pings db and redis. The services
// list is left empty.
func NewHealthCheck(db *sql.DB, redis *redis.Client) *HealthCheck {
	hc := new(HealthCheck)
	hc.db = db
	hc.redis = redis
	return hc
}
// Liveness is the liveness check: is the application running at all. It
// performs no work and always reports StatusHealthy.
func (hc *HealthCheck) Liveness() HealthStatus {
	status := StatusHealthy
	return status
}
// Readiness is the readiness check: can the application handle requests.
//
// It runs the database, Redis and dependency checks concurrently, passing
// ctx to each, and waits for all of them. The returned components are always
// in that fixed order (database, redis, dependencies), independent of which
// check finishes first, so the reported output is stable between calls.
//
// The overall status is StatusUnhealthy if any component is unhealthy,
// otherwise StatusDegraded if any component is degraded, otherwise
// StatusHealthy.
func (hc *HealthCheck) Readiness(ctx context.Context) (HealthStatus, []ComponentHealth) {
	checks := []func(context.Context) ComponentHealth{
		hc.checkDatabase,
		hc.checkRedis,
		hc.checkDependencies,
	}

	// Each goroutine writes only its own slot, so no further synchronisation
	// is needed beyond the WaitGroup.
	components := make([]ComponentHealth, len(checks))
	var wg sync.WaitGroup
	wg.Add(len(checks))
	for i, check := range checks {
		go func(slot int, run func(context.Context) ComponentHealth) {
			defer wg.Done()
			components[slot] = run(ctx)
		}(i, check)
	}
	wg.Wait()

	overallStatus := StatusHealthy
	for _, component := range components {
		switch component.Status {
		case StatusUnhealthy:
			overallStatus = StatusUnhealthy
		case StatusDegraded:
			// Degraded never masks an unhealthy verdict.
			if overallStatus != StatusUnhealthy {
				overallStatus = StatusDegraded
			}
		}
	}

	return overallStatus, components
}
// checkDatabase pings the database with a 5-second timeout derived from ctx
// and reports the result under the name "database".
//
// A failed ping is unhealthy; a successful ping that took longer than one
// second is degraded; anything else is healthy. The measured latency is
// always included.
func (hc *HealthCheck) checkDatabase(ctx context.Context) ComponentHealth {
	began := time.Now()

	pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	pingErr := hc.db.PingContext(pingCtx)
	elapsed := time.Since(began)

	report := ComponentHealth{
		Name:    "database",
		Status:  StatusHealthy,
		Latency: elapsed,
	}
	switch {
	case pingErr != nil:
		report.Status = StatusUnhealthy
		report.Message = fmt.Sprintf("Database connection failed: %v", pingErr)
	case elapsed > 1*time.Second:
		report.Status = StatusDegraded
		report.Message = "Database response slow"
	}
	return report
}
// checkRedis pings Redis with a 3-second timeout derived from ctx and
// reports the result under the name "redis": unhealthy with the error text
// if the ping fails, healthy otherwise. The measured latency is always
// included.
func (hc *HealthCheck) checkRedis(ctx context.Context) ComponentHealth {
	began := time.Now()

	pingCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
	defer cancel()

	pingErr := hc.redis.Ping(pingCtx).Err()
	report := ComponentHealth{
		Name:    "redis",
		Status:  StatusHealthy,
		Latency: time.Since(began),
	}
	if pingErr != nil {
		report.Status = StatusUnhealthy
		report.Message = fmt.Sprintf("Redis connection failed: %v", pingErr)
	}
	return report
}
// checkDependencies reports on the downstream services this one depends on.
//
// This is a placeholder: it ignores ctx and always reports the
// "dependencies" component as healthy.
func (hc *HealthCheck) checkDependencies(ctx context.Context) ComponentHealth {
	var report ComponentHealth
	report.Name = "dependencies"
	report.Status = StatusHealthy
	return report
}
// LivenessHandler is the HTTP handler for the liveness check. It responds
// with a JSON object {"status": ...}, using 200 when the status is healthy
// and 503 otherwise.
func (hc *HealthCheck) LivenessHandler(w http.ResponseWriter, r *http.Request) {
	status := hc.Liveness()

	code := http.StatusServiceUnavailable
	if status == StatusHealthy {
		code = http.StatusOK
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)

	payload := map[string]interface{}{
		"status": status,
	}
	// The status line is already sent, so an encoding error cannot be reported.
	_ = json.NewEncoder(w).Encode(payload)
}
// ReadinessHandler is the HTTP handler for the readiness check. It runs
// Readiness with the request's context and responds with a JSON object
// holding the overall status and the per-component results. The status code
// is 503 only when the overall status is unhealthy; a degraded service still
// answers 200.
func (hc *HealthCheck) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
	overallStatus, components := hc.Readiness(r.Context())

	code := http.StatusOK
	if overallStatus == StatusUnhealthy {
		code = http.StatusServiceUnavailable
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)

	payload := map[string]interface{}{
		"status":     overallStatus,
		"components": components,
	}
	// The status line is already sent, so an encoding error cannot be reported.
	_ = json.NewEncoder(w).Encode(payload)
}
总结
服务发现与健康检查是微服务架构的核心基础设施:
- 服务发现模式：客户端发现提供灵活性，服务端发现简化客户端
- 注册中心选择：Consul适合多数据中心，Eureka适合Spring生态，K8s内置DNS最简便
- 健康检查：区分存活检查和就绪检查，实现多层次健康评估
- 负载均衡：结合服务发现实现智能路由和故障转移
- 缓存策略：合理缓存服务实例列表，减少注册中心压力
关键原则:
- 服务必须能够自注册和自注销
- 健康检查要快速、轻量、可靠
- 实现优雅关闭，先从注册中心注销再停止服务
- 使用熔断和重试机制处理实例故障
- 监控服务发现的性能和可用性
延伸阅读
- Consul官方文档
- Kubernetes Service Discovery
- Netflix Eureka
- Alibaba Nacos
- HashiCorp Learn - Service Discovery
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。