Go and Kubernetes: Building a Custom Operator

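A systematic walkthrough of the Kubernetes Operator pattern: building a custom controller with kubebuilder and controller-runtime, with hands-on coverage of CRD definitions, the reconciliation loop, finalizers, status management, webhooks, testing, and deployment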

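Have you ever run into this situation? Your company runs a complex piece of middleware, every deployment means hand-editing a pile of YAML files, and one slip produces a misconfiguration. Wouldn't it be nice, you think, if Kubernetes could "understand" this middleware and manage it for you?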

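That is exactly where Operators come in. An Operator is a Kubernetes extension mechanism that lets you encode operational knowledge as software, so that Kubernetes can manage complex applications and custom resources for you. In this article we dig into how to build a production-grade Kubernetes Operator with Go and kubebuilder.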

What is an Operator?

Before diving into the code, let's pin down the core concepts behind an Operator:

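  • Custom Resource (CR): Kubernetes lets you define your own resource types, which work just like built-in ones such as Pod and Deployment
  • Custom Resource Definition (CRD): defines a CR's schema and structure
  • Controller: watches CRs for changes and runs reconciliation logic so that the actual state of the system matches the desired state
  • Operator = CRD + Controller + domain knowledge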

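An analogy: Kubernetes' built-in controllers are like a "general-purpose housekeeper" who knows how to look after Pods and Services, while an Operator is a "specialist" you hire who knows how to look after your particular application.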

Project setup: using kubebuilder

kubebuilder is a Kubernetes SIG API Machinery project for building Operators. It scaffolds most of the boilerplate for you:

# Install kubebuilder
curl -L -o kubebuilder https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)
chmod +x kubebuilder && sudo mv kubebuilder /usr/local/bin/

# Create the project
mkdir webapp-operator && cd webapp-operator
go mod init github.com/yourusername/webapp-operator

# Initialize the kubebuilder project
kubebuilder init --domain example.com --repo github.com/yourusername/webapp-operator

# Create the API (CRD + controller)
kubebuilder create api --group webapp --version v1alpha1 --kind WebApp --resource --controller

Once the commands finish, the project layout looks like this:

webapp-operator/
├── api/
│   └── v1alpha1/
│       ├── groupversion_info.go
│       ├── webapp_types.go          # CRD type definitions
│       └── zz_generated.deepcopy.go # generated DeepCopy methods
├── cmd/
│   └── main.go                       # controller manager entry point
├── config/
│   ├── crd/                          # CRD YAML
│   ├── rbac/                         # RBAC manifests
│   └── manager/                      # manager Deployment manifests
├── internal/
│   └── controller/
│       └── webapp_controller.go      # controller logic
└── Makefile

Defining the CRD (Custom Resource Definition)

The CRD is the heart of an Operator: it defines which resources users can create. Let's design a WebApp CRD:

// api/v1alpha1/webapp_types.go
package v1alpha1

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// WebAppSpec defines the desired state of WebApp
type WebAppSpec struct {
	//+kubebuilder:validation:Required
	//+kubebuilder:validation:MinLength=1
	// Application name
	Name string `json:"name"`

	//+kubebuilder:validation:Required
	//+kubebuilder:validation:Pattern=`^[a-z0-9.-]+(/[a-z0-9.-]+)*(:[a-z0-9.-]+)?$`
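	// Container image reference (simplified pattern: registry ports, digests, and uppercase letters or underscores in tags are rejected)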
	Image string `json:"image"`

	//+kubebuilder:validation:Minimum=1
	//+kubebuilder:validation:Maximum=100
	//+kubebuilder:default=1
	// Replica count
	Replicas int32 `json:"replicas,omitempty"`

	//+kubebuilder:validation:Minimum=1
	//+kubebuilder:validation:Maximum=65535
	//+kubebuilder:default=8080
	// Service port
	Port int32 `json:"port,omitempty"`

	//+kubebuilder:validation:Enum=small;medium;large
	//+kubebuilder:default=small
	// Resource size tier
	Size ResourceSize `json:"size,omitempty"`

	// Environment variables
	Env []corev1.EnvVar `json:"env,omitempty"`

	// Database configuration
	Database *DatabaseConfig `json:"database,omitempty"`

	// Autoscaling configuration
	AutoScaling *AutoScalingConfig `json:"autoScaling,omitempty"`

	// Ingress configuration
}

// ResourceSize is the resource size tier
//+kubebuilder:validation:Enum=small;medium;large
type ResourceSize string

const (
	SmallSize  ResourceSize = "small"
	MediumSize ResourceSize = "medium"
	LargeSize  ResourceSize = "large"
)

// DatabaseConfig holds the database connection settings
type DatabaseConfig struct {
	//+kubebuilder:validation:Required
	Host string `json:"host"`

	//+kubebuilder:validation:Minimum=1
	//+kubebuilder:validation:Maximum=65535
	//+kubebuilder:default=5432
	Port int32 `json:"port,omitempty"`

	//+kubebuilder:validation:Required
	Name string `json:"name"`

	//+kubebuilder:validation:Required
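	// Name of the Secret holding the credentials (the controller reads its "password" key); never store passwords in plain text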
	CredentialsSecret string `json:"credentialsSecret"`
}

// AutoScalingConfig configures autoscaling
type AutoScalingConfig struct {
	//+kubebuilder:validation:Minimum=1
	MinReplicas int32 `json:"minReplicas"`

	//+kubebuilder:validation:Minimum=1
	MaxReplicas int32 `json:"maxReplicas"`

	//+kubebuilder:validation:Minimum=1
	//+kubebuilder:validation:Maximum=100
	TargetCPUUtilization int32 `json:"targetCPUUtilization,omitempty"`
}

// IngressConfig configures the Ingress
type IngressConfig struct {
	//+kubebuilder:validation:Required
	Host string `json:"host"`

	//+kubebuilder:default=false
	TLS bool `json:"tls,omitempty"`

	// Name of the TLS Secret
	TLSSecret string `json:"tlsSecret,omitempty"`
}

// WebAppStatus defines the observed state of WebApp
type WebAppStatus struct {
	// Number of currently available replicas
	AvailableReplicas int32 `json:"availableReplicas"`

	// Current phase
	//+kubebuilder:validation:Enum=Pending;Running;Failed;Updating
	Phase WebAppPhase `json:"phase,omitempty"`

	// Service URL
	ServiceURL string `json:"serviceURL,omitempty"`

	// Time of the most recent deployment
	LastDeployed *metav1.Time `json:"lastDeployed,omitempty"`

	// Status conditions
	Conditions []metav1.Condition `json:"conditions,omitempty"`
}

// WebAppPhase is the lifecycle phase of a WebApp
type WebAppPhase string

const (
	PhasePending  WebAppPhase = "Pending"
	PhaseRunning  WebAppPhase = "Running"
	PhaseFailed   WebAppPhase = "Failed"
	PhaseUpdating WebAppPhase = "Updating"
)

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase`
//+kubebuilder:printcolumn:name="Replicas",type=integer,JSONPath=`.status.availableReplicas`
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
//+kubebuilder:resource:shortName=wa

// WebApp is the Schema for the webapps API
type WebApp struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   WebAppSpec   `json:"spec,omitempty"`
	Status WebAppStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// WebAppList contains a list of WebApp
type WebAppList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []WebApp `json:"items"`
}

func init() {
	SchemeBuilder.Register(&WebApp{}, &WebAppList{})
}

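Generate the DeepCopy code and the CRD:

# Generate DeepCopy methods
make generate

# Generate the CRD YAML (this target also regenerates the RBAC and webhook manifests)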
make manifests
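`make generate` has controller-gen write `zz_generated.deepcopy.go`. Deep copies matter because assigning a struct in Go copies its pointers and slice headers, not what they point to. A naive copy of a `WebApp` would therefore still share its `Database` config and `Env` slice with the original, for example with an object read from the shared informer cache.

The sketch below hand-writes the same kind of method for hypothetical, trimmed-down stand-in types (`Spec`, `EnvVar`, `DBConfig`). It illustrates the idea and is not the generated code:

```go
package main

// EnvVar and DBConfig are simplified stand-ins for corev1.EnvVar and
// DatabaseConfig; Spec mirrors the pointer and slice fields of WebAppSpec.
type EnvVar struct {
	Name, Value string
}

type DBConfig struct {
	Host string
	Port int32
}

type Spec struct {
	Image    string
	Env      []EnvVar
	Database *DBConfig
}

// DeepCopyInto copies in into out, allocating fresh memory for the slice
// and the pointer so that the two values share nothing.
func (in *Spec) DeepCopyInto(out *Spec) {
	*out = *in // copies value fields; Env and Database are still shared at this point
	if in.Env != nil {
		out.Env = make([]EnvVar, len(in.Env))
		copy(out.Env, in.Env) // this EnvVar holds only strings, so copy is enough
	}
	if in.Database != nil {
		db := *in.Database
		out.Database = &db
	}
}

// DeepCopy allocates a new Spec and deep-copies in into it.
func (in *Spec) DeepCopy() *Spec {
	if in == nil {
		return nil
	}
	out := new(Spec)
	in.DeepCopyInto(out)
	return out
}
```

Because the generated file follows this pattern for every type, it should not be edited by hand. Rerun `make generate` after changing `webapp_types.go` instead.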

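The generated CRD YAML carries OpenAPI v3 validation rules derived from these markers. The Kubernetes API server can therefore reject an invalid WebApp before it is stored.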
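To make concrete what the generated schema enforces, here is a plain-Go re-creation of a few marker rules on `WebAppSpec`: the image pattern, the replica and port ranges, and the size enum.

`validateSpec` is a hypothetical helper for illustration only. In the real operator the API server applies these checks from the CRD schema, and you write no code for them. The sketch assumes the defaults (1, 8080, small) have already been filled in, because the API server applies schema defaults before it validates:

```go
package main

import (
	"fmt"
	"regexp"
)

// imagePattern is the same simplified pattern as the Pattern marker on Image.
var imagePattern = regexp.MustCompile(`^[a-z0-9.-]+(/[a-z0-9.-]+)*(:[a-z0-9.-]+)?$`)

// validateSpec re-creates, in plain Go, checks the API server derives from
// the kubebuilder markers. It returns one message per violated rule.
func validateSpec(image string, replicas, port int32, size string) []string {
	var errs []string
	if !imagePattern.MatchString(image) {
		errs = append(errs, fmt.Sprintf("spec.image: %q does not match the pattern", image))
	}
	if replicas < 1 || replicas > 100 {
		errs = append(errs, fmt.Sprintf("spec.replicas: %d must be in [1, 100]", replicas))
	}
	if port < 1 || port > 65535 {
		errs = append(errs, fmt.Sprintf("spec.port: %d must be in [1, 65535]", port))
	}
	switch size {
	case "small", "medium", "large":
	default:
		errs = append(errs, fmt.Sprintf("spec.size: %q must be one of small, medium, large", size))
	}
	return errs
}
```

For example, `validateSpec("registry.local:5000/app", 1, 8080, "small")` reports one error. This shows how strict the simplified image pattern is about registry ports.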

Implementing the reconciliation loop

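The reconciliation loop is the core of an Operator. It is not a literal infinite loop. controller-runtime calls Reconcile in three cases: whenever a watched object changes, after a returned error (with backoff), and when the returned ctrl.Result asks for a requeue. Each call compares the current state with the desired state and performs whatever actions close the gap. Reconcile must therefore be idempotent: running it twice on the same state must be harmless.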
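A toy, stdlib-only model of this level-triggered idea follows. `desired` and `observed` are hypothetical maps from object name to replica count.

`reconcileOnce` looks only at the current state, not at which event arrived. It makes the smallest set of changes that converges that state, so running it a second time is a no-op. That idempotence is what lets controller-runtime retry or repeat a reconcile safely:

```go
package main

// desired is what the user asked for (think WebApp.spec); observed is what
// currently exists (think the objects the operator owns in the cluster).
type desired map[string]int32
type observed map[string]int32

// reconcileOnce compares desired and observed state and applies the minimum
// changes so that observed matches desired. It returns how many changes it
// made, so calling it again on an already converged state returns 0.
func reconcileOnce(want desired, have observed) int {
	changes := 0
	for name, replicas := range want {
		if cur, ok := have[name]; !ok || cur != replicas {
			have[name] = replicas // create or update
			changes++
		}
	}
	for name := range have {
		if _, ok := want[name]; !ok {
			delete(have, name) // remove what is no longer desired
			changes++
		}
	}
	return changes
}
```

The controller below has the same read, compare, act shape. It applies that shape in turn to each object a WebApp owns.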

// internal/controller/webapp_controller.go
package controller

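import (
	"context"
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	autoscalingv2 "k8s.io/api/autoscaling/v2"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
)

const (
	webappFinalizer = "webapp.example.com/finalizer"
	requeueAfter    = 30 * time.Second
)

// WebAppReconciler reconciles WebApp resources
type WebAppReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

// SetupWithManager registers the controller with the manager. It watches WebApp
// objects and the objects they own, so a change to an owned Deployment, Service,
// Ingress or HPA also triggers a reconcile of its WebApp.
func (r *WebAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		// Skip WebApp updates that leave metadata.generation unchanged (such as our own
		// status writes); otherwise every status update would trigger another Reconcile
		For(&webappv1alpha1.WebApp{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Owns(&appsv1.Deployment{}).
		Owns(&corev1.Service{}).
		Owns(&networkingv1.Ingress{}).
		Owns(&autoscalingv2.HorizontalPodAutoscaler{}).
		Complete(r)
}

// reconcileHPA creates or updates an autoscaling/v2 HorizontalPodAutoscaler that
// scales the WebApp's Deployment on average CPU utilization
func (r *WebAppReconciler) reconcileHPA(ctx context.Context, webapp *webappv1alpha1.WebApp) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	as := webapp.Spec.AutoScaling

	hpa := &autoscalingv2.HorizontalPodAutoscaler{
		ObjectMeta: metav1.ObjectMeta{
			Name:      webapp.Name,
			Namespace: webapp.Namespace,
		},
	}

	op, err := controllerutil.CreateOrUpdate(ctx, r.Client, hpa, func() error {
		minReplicas := as.MinReplicas
		targetCPU := as.TargetCPUUtilization
		if targetCPU == 0 {
			targetCPU = 80 // same fallback the defaulting webhook applies, in case webhooks are disabled
		}
		hpa.Spec.ScaleTargetRef = autoscalingv2.CrossVersionObjectReference{
			APIVersion: "apps/v1",
			Kind:       "Deployment",
			Name:       webapp.Name,
		}
		hpa.Spec.MinReplicas = &minReplicas
		hpa.Spec.MaxReplicas = as.MaxReplicas
		hpa.Spec.Metrics = []autoscalingv2.MetricSpec{{
			Type: autoscalingv2.ResourceMetricSourceType,
			Resource: &autoscalingv2.ResourceMetricSource{
				Name: corev1.ResourceCPU,
				Target: autoscalingv2.MetricTarget{
					Type:               autoscalingv2.UtilizationMetricType,
					AverageUtilization: &targetCPU,
				},
			},
		}}
		return controllerutil.SetControllerReference(webapp, hpa, r.Scheme)
	})
	if err != nil {
		logger.Error(err, "Failed to reconcile HPA")
		return ctrl.Result{}, err
	}

	logger.Info("HPA reconciled", "operation", op)
	return ctrl.Result{}, nil
}

//+kubebuilder:rbac:groups=webapp.example.com,resources=webapps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=webapp.example.com,resources=webapps/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=webapp.example.com,resources=webapps/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete

// Reconcile runs the reconciliation loop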
func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Reconciling WebApp", "namespace", req.Namespace, "name", req.Name)

	// Step 1: fetch the WebApp resource
	webapp := &webappv1alpha1.WebApp{}
	if err := r.Get(ctx, req.NamespacedName, webapp); err != nil {
		if errors.IsNotFound(err) {
			logger.Info("WebApp resource not found, ignoring")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get WebApp")
		return ctrl.Result{}, err
	}

	// Step 2: handle the finalizer (cleanup work before deletion)
	if webapp.ObjectMeta.DeletionTimestamp.IsZero() {
		// Not being deleted: make sure our finalizer is present
		if !controllerutil.ContainsFinalizer(webapp, webappFinalizer) {
			controllerutil.AddFinalizer(webapp, webappFinalizer)
			if err := r.Update(ctx, webapp); err != nil {
				return ctrl.Result{}, err
			}
		}
	} else {
		// The resource is being deleted
		if controllerutil.ContainsFinalizer(webapp, webappFinalizer) {
			// Run the cleanup logic
			if err := r.cleanupExternalResources(ctx, webapp); err != nil {
				logger.Error(err, "Failed to cleanup external resources")
				return ctrl.Result{}, err
			}

			// Remove the finalizer so the API server can finish deleting the object
			controllerutil.RemoveFinalizer(webapp, webappFinalizer)
			if err := r.Update(ctx, webapp); err != nil {
				return ctrl.Result{}, err
			}
		}
		return ctrl.Result{}, nil
	}

	// Step 3: reconcile the Deployment
	deploymentResult, err := r.reconcileDeployment(ctx, webapp)
	if err != nil {
		r.setCondition(ctx, webapp, "DeploymentReady", metav1.ConditionFalse,
			"DeploymentFailed", err.Error())
		return deploymentResult, err
	}

	// Step 4: reconcile the Service
	serviceResult, err := r.reconcileService(ctx, webapp)
	if err != nil {
		r.setCondition(ctx, webapp, "ServiceReady", metav1.ConditionFalse,
			"ServiceFailed", err.Error())
		return serviceResult, err
	}

	// Step 5: reconcile the Ingress (if configured)
	if webapp.Spec.Ingress != nil {
		ingressResult, err := r.reconcileIngress(ctx, webapp)
		if err != nil {
			r.setCondition(ctx, webapp, "IngressReady", metav1.ConditionFalse,
				"IngressFailed", err.Error())
			return ingressResult, err
		}
	}

	// Step 6: reconcile the HPA (if autoscaling is configured)
	if webapp.Spec.AutoScaling != nil {
		if _, err := r.reconcileHPA(ctx, webapp); err != nil {
			// An HPA failure does not block the main flow; it is only logged
			logger.Error(err, "Failed to reconcile HPA")
		}
	}

	// Step 7: update the Status
	if err := r.updateStatus(ctx, webapp); err != nil {
		logger.Error(err, "Failed to update status")
		return ctrl.Result{}, err
	}

	// Record the success condition
	r.setCondition(ctx, webapp, "Ready", metav1.ConditionTrue,
		"ReconcileSuccess", "All resources reconciled successfully")

	logger.Info("Reconciliation completed successfully")
	return ctrl.Result{RequeueAfter: requeueAfter}, nil
}

// reconcileDeployment reconciles the Deployment
func (r *WebAppReconciler) reconcileDeployment(ctx context.Context, webapp *webappv1alpha1.WebApp) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	deployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      webapp.Name,
			Namespace: webapp.Namespace,
		},
	}

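	// CreateOrUpdate is a controller-runtime helper (controllerutil), not kubebuilder magic.
	// It fetches the object and runs the mutate function. It then creates the object if it
	// does not exist, or updates it if the mutation changed anything
	op, err := controllerutil.CreateOrUpdate(ctx, r.Client, deployment, func() error {
		// Set the Deployment's desired state
		r.mutateDeployment(deployment, webapp)

		// Set the controller OwnerReference (so the garbage collector deletes the Deployment when the WebApp is deleted)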
		return controllerutil.SetControllerReference(webapp, deployment, r.Scheme)
	})

	if err != nil {
		logger.Error(err, "Failed to reconcile Deployment")
		return ctrl.Result{}, err
	}

	logger.Info("Deployment reconciled", "operation", op)
	return ctrl.Result{}, nil
}

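// mutateDeployment sets the Deployment's desired state.
// Note: assigning a whole new DeploymentSpec drops fields the API server defaults
// (strategy, revisionHistoryLimit, ...). CreateOrUpdate then sees a diff and issues an
// Update on every reconcile; setting individual fields avoids that churn.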
func (r *WebAppReconciler) mutateDeployment(deployment *appsv1.Deployment, webapp *webappv1alpha1.WebApp) {
	labels := map[string]string{
		"app":                          webapp.Name,
		"app.kubernetes.io/name":       webapp.Name,
		"app.kubernetes.io/managed-by": "webapp-operator",
	}

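	replicas := webapp.Spec.Replicas
	if webapp.Spec.AutoScaling != nil {
		// The HPA owns the replica count. Seed it with MinReplicas when the Deployment is
		// created, then keep the live value so the operator does not undo the HPA's scaling
		replicas = webapp.Spec.AutoScaling.MinReplicas
		if deployment.Spec.Replicas != nil {
			replicas = *deployment.Spec.Replicas
		}
	}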

	resources := r.getResourcesForSize(webapp.Spec.Size)

	deployment.Spec = appsv1.DeploymentSpec{
		Replicas: &replicas,
		Selector: &metav1.LabelSelector{
			MatchLabels: labels,
		},
		Template: corev1.PodTemplateSpec{
			ObjectMeta: metav1.ObjectMeta{
				Labels: labels,
			},
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{
					{
						Name:      webapp.Name,
						Image:     webapp.Spec.Image,
						Resources: resources,
						Ports: []corev1.ContainerPort{
							{
								Name:          "http",
								ContainerPort: webapp.Spec.Port,
								Protocol:      corev1.ProtocolTCP,
							},
						},
						Env: webapp.Spec.Env,
						// Health checks (assume the application serves HTTP GET /health on its port)
						LivenessProbe: &corev1.Probe{
							ProbeHandler: corev1.ProbeHandler{
								HTTPGet: &corev1.HTTPGetAction{
									Path: "/health",
									Port: intstr.FromInt(int(webapp.Spec.Port)),
								},
							},
							InitialDelaySeconds: 30,
							PeriodSeconds:       10,
							FailureThreshold:    3,
						},
						ReadinessProbe: &corev1.Probe{
							ProbeHandler: corev1.ProbeHandler{
								HTTPGet: &corev1.HTTPGetAction{
									Path: "/health",
									Port: intstr.FromInt(int(webapp.Spec.Port)),
								},
							},
							InitialDelaySeconds: 5,
							PeriodSeconds:       5,
							FailureThreshold:    3,
						},
					},
				},
			},
		},
	}

	// Inject the database environment variables
	if webapp.Spec.Database != nil {
		deployment.Spec.Template.Spec.Containers[0].Env = append(
			deployment.Spec.Template.Spec.Containers[0].Env,
			corev1.EnvVar{
				Name:  "DATABASE_HOST",
				Value: webapp.Spec.Database.Host,
			},
			corev1.EnvVar{
				Name:  "DATABASE_PORT",
				Value: fmt.Sprintf("%d", webapp.Spec.Database.Port),
			},
			corev1.EnvVar{
				Name:  "DATABASE_NAME",
				Value: webapp.Spec.Database.Name,
			},
			corev1.EnvVar{
				Name: "DATABASE_PASSWORD",
				ValueFrom: &corev1.EnvVarSource{
					SecretKeyRef: &corev1.SecretKeySelector{
						LocalObjectReference: corev1.LocalObjectReference{
							Name: webapp.Spec.Database.CredentialsSecret,
						},
						Key: "password",
					},
				},
			},
		)
	}
}

// getResourcesForSize returns the resource requests and limits for a size tier
func (r *WebAppReconciler) getResourcesForSize(size webappv1alpha1.ResourceSize) corev1.ResourceRequirements {
	switch size {
	case webappv1alpha1.MediumSize:
		return corev1.ResourceRequirements{
			Requests: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("250m"),
				corev1.ResourceMemory: resource.MustParse("256Mi"),
			},
			Limits: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("500m"),
				corev1.ResourceMemory: resource.MustParse("512Mi"),
			},
		}
	case webappv1alpha1.LargeSize:
		return corev1.ResourceRequirements{
			Requests: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("500m"),
				corev1.ResourceMemory: resource.MustParse("512Mi"),
			},
			Limits: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("1"),
				corev1.ResourceMemory: resource.MustParse("1Gi"),
			},
		}
	default: // Small
		return corev1.ResourceRequirements{
			Requests: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("100m"),
				corev1.ResourceMemory: resource.MustParse("128Mi"),
			},
			Limits: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("200m"),
				corev1.ResourceMemory: resource.MustParse("256Mi"),
			},
		}
	}
}

// reconcileService reconciles the Service
func (r *WebAppReconciler) reconcileService(ctx context.Context, webapp *webappv1alpha1.WebApp) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	service := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      webapp.Name,
			Namespace: webapp.Namespace,
		},
	}

	op, err := controllerutil.CreateOrUpdate(ctx, r.Client, service, func() error {
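		// Set fields individually instead of replacing the whole Spec. Replacing it would wipe
		// server-assigned values such as the (immutable) clusterIP and make every reconcile
		// issue an Update
		service.Spec.Selector = map[string]string{
			"app": webapp.Name,
		}
		service.Spec.Ports = []corev1.ServicePort{
			{
				Name:       "http",
				Port:       80,
				TargetPort: intstr.FromInt(int(webapp.Spec.Port)),
				Protocol:   corev1.ProtocolTCP,
			},
		}
		service.Spec.Type = corev1.ServiceTypeClusterIP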
		return controllerutil.SetControllerReference(webapp, service, r.Scheme)
	})

	if err != nil {
		logger.Error(err, "Failed to reconcile Service")
		return ctrl.Result{}, err
	}

	logger.Info("Service reconciled", "operation", op)
	return ctrl.Result{}, nil
}

// reconcileIngress reconciles the Ingress
func (r *WebAppReconciler) reconcileIngress(ctx context.Context, webapp *webappv1alpha1.WebApp) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	ingress := &networkingv1.Ingress{
		ObjectMeta: metav1.ObjectMeta{
			Name:      webapp.Name,
			Namespace: webapp.Namespace,
		},
	}

	op, err := controllerutil.CreateOrUpdate(ctx, r.Client, ingress, func() error {
		pathType := networkingv1.PathTypePrefix
		ingress.Spec = networkingv1.IngressSpec{
			Rules: []networkingv1.IngressRule{
				{
					Host: webapp.Spec.Ingress.Host,
					IngressRuleValue: networkingv1.IngressRuleValue{
						HTTP: &networkingv1.HTTPIngressRuleValue{
							Paths: []networkingv1.HTTPIngressPath{
								{
									Path:     "/",
									PathType: &pathType,
									Backend: networkingv1.IngressBackend{
										Service: &networkingv1.IngressServiceBackend{
											Name: webapp.Name,
											Port: networkingv1.ServiceBackendPort{
												Number: 80,
											},
										},
									},
								},
							},
						},
					},
				},
			},
		}

		// Configure TLS
		if webapp.Spec.Ingress.TLS && webapp.Spec.Ingress.TLSSecret != "" {
			ingress.Spec.TLS = []networkingv1.IngressTLS{
				{
					Hosts:      []string{webapp.Spec.Ingress.Host},
					SecretName: webapp.Spec.Ingress.TLSSecret,
				},
			}
		}

		return controllerutil.SetControllerReference(webapp, ingress, r.Scheme)
	})

	if err != nil {
		logger.Error(err, "Failed to reconcile Ingress")
		return ctrl.Result{}, err
	}

	logger.Info("Ingress reconciled", "operation", op)
	return ctrl.Result{}, nil
}
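The three reconcile helpers above all lean on `controllerutil.CreateOrUpdate`. Its control flow runs like this:

  • Fetch the object.
  • If it does not exist, run the mutate function and create it.
  • Otherwise, run the mutate function on the fetched copy and send an Update only if the object actually changed.
  • Report the result as created, updated, or unchanged.

The sketch below reproduces that sequence against a hypothetical in-memory `store` and `object` type. It uses `reflect.DeepEqual` where the library uses semantic equality:

```go
package main

import "reflect"

// object is a hypothetical stand-in for a Kubernetes object: a name plus a
// flat set of fields.
type object struct {
	Name   string
	Fields map[string]string
}

// store is an in-memory stand-in for the API server.
type store map[string]object

// clone deep-copies an object so the "before" snapshot is not aliased.
func clone(o object) object {
	c := object{Name: o.Name, Fields: map[string]string{}}
	for k, v := range o.Fields {
		c.Fields[k] = v
	}
	return c
}

// createOrUpdate follows the same sequence as controllerutil.CreateOrUpdate:
// fetch, mutate, then create if missing or update only if mutate changed it.
func createOrUpdate(s store, name string, mutate func(*object)) string {
	existing, found := s[name]
	if !found {
		obj := object{Name: name, Fields: map[string]string{}}
		mutate(&obj)
		s[name] = obj
		return "created"
	}
	obj := clone(existing)
	mutate(&obj)
	if reflect.DeepEqual(existing, obj) {
		return "unchanged" // no write is sent
	}
	s[name] = obj
	return "updated"
}
```

The before/after comparison decides whether anything is written. The mutate function should therefore only set the fields the operator owns. Any field it resets on every call, such as a value the API server filled in, turns each reconcile into an Update.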

Status management and conditions

Status is how an Operator reports back to users. A well-designed Status lets whoever runs the cluster see a resource's health at a glance:

// updateStatus updates the WebApp status
func (r *WebAppReconciler) updateStatus(ctx context.Context, webapp *webappv1alpha1.WebApp) error {
	logger := log.FromContext(ctx)

	// Remember the previous phase so LastDeployed only changes on a phase transition
	previousPhase := webapp.Status.Phase

	// Fetch the current Deployment state
	deployment := &appsv1.Deployment{}
	err := r.Get(ctx, types.NamespacedName{
		Name:      webapp.Name,
		Namespace: webapp.Namespace,
	}, deployment)

	if err != nil {
		if errors.IsNotFound(err) {
			webapp.Status.Phase = webappv1alpha1.PhasePending
			webapp.Status.AvailableReplicas = 0
		} else {
			return err
		}
	} else {
		webapp.Status.AvailableReplicas = deployment.Status.AvailableReplicas

		// Derive the Phase from the Deployment status
		if deployment.Status.AvailableReplicas == *deployment.Spec.Replicas {
			webapp.Status.Phase = webappv1alpha1.PhaseRunning
			webapp.Status.ServiceURL = fmt.Sprintf("http://%s.%s.svc.cluster.local",
				webapp.Name, webapp.Namespace)
		} else if deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
			webapp.Status.Phase = webappv1alpha1.PhaseUpdating
		} else {
			webapp.Status.Phase = webappv1alpha1.PhasePending
		}

		// Record the deployment time only when the phase changes (or was never set).
		// Stamping metav1.Now() on every reconcile would rewrite the status each time
		if webapp.Status.LastDeployed == nil || webapp.Status.Phase != previousPhase {
			now := metav1.Now()
			webapp.Status.LastDeployed = &now
		}
	}

	// Write the status subresource
	if err := r.Status().Update(ctx, webapp); err != nil {
		logger.Error(err, "Failed to update WebApp status")
		return err
	}

	return nil
}

// setCondition sets a standard metav1.Condition on the status
func (r *WebAppReconciler) setCondition(ctx context.Context, webapp *webappv1alpha1.WebApp,
	condType string, status metav1.ConditionStatus, reason, message string) {

	condition := metav1.Condition{
		Type:               condType,
		Status:             status,
		ObservedGeneration: webapp.Generation,
		LastTransitionTime: metav1.Now(),
		Reason:             reason,
		Message:            message,
	}

	meta.SetStatusCondition(&webapp.Status.Conditions, condition)

	// Try to write the status
	if err := r.Status().Update(ctx, webapp); err != nil {
		log.FromContext(ctx).Error(err, "Failed to update condition")
	}
}

Finalizers: cleaning up resources gracefully

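A finalizer is Kubernetes' "deletion interceptor". When you delete a resource that has finalizers, the API server only sets metadata.deletionTimestamp. The object remains until every entry in metadata.finalizers has been removed, which the controller does once its cleanup has succeeded. Objects that carry an OwnerReference to the WebApp are removed by the garbage collector anyway, so the finalizer is for state outside the cluster: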

// cleanupExternalResources cleans up state outside the cluster. It must be idempotent:
// if removing the finalizer fails, the next reconcile runs it again.
func (r *WebAppReconciler) cleanupExternalResources(ctx context.Context, webapp *webappv1alpha1.WebApp) error {
	logger := log.FromContext(ctx)
	logger.Info("Cleaning up external resources for WebApp", "name", webapp.Name)

	// Example: remove related records from the external database
	if webapp.Spec.Database != nil {
		logger.Info("Cleaning up database records",
			"host", webapp.Spec.Database.Host,
			"database", webapp.Spec.Database.Name,
		)
		// Call an external API here to clean up the database...
	}

	// Example: delete an external storage bucket
	logger.Info("Cleaning up storage bucket", "name", webapp.Name)
	// Call the cloud provider's API here to delete the bucket...

	// Example: notify the monitoring system
	logger.Info("Removing monitoring alerts", "name", webapp.Name)
	// Remove the related alerting rules here through your monitoring system...

	return nil
}

Webhooks: validation and defaulting

Webhooks let you run custom logic when a resource is created, updated, or deleted. kubebuilder supports two kinds of admission webhook:

  • Mutating webhook: modifies the resource (for example, injecting default values)
  • Validating webhook: checks whether the resource is valid

// api/v1alpha1/webapp_webhook.go
package v1alpha1

import (
	"fmt"
	"regexp"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/validation/field"
	ctrl "sigs.k8s.io/controller-runtime"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/webhook"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

var webapplog = logf.Log.WithName("webapp-resource")

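// SetupWebhookWithManager registers the webhooks with the manager.
// For(r) is enough here: the builder detects that WebApp implements the
// webhook.Defaulter and webhook.Validator interfaces below. WithDefaulter and
// WithValidator expect the admission.CustomDefaulter / admission.CustomValidator
// interfaces instead (Default(ctx, obj) error, ValidateCreate(ctx, obj), ...), which
// recent controller-runtime releases recommend over the deprecated ones used here.
func (r *WebApp) SetupWebhookWithManager(mgr ctrl.Manager) error {
	return ctrl.NewWebhookManagedBy(mgr).
		For(r).
		Complete()
}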

//+kubebuilder:webhook:path=/mutate-webapp-example-com-v1alpha1-webapp,mutating=true,failurePolicy=fail,sideEffects=None,groups=webapp.example.com,resources=webapps,verbs=create;update,versions=v1alpha1,name=mwebapp.kb.io,admissionReviewVersions=v1

var _ webhook.Defaulter = &WebApp{}

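// Default injects default values. Most of them duplicate the +kubebuilder:default markers,
// which the API server applies from the schema; MinReplicas and TargetCPUUtilization are
// defaulted only here.
func (r *WebApp) Default() {
	webapplog.Info("default", "name", r.Name)

	// Default replica count
	if r.Spec.Replicas == 0 {
		r.Spec.Replicas = 1
	}

	// Default port
	if r.Spec.Port == 0 {
		r.Spec.Port = 8080
	}

	// Default size
	if r.Spec.Size == "" {
		r.Spec.Size = SmallSize
	}

	// Default database port
	if r.Spec.Database != nil && r.Spec.Database.Port == 0 {
		r.Spec.Database.Port = 5432
	}

	// Autoscaling defaults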
	if r.Spec.AutoScaling != nil {
		if r.Spec.AutoScaling.MinReplicas == 0 {
			r.Spec.AutoScaling.MinReplicas = 1
		}
		if r.Spec.AutoScaling.TargetCPUUtilization == 0 {
			r.Spec.AutoScaling.TargetCPUUtilization = 80
		}
	}
}

//+kubebuilder:webhook:path=/validate-webapp-example-com-v1alpha1-webapp,mutating=false,failurePolicy=fail,sideEffects=None,groups=webapp.example.com,resources=webapps,verbs=create;update,versions=v1alpha1,name=vwebapp.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &WebApp{}

// ValidateCreate validates a WebApp on create
func (r *WebApp) ValidateCreate() (admission.Warnings, error) {
	webapplog.Info("validate create", "name", r.Name)
	return nil, r.validate()
}

// ValidateUpdate validates a WebApp on update
func (r *WebApp) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
	webapplog.Info("validate update", "name", r.Name)

	oldWebApp, ok := old.(*WebApp)
	if !ok {
		return nil, fmt.Errorf("expected a WebApp but got %T", old)
	}

	// Reject changes to immutable fields
	if oldWebApp.Spec.Database != nil && r.Spec.Database != nil {
		if oldWebApp.Spec.Database.Host != r.Spec.Database.Host {
			return nil, fmt.Errorf("database host is immutable, cannot change from %s to %s",
				oldWebApp.Spec.Database.Host, r.Spec.Database.Host)
		}
	}

	return nil, r.validate()
}

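// ValidateDelete validates a WebApp on delete. It is only invoked if the webhook
// marker above lists the delete verb (it currently lists create;update).
func (r *WebApp) ValidateDelete() (admission.Warnings, error) {
	webapplog.Info("validate delete", "name", r.Name)
	// You could check here whether other resources still depend on this WebApp
	return nil, nil
}

// validate holds the validation logic shared by create and update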
func (r *WebApp) validate() error {
	var allErrs field.ErrorList

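	// Validate the image format (the same simplified pattern as the CRD marker; it rejects
	// registry ports, digests, and uppercase letters or underscores in tags)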
	imagePattern := regexp.MustCompile(`^[a-z0-9.-]+(/[a-z0-9.-]+)*(:[a-z0-9.-]+)?$`)
	if !imagePattern.MatchString(r.Spec.Image) {
		allErrs = append(allErrs, field.Invalid(
			field.NewPath("spec").Child("image"),
			r.Spec.Image,
			"invalid image format",
		))
	}

	// 验证自动扩缩配置
	if r.Spec.AutoScaling != nil {
		if r.Spec.AutoScaling.MinReplicas > r.Spec.AutoScaling.MaxReplicas {
			allErrs = append(allErrs, field.Invalid(
				field.NewPath("spec").Child("autoScaling").Child("minReplicas"),
				r.Spec.AutoScaling.MinReplicas,
				"minReplicas cannot be greater than maxReplicas",
			))
		}
	}

	// 验证 TLS 配置
	if r.Spec.Ingress != nil && r.Spec.Ingress.TLS {
		if r.Spec.Ingress.TLSSecret == "" {
			allErrs = append(allErrs, field.Required(
				field.NewPath("spec").Child("ingress").Child("tlsSecret"),
				"tlsSecret is required when TLS is enabled",
			))
		}
	}

	if len(allErrs) == 0 {
		return nil
	}

	return fmt.Errorf("validation failed: %v", allErrs)
}
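The order in which these hooks run matters. The API server calls mutating admission (where `Default()` runs) before schema validation and validating admission (`ValidateCreate`/`ValidateUpdate`), so validation always sees defaulted values.

The sketch below models that pipeline with hypothetical stand-ins (`spec`, `applyDefaults`, `validate`, `admit`). It assumes autoscaling is always configured. In the same spirit as the `field.ErrorList` above, it collects every problem with `errors.Join` (Go 1.20+):

```go
package main

import (
	"errors"
	"fmt"
)

// spec is a trimmed-down stand-in for WebAppSpec plus its autoscaling and TLS settings.
type spec struct {
	Replicas, Port, MinReplicas, MaxReplicas int32
	TLS                                      bool
	TLSSecret                                string
}

// applyDefaults plays the role of Default(): it fills in zero values only.
func applyDefaults(s *spec) {
	if s.Replicas == 0 {
		s.Replicas = 1
	}
	if s.Port == 0 {
		s.Port = 8080
	}
	if s.MinReplicas == 0 {
		s.MinReplicas = 1
	}
}

// validate plays the role of validate(): it reports every problem, not just the first.
func validate(s spec) error {
	var errs []error
	if s.MinReplicas > s.MaxReplicas {
		errs = append(errs, fmt.Errorf("spec.autoScaling.minReplicas: %d is greater than maxReplicas %d", s.MinReplicas, s.MaxReplicas))
	}
	if s.TLS && s.TLSSecret == "" {
		errs = append(errs, errors.New("spec.ingress.tlsSecret: required when TLS is enabled"))
	}
	return errors.Join(errs...) // nil when errs is empty
}

// admit mirrors the API server's order: mutate first, then validate.
func admit(s spec) (spec, error) {
	applyDefaults(&s)
	return s, validate(s)
}
```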

To enable the webhooks, register them in main.go alongside the controller:

// cmd/main.go
package main

import (
	"flag"
	"os"

	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/healthz"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
	"sigs.k8s.io/controller-runtime/pkg/webhook"

	webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
	"github.com/yourusername/webapp-operator/internal/controller"
)

var (
	scheme   = runtime.NewScheme()
	setupLog = ctrl.Log.WithName("setup")
)

func init() {
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	utilruntime.Must(webappv1alpha1.AddToScheme(scheme))
}

func main() {
	var metricsAddr string
	var enableLeaderElection bool
	var probeAddr string
	var enableWebhooks bool

	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
		"Enable leader election for controller manager.")
	flag.BoolVar(&enableWebhooks, "enable-webhooks", true, "Enable webhooks.")

	opts := zap.Options{
		Development: true,
	}
	opts.BindFlags(flag.CommandLine)
	flag.Parse()

	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:                 scheme,
		Metrics:                metricsserver.Options{BindAddress: metricsAddr},
		WebhookServer:          webhook.NewServer(webhook.Options{Port: 9443}),
		HealthProbeBindAddress: probeAddr,
		LeaderElection:         enableLeaderElection,
		LeaderElectionID:       "webapp-operator.example.com",
	})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	// Register the controller
	if err = (&controller.WebAppReconciler{
		Client: mgr.GetClient(),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "WebApp")
		os.Exit(1)
	}

	// Register the webhooks
	if enableWebhooks {
		if err = (&webappv1alpha1.WebApp{}).SetupWebhookWithManager(mgr); err != nil {
			setupLog.Error(err, "unable to create webhook", "webhook", "WebApp")
			os.Exit(1)
		}
	}

	// Health checks
	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up health check")
		os.Exit(1)
	}
	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up ready check")
		os.Exit(1)
	}

	setupLog.Info("starting manager")
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}

Testing the Operator

Testing is an essential part of Operator development. Two approaches are common, and the kubebuilder scaffold sets up the second one for you:

Unit tests (with the fake client)

// internal/controller/webapp_controller_test.go
package controller

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
)

func newTestScheme() *runtime.Scheme {
	scheme := runtime.NewScheme()
	_ = clientgoscheme.AddToScheme(scheme)
	_ = webappv1alpha1.AddToScheme(scheme)
	return scheme
}

func TestReconcile_CreateDeployment(t *testing.T) {
	scheme := newTestScheme()

	// Create the WebApp resource under test
	webapp := &webappv1alpha1.WebApp{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-webapp",
			Namespace: "default",
		},
		Spec: webappv1alpha1.WebAppSpec{
			Name:     "test-webapp",
			Image:    "nginx:latest",
			Replicas: 2,
			Port:     8080,
			Size:     webappv1alpha1.SmallSize,
		},
	}

	// Use the fake client to simulate the Kubernetes API
	fakeClient := fake.NewClientBuilder().
		WithScheme(scheme).
		WithObjects(webapp).
		WithStatusSubresource(webapp).
		Build()

	reconciler := &WebAppReconciler{
		Client: fakeClient,
		Scheme: scheme,
	}

	// Run Reconcile
	req := ctrl.Request{
		NamespacedName: types.NamespacedName{
			Name:      "test-webapp",
			Namespace: "default",
		},
	}

	result, err := reconciler.Reconcile(context.Background(), req)
	require.NoError(t, err)
	assert.False(t, result.Requeue)

	// Verify that the Deployment was created
	deployment := &appsv1.Deployment{}
	err = fakeClient.Get(context.Background(), req.NamespacedName, deployment)
	require.NoError(t, err)

	// Verify the Deployment's fields
	assert.Equal(t, "test-webapp", deployment.Name)
	assert.Equal(t, int32(2), *deployment.Spec.Replicas)
	assert.Equal(t, "nginx:latest", deployment.Spec.Template.Spec.Containers[0].Image)
	assert.Equal(t, int32(8080), deployment.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort)

	// Verify the OwnerReference
	assert.Len(t, deployment.OwnerReferences, 1)
	assert.Equal(t, "test-webapp", deployment.OwnerReferences[0].Name)
}

func TestReconcile_CreateService(t *testing.T) {
	scheme := newTestScheme()

	webapp := &webappv1alpha1.WebApp{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-webapp",
			Namespace: "default",
		},
		Spec: webappv1alpha1.WebAppSpec{
			Name:     "test-webapp",
			Image:    "nginx:latest",
			Replicas: 1,
			Port:     8080,
		},
	}

	fakeClient := fake.NewClientBuilder().
		WithScheme(scheme).
		WithObjects(webapp).
		WithStatusSubresource(webapp).
		Build()

	reconciler := &WebAppReconciler{
		Client: fakeClient,
		Scheme: scheme,
	}

	req := ctrl.Request{
		NamespacedName: types.NamespacedName{
			Name:      "test-webapp",
			Namespace: "default",
		},
	}

	_, err := reconciler.Reconcile(context.Background(), req)
	require.NoError(t, err)

	// Verify that the Service was created
	service := &corev1.Service{}
	err = fakeClient.Get(context.Background(), req.NamespacedName, service)
	require.NoError(t, err)

	assert.Equal(t, "test-webapp", service.Name)
	assert.Equal(t, int32(80), service.Spec.Ports[0].Port)
}

func TestReconcile_WebAppNotFound(t *testing.T) {
	scheme := newTestScheme()

	fakeClient := fake.NewClientBuilder().
		WithScheme(scheme).
		Build()

	reconciler := &WebAppReconciler{
		Client: fakeClient,
		Scheme: scheme,
	}

	req := ctrl.Request{
		NamespacedName: types.NamespacedName{
			Name:      "non-existent",
			Namespace: "default",
		},
	}

	result, err := reconciler.Reconcile(context.Background(), req)
	require.NoError(t, err)
	assert.False(t, result.Requeue) // a missing WebApp should not be requeued
}

func TestReconcile_UpdateDeployment(t *testing.T) {
	scheme := newTestScheme()

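	// Seed an existing WebApp and Deployment. The fake client performs no API validation,
	// so it accepts the selector change Reconcile makes below; a real API server would
	// reject it because a Deployment's selector is immutable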
	webapp := &webappv1alpha1.WebApp{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-webapp",
			Namespace: "default",
		},
		Spec: webappv1alpha1.WebAppSpec{
			Name:     "test-webapp",
			Image:    "nginx:1.19",
			Replicas: 1,
			Port:     8080,
		},
	}

	existingDeployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-webapp",
			Namespace: "default",
		},
		Spec: appsv1.DeploymentSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"app": "test-webapp"},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{"app": "test-webapp"},
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "test-webapp",
							Image: "nginx:1.19",
						},
					},
				},
			},
		},
	}

	fakeClient := fake.NewClientBuilder().
		WithScheme(scheme).
		WithObjects(webapp, existingDeployment).
		WithStatusSubresource(webapp).
		Build()

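	// Change the WebApp's image and replica count. Re-read it first so the Update carries
	// the object's current resourceVersion, and fail the test if the Update fails.
	current := &webappv1alpha1.WebApp{}
	key := types.NamespacedName{Name: "test-webapp", Namespace: "default"}
	require.NoError(t, fakeClient.Get(context.Background(), key, current))
	current.Spec.Image = "nginx:1.21"
	current.Spec.Replicas = 3
	require.NoError(t, fakeClient.Update(context.Background(), current))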

	reconciler := &WebAppReconciler{
		Client: fakeClient,
		Scheme: scheme,
	}

	req := ctrl.Request{
		NamespacedName: types.NamespacedName{
			Name:      "test-webapp",
			Namespace: "default",
		},
	}

	_, err := reconciler.Reconcile(context.Background(), req)
	require.NoError(t, err)

	// Verify that the Deployment was updated
	deployment := &appsv1.Deployment{}
	err = fakeClient.Get(context.Background(), req.NamespacedName, deployment)
	require.NoError(t, err)

	assert.Equal(t, "nginx:1.21", deployment.Spec.Template.Spec.Containers[0].Image)
	assert.Equal(t, int32(3), *deployment.Spec.Replicas)
}

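Integration tests (with envtest)

envtest starts a real etcd and kube-apiserver, which makes integration tests far more realistic than a fake client. It does not run kube-controller-manager, so built-in controllers such as the garbage collector are absent. The binaries must be available locally. The test target in the kubebuilder Makefile downloads them with setup-envtest and points KUBEBUILDER_ASSETS at them: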

// internal/controller/suite_test.go
package controller

import (
	"context"
	"path/filepath"
	"testing"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/envtest"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"

	webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
)

var (
	cfg       *rest.Config
	k8sClient client.Client
	testEnv   *envtest.Environment
	ctx       context.Context
	cancel    context.CancelFunc
)

func TestControllers(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Controller Suite")
}

var _ = BeforeSuite(func() {
	logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

	ctx, cancel = context.WithCancel(context.TODO())

	By("bootstrapping test environment")
	testEnv = &envtest.Environment{
		CRDDirectoryPaths:     []string{filepath.Join("..", "..", "config", "crd", "bases")},
		ErrorIfCRDPathMissing: true,
	}

	var err error
	cfg, err = testEnv.Start()
	Expect(err).NotTo(HaveOccurred())
	Expect(cfg).NotTo(BeNil())

	err = webappv1alpha1.AddToScheme(scheme.Scheme)
	Expect(err).NotTo(HaveOccurred())

	k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
	Expect(err).NotTo(HaveOccurred())
	Expect(k8sClient).NotTo(BeNil())

	// Start a controller manager that runs the WebAppReconciler against envtest
	mgr, err := ctrl.NewManager(cfg, ctrl.Options{
		Scheme: scheme.Scheme,
	})
	Expect(err).ToNot(HaveOccurred())

	err = (&WebAppReconciler{
		Client: mgr.GetClient(),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr)
	Expect(err).ToNot(HaveOccurred())

	go func() {
		defer GinkgoRecover()
		err = mgr.Start(ctx)
		Expect(err).ToNot(HaveOccurred())
	}()
})

var _ = AfterSuite(func() {
	cancel()
	By("tearing down the test environment")
	err := testEnv.Stop()
	Expect(err).NotTo(HaveOccurred())
})

The integration test cases:

// internal/controller/webapp_integration_test.go
package controller

import (
	"time"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"

	webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
)

// Ordered: the update and delete specs reuse the WebApp created by the first spec
var _ = Describe("WebApp Controller", Ordered, func() {
	const (
		timeout  = time.Second * 30
		interval = time.Millisecond * 250
	)

	Context("When creating a WebApp", func() {
		It("Should create Deployment and Service", func() {
			webapp := &webappv1alpha1.WebApp{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-webapp",
					Namespace: "default",
				},
				Spec: webappv1alpha1.WebAppSpec{
					Name:     "test-webapp",
					Image:    "nginx:latest",
					Replicas: 2,
					Port:     8080,
					Size:     webappv1alpha1.MediumSize,
				},
			}

			Expect(k8sClient.Create(ctx, webapp)).Should(Succeed())

			// Wait for the Deployment to be created
			deployment := &appsv1.Deployment{}
			Eventually(func() error {
				return k8sClient.Get(ctx, types.NamespacedName{
					Name:      "test-webapp",
					Namespace: "default",
				}, deployment)
			}, timeout, interval).Should(Succeed())

			// Check the Deployment's fields
			Expect(*deployment.Spec.Replicas).To(Equal(int32(2)))
			Expect(deployment.Spec.Template.Spec.Containers[0].Image).To(Equal("nginx:latest"))

			// Wait for the Service to be created
			service := &corev1.Service{}
			Eventually(func() error {
				return k8sClient.Get(ctx, types.NamespacedName{
					Name:      "test-webapp",
					Namespace: "default",
				}, service)
			}, timeout, interval).Should(Succeed())

			Expect(service.Spec.Ports[0].Port).To(Equal(int32(80)))
		})
	})

	Context("When updating a WebApp", func() {
		It("Should update the Deployment", func() {
			webapp := &webappv1alpha1.WebApp{}
			Expect(k8sClient.Get(ctx, types.NamespacedName{
				Name:      "test-webapp",
				Namespace: "default",
			}, webapp)).Should(Succeed())

			// Change the replica count
			webapp.Spec.Replicas = 5
			Expect(k8sClient.Update(ctx, webapp)).Should(Succeed())

			// Verify that the Deployment picks up the change
			deployment := &appsv1.Deployment{}
			Eventually(func() int32 {
				err := k8sClient.Get(ctx, types.NamespacedName{
					Name:      "test-webapp",
					Namespace: "default",
				}, deployment)
				if err != nil {
					return 0
				}
				return *deployment.Spec.Replicas
			}, timeout, interval).Should(Equal(int32(5)))
		})
	})

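	Context("When deleting a WebApp", func() {
		It("Should own its Deployment so it can be garbage-collected", func() {
			key := types.NamespacedName{Name: "test-webapp", Namespace: "default"}

			webapp := &webappv1alpha1.WebApp{}
			Expect(k8sClient.Get(ctx, key, webapp)).Should(Succeed())

			// envtest runs only etcd and kube-apiserver. Without kube-controller-manager there is
			// no garbage collector, so owned objects are never cascade-deleted here. Assert the
			// controller OwnerReference instead; in a real cluster the GC deletes through it.
			deployment := &appsv1.Deployment{}
			Expect(k8sClient.Get(ctx, key, deployment)).Should(Succeed())
			Expect(metav1.IsControlledBy(deployment, webapp)).To(BeTrue())

			Expect(k8sClient.Delete(ctx, webapp)).Should(Succeed())

			// The WebApp itself goes away once the controller has removed any finalizer
			Eventually(func() error {
				return k8sClient.Get(ctx, key, &webappv1alpha1.WebApp{})
			}, timeout, interval).ShouldNot(Succeed())
		})
	})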
})

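Deploying the Operator

The Makefile generated by kubebuilder wraps deployment into a few targets:

# Install the CRDs into the cluster
make install

# Run the controller locally (development and debugging)
make run

# Build and push the image, then deploy it to the cluster
make docker-build docker-push IMG=yourregistry/webapp-operator:v0.1.0
make deploy IMG=yourregistry/webapp-operator:v0.1.0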

A sample WebApp CR to try it with:

# config/samples/webapp_v1alpha1_webapp.yaml
apiVersion: webapp.example.com/v1alpha1
kind: WebApp
metadata:
  name: my-webapp
  namespace: default
spec:
  name: my-webapp
  image: nginx:1.21
  replicas: 3
  port: 8080
  size: medium
  env:
    - name: ENV
      value: production
  database:
    host: postgres.default.svc.cluster.local
    port: 5432
    name: mydb
    credentialsSecret: db-credentials
  autoScaling:
    minReplicas: 2
    maxReplicas: 10
    targetCPUUtilization: 75
  ingress:
    host: my-webapp.example.com
    tls: true
    tlsSecret: my-webapp-tls

Apply the CR:

kubectl apply -f config/samples/webapp_v1alpha1_webapp.yaml

# Check the WebApp status
kubectl get webapps
# NAME        PHASE     REPLICAS   AGE
# my-webapp   Running   3          5m

# Show the detailed status
kubectl describe webapp my-webapp

# List the generated resources
kubectl get deployments,services,ingresses

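Advanced Operator patterns

1. Multi-version API support

As the Operator evolves, you will likely need to serve more than one API version. controller-runtime uses a hub-and-spoke model: one version is the conversion hub, and every other version implements conversion to and from it. Conversion is served through a conversion webhook: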

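// api/v1beta1/webapp_types.go (new version)
package v1beta1

type WebAppSpec struct {
	// New field
	IngressClassName string `json:"ingressClassName,omitempty"`

	// Fields carried over from v1alpha1
	Name     string `json:"name"`
	Image    string `json:"image"`
	Replicas int32  `json:"replicas"`
}

// Hub marks v1beta1 as the conversion hub. It is often also the storage version,
// selected with the //+kubebuilder:storageversion marker on the WebApp type.
func (*WebApp) Hub() {}

// api/v1alpha1/webapp_conversion.go: the spoke version implements conversion.Convertible
package v1alpha1

import (
	"sigs.k8s.io/controller-runtime/pkg/conversion"

	"github.com/yourusername/webapp-operator/api/v1beta1"
)

// ConvertTo converts this v1alpha1 WebApp to the hub version (v1alpha1 -> v1beta1)
func (r *WebApp) ConvertTo(dstRaw conversion.Hub) error {
	dst := dstRaw.(*v1beta1.WebApp)
	dst.ObjectMeta = r.ObjectMeta
	dst.Spec.Name = r.Spec.Name
	dst.Spec.Image = r.Spec.Image
	dst.Spec.Replicas = r.Spec.Replicas
	return nil
}

// ConvertFrom converts the hub version to this version (v1beta1 -> v1alpha1).
// IngressClassName has no v1alpha1 counterpart and is dropped here.
func (r *WebApp) ConvertFrom(srcRaw conversion.Hub) error {
	src := srcRaw.(*v1beta1.WebApp)
	r.ObjectMeta = src.ObjectMeta
	r.Spec.Name = src.Spec.Name
	r.Spec.Image = src.Spec.Image
	r.Spec.Replicas = src.Spec.Replicas
	return nil
}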
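The conversion functions above drop IngressClassName when a v1beta1 object is served as v1alpha1. A client that reads v1alpha1 and writes it back therefore silently erases the field. A common remedy is to stash hub-only fields in an annotation on the way down and restore them on the way up.

The sketch below models that round trip with plain Go structs. The struct names and the annotation key are hypothetical. In a real Operator the same logic would live in ConvertTo/ConvertFrom:

```go
package main

// Hypothetical, trimmed-down stand-ins for the two API versions (plain structs, no Kubernetes types).
type WebAppV1alpha1 struct {
	Annotations map[string]string
	Name        string
	Image       string
	Replicas    int32
}

type WebAppV1beta1 struct {
	Annotations      map[string]string
	Name             string
	Image            string
	Replicas         int32
	IngressClassName string // exists only in v1beta1
}

// ingressClassAnnotation is a hypothetical annotation key used to stash the v1beta1-only
// field on v1alpha1 objects, so that a hub -> spoke -> hub round trip loses nothing.
const ingressClassAnnotation = "webapp.example.com/ingress-class-name"

// toHub converts a v1alpha1 object to the hub version, restoring any stashed field.
func toHub(src WebAppV1alpha1) WebAppV1beta1 {
	dst := WebAppV1beta1{
		Annotations: map[string]string{},
		Name:        src.Name,
		Image:       src.Image,
		Replicas:    src.Replicas,
	}
	for k, v := range src.Annotations {
		if k == ingressClassAnnotation {
			dst.IngressClassName = v
			continue // the stash annotation itself is not copied to the hub
		}
		dst.Annotations[k] = v
	}
	return dst
}

// fromHub converts the hub version to v1alpha1, stashing fields v1alpha1 cannot represent.
func fromHub(src WebAppV1beta1) WebAppV1alpha1 {
	dst := WebAppV1alpha1{
		Annotations: map[string]string{},
		Name:        src.Name,
		Image:       src.Image,
		Replicas:    src.Replicas,
	}
	for k, v := range src.Annotations {
		dst.Annotations[k] = v
	}
	if src.IngressClassName != "" {
		dst.Annotations[ingressClassAnnotation] = src.IngressClassName
	}
	return dst
}
```

The trade-off is an extra annotation that users can see (and edit) on v1alpha1 objects. For fields that matter, that is usually preferable to silent data loss.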

2. Multi-cluster management

If your Operator needs to manage resources in other clusters, give it one client per cluster:

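import (
	"context"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/clientcmd"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
)

// getRemoteClient builds a client for another cluster from its kubeconfig file
func getRemoteClient(kubeconfigPath string) (client.Client, error) {
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
	if err != nil {
		return nil, err
	}

	scheme := runtime.NewScheme()
	if err := clientgoscheme.AddToScheme(scheme); err != nil {
		return nil, err
	}
	if err := webappv1alpha1.AddToScheme(scheme); err != nil {
		return nil, err
	}

	return client.New(config, client.Options{Scheme: scheme})
}

// MultiClusterReconciler watches WebApps locally and places workloads in other clusters
type MultiClusterReconciler struct {
	client.Client
	Scheme        *runtime.Scheme
	RemoteClients map[string]client.Client // cluster name -> client
}

func (r *MultiClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	webapp := &webappv1alpha1.WebApp{}
	if err := r.Get(ctx, req.NamespacedName, webapp); err != nil {
		// A deleted WebApp is not an error; anything else is retried with backoff
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// The "target-cluster" annotation selects the cluster to deploy into
	targetCluster := webapp.Annotations["target-cluster"]
	remoteClient, ok := r.RemoteClients[targetCluster]
	if !ok {
		return ctrl.Result{}, fmt.Errorf("cluster %q not found", targetCluster)
	}

	// Create the Deployment in the remote cluster. buildDeployment must not set an
	// OwnerReference here: owner references cannot point across clusters, so remote cleanup
	// needs a finalizer instead. Tolerating AlreadyExists keeps repeated reconciles from
	// failing; propagating spec changes would need controllerutil.CreateOrUpdate on remoteClient.
	deployment := buildDeployment(webapp)
	if err := remoteClient.Create(ctx, deployment); err != nil && !apierrors.IsAlreadyExists(err) {
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}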

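3. Leader election

When the Operator runs with several replicas, only one instance should reconcile at any time. The three timing values below match controller-runtime's defaults and are spelled out for clarity: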

// cmd/main.go (excerpt; ptr is k8s.io/utils/ptr)
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
	Scheme:                  scheme,
	LeaderElection:          true, // enable leader election
	LeaderElectionID:        "webapp-operator-leader",
	LeaderElectionNamespace: "webapp-operator-system",
	LeaseDuration:           ptr.To(15 * time.Second),
	RenewDeadline:           ptr.To(10 * time.Second),
	RetryPeriod:             ptr.To(2 * time.Second),
})
if err != nil {
	setupLog.Error(err, "unable to start manager")
	os.Exit(1)
}

4. Resource requests and limits

Give the Pods the Operator creates default CPU and memory requests and limits:

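// Excerpt of mutateDeployment: only the resource requests/limits logic is shown
func (r *WebAppReconciler) mutateDeployment(deployment *appsv1.Deployment, webapp *webappv1alpha1.WebApp) {
	containers := deployment.Spec.Template.Spec.Containers
	if len(containers) == 0 {
		return // no container to configure yet; avoid indexing an empty slice
	}

	// Baseline requests and limits (choosing them per spec.size would slot in here)
	resources := corev1.ResourceRequirements{
		Requests: corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("100m"),
			corev1.ResourceMemory: resource.MustParse("128Mi"),
		},
		Limits: corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("500m"),
			corev1.ResourceMemory: resource.MustParse("512Mi"),
		},
	}

	// Let the user override the defaults. This replaces the whole Requests or Limits list
	// and enforces no upper bound; cap values with a LimitRange or a validating webhook.
	if webapp.Spec.Resources != nil {
		if webapp.Spec.Resources.Requests != nil {
			resources.Requests = webapp.Spec.Resources.Requests
		}
		if webapp.Spec.Resources.Limits != nil {
			resources.Limits = webapp.Spec.Resources.Limits
		}
	}

	containers[0].Resources = resources
}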
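The code above replaces the whole Requests (or Limits) list as soon as the user sets any entry. As a result, overriding only memory drops the default CPU request. A per-key merge avoids that.

The sketch below uses plain maps in place of corev1.ResourceList. The small/medium/large preset values are made up for illustration:

```go
package main

// ResourceList is a simplified stand-in for corev1.ResourceList (resource name -> quantity).
type ResourceList map[string]string

// sizePresets is a hypothetical mapping from spec.size to default requests; values are illustrative.
var sizePresets = map[string]ResourceList{
	"small":  {"cpu": "100m", "memory": "128Mi"},
	"medium": {"cpu": "250m", "memory": "256Mi"},
	"large":  {"cpu": "500m", "memory": "512Mi"},
}

// effectiveRequests starts from the preset for size (falling back to "small") and lets
// the user override individual keys, so setting only memory keeps the preset CPU value.
func effectiveRequests(size string, overrides ResourceList) ResourceList {
	preset, ok := sizePresets[size]
	if !ok {
		preset = sizePresets["small"]
	}
	out := ResourceList{}
	for k, v := range preset { // copy, so the shared preset is never mutated
		out[k] = v
	}
	for k, v := range overrides {
		out[k] = v
	}
	return out
}
```

Copying the preset before applying overrides matters. Mutating the shared map would leak one WebApp's overrides into every other WebApp of the same size.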

5. Graceful error handling and retries

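import (
	"context"
	"errors"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log"

	webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
)

func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	webapp := &webappv1alpha1.WebApp{}
	if err := r.Get(ctx, req.NamespacedName, webapp); err != nil {
		if apierrors.IsNotFound(err) {
			// The resource was deleted: nothing to do
			return ctrl.Result{}, nil
		}
		// Any other read error: return it so the workqueue retries with exponential backoff
		logger.Error(err, "Failed to get WebApp")
		return ctrl.Result{}, err
	}

	if err := r.reconcileDeployment(ctx, webapp); err != nil {
		logger.Error(err, "Failed to reconcile Deployment")

		// Record the failure in the status
		r.setCondition(ctx, webapp, "Ready", metav1.ConditionFalse,
			"ReconcileFailed", err.Error())

		// Pick the retry policy by error type
		if isTransientError(err) {
			// Transient: return the error so the rate limiter applies exponential backoff
			return ctrl.Result{}, err
		}
		// Permanent (e.g. an invalid spec): stop retrying; the next spec change triggers a new reconcile
		return ctrl.Result{}, nil
	}

	// Success: resync periodically to catch drift
	return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}

// isTransientError reports whether err is worth retrying (timeouts, throttling, conflicts, ...)
func isTransientError(err error) bool {
	if errors.Is(err, context.DeadlineExceeded) {
		return true
	}
	return apierrors.IsServerTimeout(err) ||
		apierrors.IsTimeout(err) ||
		apierrors.IsTooManyRequests(err) ||
		apierrors.IsServiceUnavailable(err) ||
		apierrors.IsConflict(err)
}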

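Case study: a database Operator

Let's look at a more realistic scenario: an Operator that manages databases such as PostgreSQL. The goal is an Operator that creates database instances, manages user permissions, and runs migrations. The controller below covers instance provisioning, the connection Secret, backups, and cleanup; user management and migrations are left out for brevity.

Defining the Database CRD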

// api/v1alpha1/database_types.go
package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DatabaseSpec defines the desired state of a Database
type DatabaseSpec struct {
	//+kubebuilder:validation:Required
	//+kubebuilder:validation:Enum=postgres;mysql
	// Database engine
	Engine string `json:"engine"`

	//+kubebuilder:validation:Required
	//+kubebuilder:validation:Pattern=`^\d+\.\d+$`
	// Engine version, e.g. "15.3"
	Version string `json:"version"`

	//+kubebuilder:validation:Required
	//+kubebuilder:validation:MinLength=1
	// Name of the database to create
	DatabaseName string `json:"databaseName"`

	//+kubebuilder:validation:Minimum=1
	//+kubebuilder:validation:Maximum=100
	//+kubebuilder:default=1
	// Storage size in GB
	StorageGB int32 `json:"storageGB,omitempty"`

	//+kubebuilder:validation:Enum=small;medium;large
	//+kubebuilder:default=small
	// Instance size
	InstanceSize string `json:"instanceSize,omitempty"`

	//+optional
	//+kubebuilder:default=true
	// Whether to enable high availability. No omitempty: with it, a Go client would drop
	// an explicit false and the API server would default the field back to true.
	HighAvailability bool `json:"highAvailability"`

	// Database users to create
	Users []DatabaseUser `json:"users,omitempty"`

	// Backup configuration
	Backup *BackupConfig `json:"backup,omitempty"`
}

// DatabaseUser describes a database user
type DatabaseUser struct {
	//+kubebuilder:validation:Required
	Username string `json:"username"`

	//+kubebuilder:validation:Enum=readonly;readwrite;admin
	//+kubebuilder:default=readonly
	Role string `json:"role,omitempty"`

	// Name of the Secret holding this user's password
	PasswordSecret string `json:"passwordSecret"`
}

// BackupConfig configures periodic backups
type BackupConfig struct {
	//+optional
	//+kubebuilder:default=true
	// Whether backups are enabled (no omitempty, for the same reason as HighAvailability)
	Enabled bool `json:"enabled"`

	//+kubebuilder:validation:Pattern=`^(\d+h|\d+d)$`
	//+kubebuilder:default="24h"
	// Interval between backups, in hours ("6h") or days ("7d")
	Interval string `json:"interval,omitempty"`

	//+kubebuilder:validation:Minimum=1
	//+kubebuilder:validation:Maximum=30
	//+kubebuilder:default=7
	// Number of days to keep backups
	RetentionDays int32 `json:"retentionDays,omitempty"`

	// Backup destination (an S3 path)
	StoragePath string `json:"storagePath,omitempty"`
}

// DatabaseStatus defines the observed state of a Database
type DatabaseStatus struct {
	//+kubebuilder:validation:Enum=Pending;Provisioning;Running;Failed;Deleting
	Phase string `json:"phase,omitempty"`

	// Connection host name
	Endpoint string `json:"endpoint,omitempty"`

	// Connection port
	Port int32 `json:"port,omitempty"`

	// Storage actually in use
	StorageUsed string `json:"storageUsed,omitempty"`

	// Time of the last successful backup
	LastBackup *metav1.Time `json:"lastBackup,omitempty"`

	// Standard conditions
	Conditions []metav1.Condition `json:"conditions,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Engine",type=string,JSONPath=`.spec.engine`
//+kubebuilder:printcolumn:name="Version",type=string,JSONPath=`.spec.version`
//+kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase`
//+kubebuilder:printcolumn:name="Endpoint",type=string,JSONPath=`.status.endpoint`
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
//+kubebuilder:resource:shortName=db

// Database is the Schema for the databases API
type Database struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   DatabaseSpec   `json:"spec,omitempty"`
	Status DatabaseStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// DatabaseList contains a list of Database
type DatabaseList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []Database `json:"items"`
}

func init() {
	SchemeBuilder.Register(&Database{}, &DatabaseList{})
}
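This example shows why a bool field with a default of true should not also be tagged omitempty. encoding/json drops a false bool when the tag has omitempty. Any Go client therefore sends no field at all, including the Operator itself when it calls r.Update to add its finalizer, and the API server's defaulting puts true back.

The sketch below (hypothetical struct names) shows what each declaration puts on the wire:

```go
package main

import "encoding/json"

// Three hypothetical ways to declare a bool that should default to true on the server.
type specOmitEmpty struct {
	HighAvailability bool `json:"highAvailability,omitempty"` // false is dropped on the wire
}

type specNoOmitEmpty struct {
	HighAvailability bool `json:"highAvailability"` // false is always sent
}

type specPointer struct {
	HighAvailability *bool `json:"highAvailability,omitempty"` // nil means "unset", &false is sent
}

// encode marshals v to a JSON string; the inputs here always marshal cleanly.
func encode(v any) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}
```

Dropping omitempty, as in the spec above, keeps the field a plain bool. A pointer additionally distinguishes "unset" from an explicit false, at the cost of nil checks in the controller.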

Core logic of the Database controller

// internal/controller/database_controller.go
package controller

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"

	dbv1alpha1 "github.com/yourusername/db-operator/api/v1alpha1"
)

const dbFinalizer = "db.example.com/finalizer"

// DatabaseReconciler reconciles Database objects
type DatabaseReconciler struct {
	client.Client
	Scheme        *runtime.Scheme
	CloudProvider CloudProvider // cloud vendor API
}

// CloudProvider abstracts the cloud vendor's database API
type CloudProvider interface {
	CreateInstance(ctx context.Context, spec *dbv1alpha1.DatabaseSpec) (*InstanceInfo, error)
	DeleteInstance(ctx context.Context, instanceID string) error
	GetInstanceStatus(ctx context.Context, instanceID string) (string, error)
	CreateBackup(ctx context.Context, instanceID, storagePath string) error
}

type InstanceInfo struct {
	Endpoint string
	Port     int32
	ID       string
}

func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Reconciling Database")

	// Fetch the Database
	db := &dbv1alpha1.Database{}
	if err := r.Get(ctx, req.NamespacedName, db); err != nil {
		if errors.IsNotFound(err) {
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, err
	}

	// Handle deletion
	if !db.ObjectMeta.DeletionTimestamp.IsZero() {
		return r.handleDeletion(ctx, db)
	}

	// Add the finalizer before creating any external resource
	if !controllerutil.ContainsFinalizer(db, dbFinalizer) {
		controllerutil.AddFinalizer(db, dbFinalizer)
		if err := r.Update(ctx, db); err != nil {
			return ctrl.Result{}, err
		}
	}

	// Mark a new Database as Provisioning
	if db.Status.Phase == "" {
		db.Status.Phase = "Provisioning"
		if err := r.Status().Update(ctx, db); err != nil {
			return ctrl.Result{}, err
		}
	}

	// Look up the instance ID recorded by an earlier reconcile
	instanceID := db.Annotations["instance-id"]
	if instanceID == "" {
		// Create a new instance. If recording its ID below fails, the next reconcile calls
		// CreateInstance again, so the provider call should be idempotent.
		info, err := r.CloudProvider.CreateInstance(ctx, &db.Spec)
		if err != nil {
			logger.Error(err, "Failed to create database instance")
			db.Status.Phase = "Failed"
			if statusErr := r.Status().Update(ctx, db); statusErr != nil {
				logger.Error(statusErr, "Failed to update Database status")
			}
			return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
		}

		// Record the instance ID (metadata is written with a regular Update)
		if db.Annotations == nil {
			db.Annotations = make(map[string]string)
		}
		db.Annotations["instance-id"] = info.ID
		if err := r.Update(ctx, db); err != nil {
			return ctrl.Result{}, err
		}

		// Update ignores status and refreshes db from the server response, replacing the
		// in-memory status, so set the connection details afterwards and persist them now.
		instanceID = info.ID
		db.Status.Endpoint = info.Endpoint
		db.Status.Port = info.Port
		if err := r.Status().Update(ctx, db); err != nil {
			return ctrl.Result{}, err
		}
	}

	// Check the instance state
	status, err := r.CloudProvider.GetInstanceStatus(ctx, instanceID)
	if err != nil {
		logger.Error(err, "Failed to get instance status")
		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
	}

	if status == "running" {
		db.Status.Phase = "Running"
	} else {
		db.Status.Phase = "Provisioning"
	}

	// Create or update the connection Secret
	if err := r.reconcileConnectionSecret(ctx, db); err != nil {
		logger.Error(err, "Failed to reconcile connection secret")
		return ctrl.Result{}, err
	}

	// Handle backups
	if db.Spec.Backup != nil && db.Spec.Backup.Enabled {
		if err := r.handleBackup(ctx, db, instanceID); err != nil {
			logger.Error(err, "Failed to handle backup")
			// A failed backup does not block the rest of the reconcile
		}
	}

	// Persist the status
	if err := r.Status().Update(ctx, db); err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}

// reconcileConnectionSecret creates or updates the Secret holding the connection details
func (r *DatabaseReconciler) reconcileConnectionSecret(ctx context.Context, db *dbv1alpha1.Database) error {
	secret := &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      db.Name + "-connection",
			Namespace: db.Namespace,
		},
	}

	op, err := controllerutil.CreateOrUpdate(ctx, r.Client, secret, func() error {
		// Write Data rather than StringData: StringData is write-only and never comes back on
		// reads, so CreateOrUpdate would always see a difference and issue an Update.
		secret.Data = map[string][]byte{
			"host":     []byte(db.Status.Endpoint),
			"port":     []byte(fmt.Sprintf("%d", db.Status.Port)),
			"database": []byte(db.Spec.DatabaseName),
			"engine":   []byte(db.Spec.Engine),
		}

		// Connection URI template for PostgreSQL. $(USERNAME) and $(PASSWORD) are literal
		// placeholders: Kubernetes does not expand them inside Secret data, so the consuming
		// application has to substitute the real credentials.
		if db.Spec.Engine == "postgres" {
			secret.Data["url"] = []byte(fmt.Sprintf(
				"postgresql://$(USERNAME):$(PASSWORD)@%s:%d/%s",
				db.Status.Endpoint, db.Status.Port, db.Spec.DatabaseName,
			))
		}

		return controllerutil.SetControllerReference(db, secret, r.Scheme)
	})

	if err != nil {
		return err
	}

	log.FromContext(ctx).Info("Connection secret reconciled", "operation", op)
	return nil
}

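// handleBackup triggers a backup once the configured interval has elapsed
func (r *DatabaseReconciler) handleBackup(ctx context.Context, db *dbv1alpha1.Database, instanceID string) error {
	logger := log.FromContext(ctx)

	// Skip if the last backup is more recent than the interval
	if db.Status.LastBackup != nil {
		interval, err := parseBackupInterval(db.Spec.Backup.Interval)
		if err != nil {
			return err
		}
		if time.Since(db.Status.LastBackup.Time) < interval {
			return nil // not due yet
		}
	}

	// Run the backup
	if err := r.CloudProvider.CreateBackup(ctx, instanceID, db.Spec.Backup.StoragePath); err != nil {
		return err
	}

	// Record the backup time (persisted by the status update at the end of Reconcile)
	now := metav1.Now()
	db.Status.LastBackup = &now

	logger.Info("Backup completed successfully")
	return nil
}

// parseBackupInterval accepts the "<n>h" and "<n>d" forms allowed by the CRD pattern.
// time.ParseDuration has no day unit, so "7d" is parsed as 7h and scaled by 24.
func parseBackupInterval(s string) (time.Duration, error) {
	if len(s) > 1 && s[len(s)-1] == 'd' {
		d, err := time.ParseDuration(s[:len(s)-1] + "h")
		if err != nil {
			return 0, fmt.Errorf("invalid backup interval %q: %w", s, err)
		}
		return d * 24, nil
	}
	return time.ParseDuration(s)
}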

// handleDeletion deletes the external instance, then removes the finalizer
func (r *DatabaseReconciler) handleDeletion(ctx context.Context, db *dbv1alpha1.Database) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	if controllerutil.ContainsFinalizer(db, dbFinalizer) {
		instanceID := db.Annotations["instance-id"]
		if instanceID != "" {
			logger.Info("Deleting database instance", "instanceID", instanceID)
			if err := r.CloudProvider.DeleteInstance(ctx, instanceID); err != nil {
				logger.Error(err, "Failed to delete instance")
				return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
			}
		}

		controllerutil.RemoveFinalizer(db, dbFinalizer)
		if err := r.Update(ctx, db); err != nil {
			return ctrl.Result{}, err
		}
	}

	return ctrl.Result{}, nil
}
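The controller records the instance ID only after CreateInstance returns. If that Update fails (a conflict, say), the next reconcile finds no annotation, provisions a second instance, and leaks the first. One common mitigation is to give the provider a stable idempotency token, such as the Database's metadata.uid.

The CloudProvider interface above has no such parameter, so the in-memory `fakeCloud` below is purely hypothetical:

```go
package main

import (
	"fmt"
	"sync"
)

// fakeCloud is a hypothetical in-memory provider whose CreateInstance takes an idempotency
// token. Calling it again with the same token returns the instance created the first time.
type fakeCloud struct {
	mu      sync.Mutex
	byToken map[string]string // token -> instance ID
	created int               // number of instances actually provisioned
}

func newFakeCloud() *fakeCloud {
	return &fakeCloud{byToken: map[string]string{}}
}

func (c *fakeCloud) CreateInstance(token string) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if id, ok := c.byToken[token]; ok {
		return id // retry of an earlier request: no new instance
	}
	c.created++
	id := fmt.Sprintf("db-%d", c.created)
	c.byToken[token] = id
	return id
}

// reconcileOnce simulates one pass: create (or look up) the instance, then try to record its
// ID on the object. saveOK=false models the annotation Update failing, e.g. on a conflict.
func reconcileOnce(cloud *fakeCloud, uid string, annotations map[string]string, saveOK bool) {
	if annotations["instance-id"] != "" {
		return
	}
	id := cloud.CreateInstance(uid)
	if saveOK {
		annotations["instance-id"] = id
	}
}
```

With the token, a failed write followed by a retry converges on a single instance. Without it, each failed write would leave an orphan behind.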

Example usage:

# database-sample.yaml
apiVersion: db.example.com/v1alpha1
kind: Database
metadata:
  name: my-app-db
spec:
  engine: postgres
  version: "15.3"
  databaseName: myapp
  storageGB: 10
  instanceSize: medium
  highAvailability: true
  users:
    - username: app_user
      role: readwrite
      passwordSecret: db-app-password
    - username: readonly_user
      role: readonly
      passwordSecret: db-readonly-password
  backup:
    enabled: true
    interval: "6h"
    retentionDays: 14
    storagePath: s3://my-backups/databases/my-app-db

Applying the Database CR

# Create the Secrets holding the users' passwords
kubectl create secret generic db-app-password \
  --from-literal=password=super-secret-password

kubectl create secret generic db-readonly-password \
  --from-literal=password=readonly-password

# Create the Database resource
kubectl apply -f database-sample.yaml

# Check the Database status
kubectl get databases
# NAME        ENGINE     VERSION   PHASE     ENDPOINT                              AGE
# my-app-db   postgres   15.3      Running   my-app-db.abc123.us-west-2.rds.amazonaws.com   5m

# Show the details
kubectl describe database my-app-db

# Inspect the automatically created connection Secret
kubectl get secret my-app-db-connection -o yaml

Testing the database connection

# Connect with psql, reading the password from its Secret
kubectl run psql-test --rm -it --image postgres:15 -- \
  psql "postgresql://app_user:$(kubectl get secret db-app-password -o jsonpath='{.data.password}' | base64 -d)@my-app-db.abc123.us-west-2.rds.amazonaws.com:5432/myapp"

# Alternatively, create a Pod to test from
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  name: db-client
spec:
  containers:
  - name: psql
    image: postgres:15
    command: ["sleep", "infinity"]
    env:
    - name: PGPASSWORD
      valueFrom:
        secretKeyRef:
          name: db-app-password
          key: password
    - name: PGHOST
      valueFrom:
        secretKeyRef:
          name: my-app-db-connection
          key: host
    - name: PGDATABASE
      valueFrom:
        secretKeyRef:
          name: my-app-db-connection
          key: database
EOF

# Exec into the Pod and run a query
kubectl exec -it db-client -- psql -U app_user -c "SELECT version();"

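Operator design patterns and best practices

1. Idempotent design

An Operator's Reconcile function must be idempotent: however many times it runs against the same state, the result must be the same: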

// ✅ Good: use CreateOrUpdate
func (r *WebAppReconciler) reconcileDeployment(ctx context.Context, webapp *webappv1alpha1.WebApp) error {
	deployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      webapp.Name,
			Namespace: webapp.Namespace,
		},
	}

	_, err := controllerutil.CreateOrUpdate(ctx, r.Client, deployment, func() error {
		r.mutateDeployment(deployment, webapp)
		return controllerutil.SetControllerReference(webapp, deployment, r.Scheme)
	})

	return err
}

// ❌ Bad: a plain Create (fails on every call after the first)
func (r *WebAppReconciler) reconcileDeploymentBad(ctx context.Context, webapp *webappv1alpha1.WebApp) error {
	deployment := buildDeployment(webapp)
	return r.Create(ctx, deployment) // returns AlreadyExists once the Deployment exists
}
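Here is a toy model that makes the contract concrete, with a map standing in for the API server. Like controllerutil.CreateOrUpdate, it reads the object and creates it if missing. Otherwise it runs the mutate function on a copy and writes only when something changed.

The result strings match controllerutil's OperationResult values (created, updated, unchanged). Everything else is a simplification:

```go
package main

import "reflect"

// store is a toy stand-in for the API server: object name -> fields.
type store map[string]map[string]string

// createOrUpdate creates the object if it is missing; otherwise it applies mutate to a
// copy and writes it back only if the copy differs from what is stored.
func createOrUpdate(s store, name string, mutate func(obj map[string]string)) string {
	current, ok := s[name]
	if !ok {
		obj := map[string]string{}
		mutate(obj)
		s[name] = obj
		return "created"
	}
	obj := make(map[string]string, len(current))
	for k, v := range current {
		obj[k] = v
	}
	mutate(obj)
	if reflect.DeepEqual(obj, current) {
		return "unchanged" // no write at all
	}
	s[name] = obj
	return "updated"
}
```

The "unchanged" branch is what keeps periodic resyncs cheap: reconciling an object that already matches the desired state costs a read, not a write.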

2. Status management strategy

Manage the status subresource deliberately, so that frequent writes do not end in update conflicts:

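// Update Status with a merge patch instead of Update: the patch carries only the changed
// fields and no resourceVersion, so it does not conflict when the object changed meanwhile
func (r *WebAppReconciler) updateStatus(ctx context.Context, webapp *webappv1alpha1.WebApp, newStatus webappv1alpha1.WebAppStatus) error {
	patch := client.MergeFrom(webapp.DeepCopy())
	webapp.Status = newStatus
	return r.Status().Patch(ctx, webapp, patch)
}

// Or use Server-Side Apply: send only the fields this controller owns, under a field
// manager name (every Status field should be omitempty, or zero values are applied too)
func (r *WebAppReconciler) updateStatusWithSSA(ctx context.Context, webapp *webappv1alpha1.WebApp) error {
	applied := &webappv1alpha1.WebApp{
		// Apply requests must carry apiVersion and kind
		TypeMeta: metav1.TypeMeta{
			APIVersion: webappv1alpha1.GroupVersion.String(),
			Kind:       "WebApp",
		},
		ObjectMeta: metav1.ObjectMeta{Name: webapp.Name, Namespace: webapp.Namespace},
	}
	applied.Status.Phase = webappv1alpha1.PhaseRunning
	return r.Status().Patch(ctx, applied, client.Apply,
		client.FieldOwner("webapp-operator"), client.ForceOwnership)
}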
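client.MergeFrom sends a JSON merge patch (RFC 7386) that contains only the fields that differ from the snapshot. It includes no resourceVersion unless you opt in via client.MergeFromWithOptions(obj, client.MergeFromWithOptimisticLock{}).

The sketch below computes such a patch with the standard library to show what actually goes over the wire. It is a simplified model, not controller-runtime's implementation. Like any merge patch, it replaces lists (such as conditions) wholesale:

```go
package main

import (
	"encoding/json"
	"reflect"
)

// Hypothetical, minimal object shape for the example.
type webAppStatus struct {
	Phase         string `json:"phase,omitempty"`
	ReadyReplicas int32  `json:"readyReplicas,omitempty"`
}

type webApp struct {
	Name   string       `json:"name"`
	Status webAppStatus `json:"status"`
}

// mergePatch returns an RFC 7386 merge patch turning original into modified: changed or
// added keys carry the new value, removed keys become null, nested objects are diffed
// recursively, and unchanged keys are left out.
func mergePatch(original, modified map[string]any) map[string]any {
	patch := map[string]any{}
	for k, newV := range modified {
		oldV, existed := original[k]
		if existed && reflect.DeepEqual(oldV, newV) {
			continue
		}
		oldObj, oldIsObj := oldV.(map[string]any)
		newObj, newIsObj := newV.(map[string]any)
		if existed && oldIsObj && newIsObj {
			patch[k] = mergePatch(oldObj, newObj)
			continue
		}
		patch[k] = newV
	}
	for k := range original {
		if _, ok := modified[k]; !ok {
			patch[k] = nil // a null in a merge patch deletes the field
		}
	}
	return patch
}

// toMap round-trips v through JSON, the same view the API server gets of a typed object.
func toMap(v any) (map[string]any, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var m map[string]any
	err = json.Unmarshal(b, &m)
	return m, err
}

// patchFor returns the merge patch between two versions of an object as a JSON string.
func patchFor(original, modified any) (string, error) {
	o, err := toMap(original)
	if err != nil {
		return "", err
	}
	m, err := toMap(modified)
	if err != nil {
		return "", err
	}
	b, err := json.Marshal(mergePatch(o, m))
	return string(b), err
}
```

Changing only the phase yields a body like {"status":{"phase":"Running"}}. A field reset to its zero value under omitempty shows up as null, which deletes it on the server.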

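3. Error handling and retry strategy

func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	webapp := &webappv1alpha1.WebApp{}
	if err := r.Get(ctx, req.NamespacedName, webapp); err != nil {
		if errors.IsNotFound(err) {
			// The resource was deleted: no retry needed
			return ctrl.Result{}, nil
		}
		// API server error: return it and let the workqueue retry with exponential backoff
		return ctrl.Result{}, err
	}

	// Business error: record it in Status instead of retrying immediately
	if err := r.reconcileResources(ctx, webapp); err != nil {
		log.FromContext(ctx).Error(err, "Failed to reconcile")

		// Set the phase to Failed
		webapp.Status.Phase = webappv1alpha1.PhaseFailed
		if statusErr := r.Status().Update(ctx, webapp); statusErr != nil {
			// Could not even record the failure: fall back to backoff retries
			return ctrl.Result{}, statusErr
		}

		// A nil error avoids tight retry loops; RequeueAfter still re-checks periodically
		return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
	}

	// Success: reconcile periodically to keep the state consistent
	return ctrl.Result{RequeueAfter: 10 * time.Minute}, nil
}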

Debugging and troubleshooting

Debugging is an everyday part of Operator development. Here are a few common approaches:

Running locally

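# Install the CRDs into the local cluster
make install

# Run the Operator locally with webhooks disabled (logs stream to the terminal)
make run ENABLE_WEBHOOKS=false

# In another terminal, create a test resource
kubectl apply -f config/samples/webapp_v1alpha1_webapp.yaml

# Watch the Operator output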
# 2025-06-20T10:45:00+08:00  INFO  Reconciling WebApp  {"namespace": "default", "name": "my-webapp"}
# 2025-06-20T10:45:00+08:00  INFO  Deployment reconciled  {"operation": "created"}
# 2025-06-20T10:45:00+08:00  INFO  Service reconciled  {"operation": "created"}

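Remote debugging with Telepresence

When a problem only reproduces inside the cluster, Telepresence lets you run the Operator from your machine while connected to the cluster network. An intercept only reroutes inbound traffic to the Pod, such as webhook calls; the reconcile loop itself talks outbound to the API server. Scale the in-cluster controller Deployment to zero while debugging, otherwise two instances may reconcile the same objects:

# Install Telepresence
brew install telepresence

# Connect to the cluster
telepresence connect

# Intercept traffic to the Operator
telepresence intercept webapp-operator-controller-manager \
  --port 8080:8080 \
  --env-file /tmp/operator.env

# Intercepted requests now reach the locally running Operator
go run cmd/main.go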

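Troubleshooting checklist

| Symptom | Likely cause | How to check |
| --- | --- | --- |
| Reconcile is never triggered | Insufficient RBAC permissions | Check the permissions of the Operator's ServiceAccount |
| Deployment (or its Pods) fail to be created | Namespace resource quota exhausted | Inspect the namespace's ResourceQuota |
| Status does not update | Status written through the wrong client | Use r.Status().Update() instead of r.Update() |
| Webhook errors | Certificate misconfiguration | Check that cert-manager is running correctly |
| Finalizer is stuck | Cleanup logic times out | Increase the timeout or run the cleanup asynchronously |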

Inspecting Operator logs

# Tail the Operator Pod logs
kubectl logs -n webapp-operator-system \
  -l control-plane=controller-manager \
  --tail=100 -f

# Show the Events for a specific resource
kubectl describe webapp my-webapp
# Events:
#   Type    Reason            Age   From             Message
#   ----    ------            ----  ----             -------
#   Normal  Reconciled        5m    webapp-operator  Successfully reconciled WebApp
#   Normal  DeploymentReady   4m    webapp-operator  Deployment is ready

# Check RBAC permissions
kubectl auth can-i --list --as=system:serviceaccount:webapp-operator-system:webapp-operator-controller-manager

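Monitoring the Operator

The Operator itself needs monitoring too. controller-runtime exposes Prometheus metrics on the manager's metrics endpoint out of the box, among them reconcile counts, errors, and durations, plus workqueue depth. You can register your own collectors on the same registry:

// Custom metrics
import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
	reconcileTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "webapp_reconcile_total",
			Help: "Total number of reconciliations",
		},
		[]string{"namespace", "name", "result"},
	)

	reconcileDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "webapp_reconcile_duration_seconds",
			Help:    "Duration of reconciliation in seconds",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"namespace", "name"},
	)
)

func init() {
	metrics.Registry.MustRegister(reconcileTotal, reconcileDuration)
}

// observeReconcile is a hypothetical helper that records the outcome of one reconcile.
// Call it at the end of Reconcile with the start time and the returned error. Per-object
// labels such as name create one time series per object; drop them for large fleets.
func observeReconcile(namespace, name string, start time.Time, err error) {
	result := "success"
	if err != nil {
		result = "error"
	}
	reconcileTotal.WithLabelValues(namespace, name, result).Inc()
	reconcileDuration.WithLabelValues(namespace, name).Observe(time.Since(start).Seconds())
}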

Performance tuning

Once an Operator manages a large number of resources, performance starts to matter. Here are some key strategies:

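Use the cache to cut API calls: the manager's client already serves reads from an informer cache, which greatly reduces calls to the Kubernetes API Server. You can shrink that cache further by restricting it to specific namespaces, or by filtering out irrelevant objects with label selectors.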

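Limit concurrent reconciles: by default each controller runs a single reconcile worker (MaxConcurrentReconciles is 1), so objects are processed one at a time. Raising it lets independent objects reconcile in parallel. The value caps how much load the controller can put on the API Server at once, and the workqueue guarantees that a given object is never reconciled by two workers simultaneously.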

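Control call rates with a rate limiter: client-go's workqueue package provides rate limiters that keep a single object from being retried too often in a short period. With exponential backoff, the retry interval grows after each consecutive failure, which prevents a cascade of retries from overwhelming the system.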

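Trim the Reconcile logic: avoid unnecessary API calls inside Reconcile. Compare desired and actual state before issuing an Update, use Server-Side Apply instead of the traditional Get-then-Update pattern, and group related changes to an object into a single write rather than several small ones.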

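Monitor Operator performance: use controller-runtime's built-in Prometheus metrics to track key indicators such as reconcile duration, queue depth, and error rate. When a bottleneck appears, adjust the concurrency, the caching strategy, or the code itself.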

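Remember that performance tuning is an ongoing process. As the number of resources grows, keep revisiting the Operator's configuration so that it continues to run stably and efficiently.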

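Summary

Congratulations on completing this tour of Kubernetes Operators! Let's recap the key points:

  1. CRD definition: kubebuilder markers define the resource schema and generate validation rules automatically
  2. Reconciliation loop: continuously compare desired and actual state, and act to close the gap
  3. OwnerReference: link parent and child resources so that deletion cascades
  4. Finalizer: run cleanup logic before a resource is deleted
  5. Status management: report resource health through Status and Conditions
  6. Webhook: Mutating (inject defaults) and Validating (reject invalid input) logic
  7. Testing strategy: fake client for unit tests, envtest for integration tests
  8. Deployment: run locally or deploy into the cluster

The Operator pattern is Kubernetes' core extension mechanism. Once you master it, you can teach Kubernetes to "understand" any application and achieve genuine GitOps and automated operations.

I hope this article helps you get comfortable developing Kubernetes Operators. In the next article, we will dive into Domain-Driven Design (DDD) and see how to build complex business systems in Go!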
