服务发现与健康检查:构建自适应的微服务架构

深入讲解服务发现的核心机制与实现模式,涵盖客户端发现与服务端发现两种模式,并以Consul和Kubernetes DNS为例给出实现方案(Eureka、Nacos等注册中心的思路与之类似),以及健康检查、服务注册、负载均衡的实战代码。

引言

在微服务架构中,服务实例的动态变化使得服务发现成为关键基础设施。本文将深入探讨服务发现的核心机制、实现模式以及健康检查的最佳实践。

服务发现模式

客户端发现模式

┌──────────────┐
│   Service A  │
└──────┬───────┘
       │ 1. 查询注册中心
       ▼
┌──────────────┐
│   Registry   │
└──────┬───────┘
       │ 2. 返回服务实例列表
       ▼
┌──────────────┐      ┌──────────────┐
│   Service A  │─────▶│   Service B  │
│  (负载均衡)  │      │  (实例2)     │
└──────────────┘      └──────────────┘

客户端负责:

  • 查询注册中心获取可用实例
  • 实现负载均衡算法
  • 处理实例故障和重试

服务端发现模式

┌──────────────┐
│   Service A  │
└──────┬───────┘
       │ 请求
       ▼
┌──────────────┐      ┌──────────────┐
│  Load        │─────▶│   Registry   │
│  Balancer    │◀─────│              │
└──────┬───────┘      └──────────────┘
       │
       ▼
┌──────────────┐
│   Service B  │
└──────────────┘

负载均衡器负责:

  • 接收所有服务请求
  • 查询注册中心
  • 路由到健康的实例

Consul实现方案

服务注册

// registry/consul.go
package registry

import (
	"fmt"
	"log"
	"net"
	"strconv"

	"github.com/hashicorp/consul/api"
)

// ConsulRegistry registers, deregisters and looks up services through a
// Consul API client.
type ConsulRegistry struct {
	// client is the Consul API client every operation goes through.
	client *api.Client
}

// NewConsulRegistry builds a registry whose client talks to the Consul agent
// at address (presumably host:port — whatever api.Config.Address accepts).
// Every other client setting keeps the value produced by api.DefaultConfig.
func NewConsulRegistry(address string) (*ConsulRegistry, error) {
	cfg := api.DefaultConfig()
	cfg.Address = address

	c, err := api.NewClient(cfg)
	if err == nil {
		return &ConsulRegistry{client: c}, nil
	}
	return nil, fmt.Errorf("failed to create Consul client: %w", err)
}

// ServiceRegistration describes one service instance to announce to Consul.
type ServiceRegistration struct {
	ID      string            // unique instance ID; Deregister takes the same value
	Name    string            // logical service name that Discover looks up
	Address string            // host or IP advertised for the instance
	Port    int               // port advertised for the instance
	Tags    []string          // tags attached to the registration
	Meta    map[string]string // key/value metadata attached to the registration
}

// Register announces service to the Consul agent behind r.client, together
// with an HTTP health check on http://<Address>:<Port>/health that Consul
// polls every 10s with a 5s timeout. Discover only returns instances whose
// checks are passing.
//
// If the check stays critical, Consul removes the registration by itself
// after DeregisterCriticalServiceAfter. Consul enforces a one-minute minimum
// for that setting and reaps critical services on a 30-second cycle, so the
// "30s" configured below effectively takes about a minute.
func (r *ConsulRegistry) Register(service ServiceRegistration) error {
	// JoinHostPort adds the brackets an IPv6 literal needs inside a URL
	// ("[::1]:8080"); plain "%s:%d" formatting would yield an unparsable
	// check URL for such addresses.
	healthURL := "http://" + net.JoinHostPort(service.Address, strconv.Itoa(service.Port)) + "/health"

	registration := &api.AgentServiceRegistration{
		ID:      service.ID,
		Name:    service.Name,
		Address: service.Address,
		Port:    service.Port,
		Tags:    service.Tags,
		Meta:    service.Meta,
		Check: &api.AgentServiceCheck{
			HTTP:                           healthURL,
			Interval:                       "10s",
			Timeout:                        "5s",
			DeregisterCriticalServiceAfter: "30s",
		},
	}

	if err := r.client.Agent().ServiceRegister(registration); err != nil {
		return fmt.Errorf("failed to register service: %w", err)
	}

	log.Printf("Service registered: %s (%s:%d)", service.Name, service.Address, service.Port)
	return nil
}

// Deregister removes the registration whose instance ID is serviceID from the
// Consul agent and logs the removal on success.
func (r *ConsulRegistry) Deregister(serviceID string) error {
	if err := r.client.Agent().ServiceDeregister(serviceID); err != nil {
		return fmt.Errorf("failed to deregister service: %w", err)
	}
	log.Printf("Service deregistered: %s", serviceID)
	return nil
}

// Discover asks Consul's health endpoint for the instances of serviceName,
// filtered to those with passing checks and without any tag filter. The
// query metadata Consul returns alongside the entries is discarded.
func (r *ConsulRegistry) Discover(serviceName string) ([]*api.ServiceEntry, error) {
	const (
		anyTag      = ""
		passingOnly = true
	)

	entries, _, err := r.client.Health().Service(serviceName, anyTag, passingOnly, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to discover service: %w", err)
	}
	return entries, nil
}

客户端负载均衡

// discovery/client.go
package discovery

import (
	"context"
	"fmt"
	"math/rand"
	"net/http"
	"sync"
	"time"

	"github.com/hashicorp/consul/api"
)

// ServiceInstance is one reachable endpoint of a discovered service.
type ServiceInstance struct {
	ID      string
	Address string
	Port    int
}

// ServiceDiscovery resolves service names to instances through a
// ConsulRegistry and caches each answer for cacheTTL.
type ServiceDiscovery struct {
	registry    *ConsulRegistry
	cache       map[string][]ServiceInstance // last instance list per service name
	cacheExpiry map[string]time.Time         // instant at which each cache entry goes stale
	cacheTTL    time.Duration
	mu          sync.RWMutex // guards cache and cacheExpiry
}

// NewServiceDiscovery returns a ServiceDiscovery backed by registry, with
// empty caches and a 30-second cache TTL.
func NewServiceDiscovery(registry *ConsulRegistry) *ServiceDiscovery {
	sd := &ServiceDiscovery{registry: registry, cacheTTL: 30 * time.Second}
	sd.cache = make(map[string][]ServiceInstance)
	sd.cacheExpiry = make(map[string]time.Time)
	return sd
}

// GetInstances returns the instances of serviceName reported by the
// registry's Discover call.
//
// Answers are served from an in-memory cache until they are cacheTTL old;
// after that the registry is queried again and the cache refreshed. If that
// query fails while an older answer is still cached, the stale answer is
// returned (and logged) instead of an error, so a registry outage degrades to
// slightly outdated routing rather than failed calls.
//
// The returned slice is shared with the cache; callers must not modify it.
func (sd *ServiceDiscovery) GetInstances(serviceName string) ([]ServiceInstance, error) {
	sd.mu.RLock()
	instances, cached := sd.cache[serviceName]
	expiry, hasExpiry := sd.cacheExpiry[serviceName]
	sd.mu.RUnlock()

	// Fresh cache hit: no registry round-trip.
	if cached && hasExpiry && time.Now().Before(expiry) {
		return instances, nil
	}

	entries, err := sd.registry.Discover(serviceName)
	if err != nil {
		// Prefer stale data over failing the caller.
		if cached {
			log.Printf("Using stale cache for service %s", serviceName)
			return instances, nil
		}
		return nil, err
	}

	instances = make([]ServiceInstance, 0, len(entries))
	for _, entry := range entries {
		addr := entry.Service.Address
		// Consul leaves Service.Address empty when the instance registered
		// without one; the node's address is the one to dial in that case.
		if addr == "" && entry.Node != nil {
			addr = entry.Node.Address
		}
		instances = append(instances, ServiceInstance{
			ID:      entry.Service.ID,
			Address: addr,
			Port:    entry.Service.Port,
		})
	}

	sd.mu.Lock()
	sd.cache[serviceName] = instances
	sd.cacheExpiry[serviceName] = time.Now().Add(sd.cacheTTL)
	sd.mu.Unlock()

	return instances, nil
}

// LoadBalancer picks one instance out of a list of candidates.
type LoadBalancer interface {
	Select(instances []ServiceInstance) (*ServiceInstance, error)
}

// RandomLoadBalancer picks a uniformly random candidate on every call.
type RandomLoadBalancer struct{}

// Select returns a pointer into instances at a random index, or an error when
// the list is empty.
func (lb *RandomLoadBalancer) Select(instances []ServiceInstance) (*ServiceInstance, error) {
	n := len(instances)
	if n == 0 {
		return nil, fmt.Errorf("no instances available")
	}
	return &instances[rand.Intn(n)], nil
}

// RoundRobinLoadBalancer walks the candidate list in order. The counter is
// shared by all calls, so the rotation is only strictly sequential while the
// list passed to Select keeps the same length.
type RoundRobinLoadBalancer struct {
	counter uint64     // number of successful selections so far
	mu      sync.Mutex // guards counter
}

// Select returns a pointer into instances at position counter % len, then
// advances the counter. Empty lists yield an error and leave the counter as is.
func (lb *RoundRobinLoadBalancer) Select(instances []ServiceInstance) (*ServiceInstance, error) {
	n := uint64(len(instances))
	if n == 0 {
		return nil, fmt.Errorf("no instances available")
	}

	lb.mu.Lock()
	next := lb.counter
	lb.counter = next + 1
	lb.mu.Unlock()

	return &instances[next%n], nil
}

// DiscoveryHTTPClient sends HTTP requests to a service by name: candidate
// instances come from discovery and loadBalancer picks one per attempt.
type DiscoveryHTTPClient struct {
	discovery    *ServiceDiscovery
	loadBalancer LoadBalancer
	httpClient   *http.Client
}

// NewDiscoveryHTTPClient wires discovery and lb to a dedicated http.Client
// whose overall per-request timeout is 30 seconds.
func NewDiscoveryHTTPClient(discovery *ServiceDiscovery, lb LoadBalancer) *DiscoveryHTTPClient {
	const requestTimeout = 30 * time.Second

	hc := &http.Client{Timeout: requestTimeout}
	return &DiscoveryHTTPClient{
		discovery:    discovery,
		loadBalancer: lb,
		httpClient:   hc,
	}
}

// Do sends req to an instance of serviceName, choosing a fresh instance via
// the load balancer on each attempt. There are at most three attempts in
// total. It retries when the transport fails or the instance answers with a
// 5xx status.
//
// The caller's req is never modified: every attempt works on req.Clone(ctx)
// whose URL is pointed at the selected instance. A request body is consumed
// by the first attempt, so requests with a body are retried only when
// req.GetBody can regenerate it (http.NewRequest sets it for
// *bytes.Buffer, *bytes.Reader and *strings.Reader bodies). Otherwise the
// first outcome is final. Retrying stops as soon as ctx is done.
//
// A 5xx response on the final (or a non-retryable) attempt is returned with a
// nil error so the caller can inspect it. The caller must close resp.Body.
func (c *DiscoveryHTTPClient) Do(ctx context.Context, serviceName string, req *http.Request) (*http.Response, error) {
	const maxAttempts = 3

	instances, err := c.discovery.GetInstances(serviceName)
	if err != nil {
		return nil, fmt.Errorf("service discovery failed: %w", err)
	}

	hasBody := req.Body != nil && req.Body != http.NoBody
	replayable := !hasBody || req.GetBody != nil

	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if attempt > 0 {
			// The caller has given up; don't keep hitting instances.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return nil, fmt.Errorf("retry aborted: %w (last error: %v)", ctxErr, lastErr)
			}
		}

		instance, err := c.loadBalancer.Select(instances)
		if err != nil {
			return nil, err
		}

		// Work on a copy so the caller's URL and body stay untouched.
		// NOTE(review): "%s:%d" does not bracket IPv6 literals; switch to
		// net.JoinHostPort if instances may advertise IPv6 addresses.
		attemptReq := req.Clone(ctx)
		attemptReq.URL.Scheme = "http"
		attemptReq.URL.Host = fmt.Sprintf("%s:%d", instance.Address, instance.Port)
		if attempt > 0 && hasBody {
			// Reaching a retry with a body implies GetBody != nil (see replayable).
			body, err := req.GetBody()
			if err != nil {
				return nil, fmt.Errorf("rewinding request body: %w", err)
			}
			attemptReq.Body = body
		}

		resp, err := c.httpClient.Do(attemptReq)
		if err != nil {
			lastErr = err
			log.Printf("Request to %s failed: %v", instance.ID, err)
			if !replayable {
				break
			}
			continue
		}

		// 5xx: try another instance unless this is the final attempt or the
		// body cannot be sent again.
		if resp.StatusCode >= 500 && attempt < maxAttempts-1 && replayable {
			resp.Body.Close()
			lastErr = fmt.Errorf("server error: %d", resp.StatusCode)
			continue
		}

		return resp, nil
	}

	return nil, fmt.Errorf("all retries failed: %w", lastErr)
}

服务自注册

// main.go
package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"

	"github.com/google/uuid"
)

// Application is a self-registering HTTP service: Start announces it to
// Consul and the shutdown path removes it again.
type Application struct {
	registry    *ConsulRegistry
	serviceID   string // "<serviceName>-<random UUID>", generated per Application
	serviceName string
	address     string // address advertised to Consul
	port        int    // port served locally and advertised to Consul
}

// NewApplication prepares an Application named name that will advertise
// address:port through the Consul agent at consulAddr. A random UUID suffix
// gives every Application its own (practically unique) instance ID, so
// replicas of the same service do not overwrite each other's registration.
func NewApplication(name, address string, port int, consulAddr string) (*Application, error) {
	reg, err := NewConsulRegistry(consulAddr)
	if err != nil {
		return nil, err
	}

	app := &Application{
		registry:    reg,
		serviceName: name,
		address:     address,
		port:        port,
	}
	app.serviceID = fmt.Sprintf("%s-%s", name, uuid.New().String())
	return app, nil
}

// Start registers the instance in Consul, serves HTTP on app.port and blocks
// until the server stops.
//
// On SIGINT/SIGTERM the instance is deregistered first, so discovery stops
// handing it out, and the server is then shut down gracefully. In-flight
// requests get up to 30 seconds to finish. Start returns only after that
// shutdown has completed, and it returns http.ErrServerClosed in that case,
// as before.
//
// If the server fails to start (for example the port is taken) or stops for
// any other reason, the Consul registration is removed before the error is
// returned, so the registry does not keep advertising a dead instance.
func (app *Application) Start() error {
	err := app.registry.Register(ServiceRegistration{
		ID:      app.serviceID,
		Name:    app.serviceName,
		Address: app.address,
		Port:    app.port,
		Tags:    []string{"v1", "production"},
		Meta: map[string]string{
			"version": "1.0.0",
		},
	})
	if err != nil {
		return err
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/health", app.healthHandler)
	mux.HandleFunc("/api/data", app.dataHandler)

	server := &http.Server{
		Addr:    fmt.Sprintf(":%d", app.port),
		Handler: mux,
	}

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigChan)

	// stop releases the watcher when the server exits without a signal;
	// shutdownDone is closed once the watcher goroutine has finished.
	stop := make(chan struct{})
	shutdownDone := make(chan struct{})
	go func() {
		defer close(shutdownDone)
		select {
		case <-sigChan:
		case <-stop:
			return
		}

		log.Println("Shutting down...")

		// Deregister before stopping so no new traffic is routed here.
		if err := app.registry.Deregister(app.serviceID); err != nil {
			log.Printf("Failed to deregister: %v", err)
		}

		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		if err := server.Shutdown(ctx); err != nil {
			log.Printf("Graceful shutdown failed: %v", err)
		}
	}()

	log.Printf("Service %s listening on %s:%d", app.serviceName, app.address, app.port)
	err = server.ListenAndServe()
	if err == http.ErrServerClosed {
		// ListenAndServe returns as soon as Shutdown begins; wait until the
		// in-flight requests have drained before reporting back.
		<-shutdownDone
		return err
	}

	// The server never started or died on its own: release the watcher and
	// withdraw the registration that was made above.
	close(stop)
	<-shutdownDone
	if derr := app.registry.Deregister(app.serviceID); derr != nil {
		log.Printf("Failed to deregister: %v", derr)
	}
	return err
}

// healthHandler serves /health, the endpoint Consul's HTTP check polls. It
// answers 200 "OK" while checkDependencies reports true and 503
// "Dependencies unavailable" otherwise.
func (app *Application) healthHandler(w http.ResponseWriter, r *http.Request) {
	status, body := http.StatusOK, "OK"
	if !app.checkDependencies() {
		status, body = http.StatusServiceUnavailable, "Dependencies unavailable"
	}
	w.WriteHeader(status)
	w.Write([]byte(body))
}

// checkDependencies reports whether the service's dependencies are usable.
// It is currently a placeholder that always returns true.
func (app *Application) checkDependencies() bool {
	// TODO: ping the database, Redis and any other hard dependencies here.
	return true
}

Kubernetes服务发现

使用CoreDNS

# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: order-service
  namespace: production
  labels:
    app: order-service
spec:
  selector:
    app: order-service
  ports:
    # Kubernetes DNS publishes SRV records (_http._tcp.order-service.production.svc.cluster.local)
    # only for named ports, so this name is required for SRV-based discovery.
    - name: http
      protocol: TCP
      port: 80
      targetPort: 8080
  # ClusterIP: the Service name resolves to a single virtual IP. Use
  # clusterIP: None (a headless Service) to get one A record per ready Pod.
  type: ClusterIP
---
# Runs three replicas of order-service. The Pod template label
# app: order-service is what the order-service Service selects on, and
# containerPort 8080 is the Service's targetPort.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-service
  namespace: production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: order-service
  template:
    metadata:
      labels:
        app: order-service
    spec:
      containers:
        - name: order-service
          image: order-service:1.0.0
          ports:
            - containerPort: 8080
          # Readiness: while this probe fails, the Pod is removed from the
          # Service's endpoints (receives no traffic) but is not restarted.
          # NOTE(review): /health/ready and /health/live must be routes the
          # app actually serves; the self-registration example only exposes
          # /health — confirm the routing.
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
          # Liveness: repeated failures make the kubelet restart the
          # container, so keep this check free of external dependencies.
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8080
            initialDelaySeconds: 15
            periodSeconds: 20

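客户端使用K8s DNS

注意:Kubernetes DNS 只为具名端口生成 SRV 记录(形如 _http._tcp.<service>.<namespace>.svc.cluster.local),因此 Service 的端口必须声明 name(例如 http),下面的 DiscoverSRV 才能查到结果;A 记录只有在 Headless Service(clusterIP: None)下才会返回各个 Pod 的 IP,普通 ClusterIP Service 只会解析出一个虚拟 IP。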

// k8s/discovery.go
package k8s

import (
	"context"
	"fmt"
	"net"
	"strings"
)

// KubernetesDiscovery resolves Services of one namespace through the cluster
// DNS, using names of the form <service>.<namespace>.svc.cluster.local.
type KubernetesDiscovery struct {
	namespace string
}

// NewKubernetesDiscovery returns a resolver for Services in namespace.
func NewKubernetesDiscovery(namespace string) *KubernetesDiscovery {
	return &KubernetesDiscovery{namespace: namespace}
}

// fqdn builds the cluster-local DNS name of serviceName, assuming the default
// "cluster.local" cluster domain.
func (kd *KubernetesDiscovery) fqdn(serviceName string) string {
	return fmt.Sprintf("%s.%s.svc.cluster.local", serviceName, kd.namespace)
}

// DiscoverSRV looks up _http._tcp.<service>.<namespace>.svc.cluster.local and
// returns the advertised endpoints as "host:port" strings. They come in the
// order the resolver returns them: sorted by priority, randomized by weight.
//
// Kubernetes publishes SRV records only for named ports, so the Service must
// name its port "http" or the lookup fails.
func (kd *KubernetesDiscovery) DiscoverSRV(serviceName string) ([]string, error) {
	_, addrs, err := net.DefaultResolver.LookupSRV(context.Background(), "http", "tcp", kd.fqdn(serviceName))
	if err != nil {
		return nil, fmt.Errorf("SRV lookup failed: %w", err)
	}

	endpoints := make([]string, len(addrs))
	for i, addr := range addrs {
		// SRV targets are fully qualified and end with a dot; drop it.
		endpoints[i] = fmt.Sprintf("%s:%d", strings.TrimSuffix(addr.Target, "."), addr.Port)
	}
	return endpoints, nil
}

// DiscoverA resolves the Service name to IP addresses. For a headless Service
// (clusterIP: None) this yields one address per ready Pod. For a regular
// ClusterIP Service it yields only the Service's single virtual IP.
func (kd *KubernetesDiscovery) DiscoverA(serviceName string) ([]string, error) {
	ips, err := net.DefaultResolver.LookupHost(context.Background(), kd.fqdn(serviceName))
	if err != nil {
		return nil, fmt.Errorf("A record lookup failed: %w", err)
	}
	return ips, nil
}

// main is a usage example: it discovers order-service in the production
// namespace via SRV records and logs every endpoint found. It exits through
// log.Fatal if the lookup fails.
func main() {
	kd := NewKubernetesDiscovery("production")

	endpoints, err := kd.DiscoverSRV("order-service")
	if err != nil {
		log.Fatal(err)
	}
	for i := range endpoints {
		log.Printf("Found endpoint: %s", endpoints[i])
	}
}

健康检查最佳实践

多层次健康检查

// health/checker.go
package health

import (
	"context"
	"database/sql"
	"fmt"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
)

// HealthStatus is the coarse verdict reported for one component or for the
// service as a whole.
type HealthStatus string

// HealthStatus values, from best to worst.
const (
	StatusHealthy   HealthStatus = "healthy"   // fully operational
	StatusDegraded  HealthStatus = "degraded"  // working, but e.g. responding slowly
	StatusUnhealthy HealthStatus = "unhealthy" // unable to serve
)

// ComponentHealth is the outcome of checking a single dependency.
type ComponentHealth struct {
	Name    string        `json:"name"`
	Status  HealthStatus  `json:"status"`
	Message string        `json:"message,omitempty"`
	Latency time.Duration `json:"latency,omitempty"` // encoding/json writes a Duration as integer nanoseconds
}

// HealthCheck runs this service's liveness and readiness checks against its
// database and Redis client.
type HealthCheck struct {
	db       *sql.DB
	redis    *redis.Client
	services []string // NOTE(review): never set by NewHealthCheck nor read by the checks shown here
}

// NewHealthCheck returns a HealthCheck that probes db and redis.
func NewHealthCheck(db *sql.DB, redis *redis.Client) *HealthCheck {
	hc := new(HealthCheck)
	hc.db = db
	hc.redis = redis
	return hc
}

// Liveness reports whether the process itself is alive. It deliberately
// consults no dependencies, so a database or Redis outage cannot make it
// fail; it always returns StatusHealthy.
func (hc *HealthCheck) Liveness() HealthStatus {
	return StatusHealthy
}

// Readiness reports whether the service can take traffic right now. The
// database, Redis and downstream-dependency checks run concurrently under
// ctx. The overall status is the worst component status, with
// unhealthy > degraded > healthy.
//
// Components are returned in a fixed order (database, redis, dependencies)
// regardless of which check finishes first, so responses are stable.
func (hc *HealthCheck) Readiness(ctx context.Context) (HealthStatus, []ComponentHealth) {
	checks := []func(context.Context) ComponentHealth{
		hc.checkDatabase,
		hc.checkRedis,
		hc.checkDependencies,
	}

	// Each goroutine writes only its own slot. wg.Wait provides the
	// happens-before edge for reading the results, so no extra locking is needed.
	components := make([]ComponentHealth, len(checks))
	var wg sync.WaitGroup
	for i, check := range checks {
		wg.Add(1)
		go func(i int, check func(context.Context) ComponentHealth) {
			defer wg.Done()
			components[i] = check(ctx)
		}(i, check)
	}
	wg.Wait()

	overall := StatusHealthy
	for _, c := range components {
		switch {
		case c.Status == StatusUnhealthy:
			overall = StatusUnhealthy
		case c.Status == StatusDegraded && overall != StatusUnhealthy:
			overall = StatusDegraded
		}
	}
	return overall, components
}

// checkDatabase pings the database under a 5-second timeout derived from
// ctx. A failed ping is unhealthy, a successful ping slower than one second
// is degraded, and anything else is healthy. The measured latency is always
// reported.
func (hc *HealthCheck) checkDatabase(ctx context.Context) ComponentHealth {
	const (
		pingTimeout   = 5 * time.Second
		slowThreshold = 1 * time.Second
	)
	result := ComponentHealth{Name: "database"}

	began := time.Now()
	pingCtx, cancel := context.WithTimeout(ctx, pingTimeout)
	defer cancel()

	err := hc.db.PingContext(pingCtx)
	result.Latency = time.Since(began)

	switch {
	case err != nil:
		result.Status = StatusUnhealthy
		result.Message = fmt.Sprintf("Database connection failed: %v", err)
	case result.Latency > slowThreshold:
		result.Status = StatusDegraded
		result.Message = "Database response slow"
	default:
		result.Status = StatusHealthy
	}
	return result
}

// checkRedis sends PING to Redis under a 3-second timeout derived from ctx.
// Any error makes the component unhealthy; otherwise it is healthy. Latency
// is reported but, unlike the database check, never downgrades the status.
func (hc *HealthCheck) checkRedis(ctx context.Context) ComponentHealth {
	began := time.Now()
	pingCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
	defer cancel()

	pingErr := hc.redis.Ping(pingCtx).Err()
	elapsed := time.Since(began)

	if pingErr == nil {
		return ComponentHealth{Name: "redis", Status: StatusHealthy, Latency: elapsed}
	}
	return ComponentHealth{
		Name:    "redis",
		Status:  StatusUnhealthy,
		Message: fmt.Sprintf("Redis connection failed: %v", pingErr),
		Latency: elapsed,
	}
}

// checkDependencies is a placeholder for probing downstream services. It
// currently makes no calls and always reports them healthy.
func (hc *HealthCheck) checkDependencies(ctx context.Context) ComponentHealth {
	// TODO: call the downstream services this one relies on, honouring ctx.
	var result ComponentHealth
	result.Name = "dependencies"
	result.Status = StatusHealthy
	return result
}

// LivenessHandler serves the liveness probe. It answers 200 when Liveness
// reports healthy and 503 otherwise, with a JSON body {"status": "..."}.
func (hc *HealthCheck) LivenessHandler(w http.ResponseWriter, r *http.Request) {
	status := hc.Liveness()

	code := http.StatusServiceUnavailable
	if status == StatusHealthy {
		code = http.StatusOK
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	json.NewEncoder(w).Encode(map[string]interface{}{
		"status": status,
	})
}

// ReadinessHandler serves the readiness probe. Only an unhealthy overall
// status yields 503; degraded still answers 200, so a slow dependency does
// not take the instance out of rotation. The JSON body carries the overall
// status and every component result.
func (hc *HealthCheck) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
	overallStatus, components := hc.Readiness(r.Context())

	code := http.StatusOK
	if overallStatus == StatusUnhealthy {
		code = http.StatusServiceUnavailable
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	json.NewEncoder(w).Encode(map[string]interface{}{
		"status":     overallStatus,
		"components": components,
	})
}

总结

服务发现与健康检查是微服务架构的核心基础设施:

  1. 服务发现模式:客户端发现提供灵活性,服务端发现简化客户端
  2. 注册中心选择:Consul适合多数据中心,Eureka适合Spring生态,K8s内置DNS最简便
  3. 健康检查:区分存活检查和就绪检查,实现多层次健康评估
  4. 负载均衡:结合服务发现实现智能路由和故障转移
  5. 缓存策略:合理缓存服务实例列表,减少注册中心压力

关键原则:

  • 服务必须能够自注册和自注销
  • 健康检查要快速、轻量、可靠
  • 实现优雅关闭:先从注册中心注销(让注册中心不再向调用方返回该实例),再等待进行中的请求处理完毕后停止服务
  • 使用熔断和重试机制处理实例故障
  • 监控服务发现的性能和可用性

延伸阅读

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页