Go and Kubernetes: Building a Custom Operator
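Have you ever been in this situation: your company runs a complex piece of middleware, every deployment means hand-editing a pile of YAML files, and one slip leaves it misconfigured. You catch yourself thinking: wouldn't it be great if Kubernetes could "understand" this middleware and manage it for me?
That is exactly where Operators earn their keep. An Operator is a way of extending Kubernetes: you encode operational knowledge as software, and Kubernetes uses it to manage complex applications and custom resources automatically. In this article we take a deep look at building a production-grade Kubernetes Operator with Go and kubebuilder.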
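What Is an Operator?
Before diving into code, let's pin down the core concepts behind an Operator:
- Custom Resource (CR): Kubernetes lets you define your own resource types, which behave like built-in ones such as Pod and Deployment
- Custom Resource Definition (CRD): declares the schema and structure of a CR
- Controller: watches CRs for changes and runs the reconciliation logic that drives the system toward the desired state
- Operator = CRD + Controller + domain knowledge
As an analogy: Kubernetes' built-in controllers are a general-purpose housekeeper who knows how to look after Pods and Services, while an Operator is a specialist you hire yourself who knows how to run your particular application.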
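To make the idea of reconciliation concrete, here is a deliberately tiny, stdlib-only model. It is not controller-runtime code; State and reconcile are hypothetical names. What it illustrates is that reconciliation is level-triggered: each pass looks only at the current desired and actual state, not at which event woke it up, so running it again when nothing has changed is a harmless no-op.

```go
package main

import "fmt"

// State is a toy stand-in for cluster state: the replica count the CR asks
// for (Desired) and the replica count that actually exists (Actual).
type State struct {
	Desired int
	Actual  int
}

// reconcile performs one level-triggered pass: it inspects only the current
// state and moves Actual toward Desired, returning a description of what it did.
func reconcile(s *State) string {
	switch {
	case s.Actual < s.Desired:
		n := s.Desired - s.Actual
		s.Actual = s.Desired
		return fmt.Sprintf("created %d replica(s)", n)
	case s.Actual > s.Desired:
		n := s.Actual - s.Desired
		s.Actual = s.Desired
		return fmt.Sprintf("deleted %d replica(s)", n)
	default:
		return "in sync, nothing to do"
	}
}

func main() {
	s := &State{Desired: 3, Actual: 1}
	fmt.Println(reconcile(s)) // created 2 replica(s)
	s.Actual = 5              // drift: someone scaled the app by hand
	fmt.Println(reconcile(s)) // deleted 2 replica(s)
	fmt.Println(reconcile(s)) // in sync, nothing to do
}
```

A real controller replaces the integer comparison with API reads and writes, but the shape (observe, diff, act, repeat safely) is the same.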
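Project Setup: Using kubebuilder
kubebuilder is an Operator development framework maintained in the Kubernetes SIGs (kubernetes-sigs) organization. It generates a large amount of boilerplate for you: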
# Install kubebuilder
curl -L -o kubebuilder https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)
chmod +x kubebuilder && sudo mv kubebuilder /usr/local/bin/
# Create the project
mkdir webapp-operator && cd webapp-operator
go mod init github.com/yourusername/webapp-operator
# Initialize the kubebuilder project
kubebuilder init --domain example.com --repo github.com/yourusername/webapp-operator
# Scaffold the API (CRD + controller)
kubebuilder create api --group webapp --version v1alpha1 --kind WebApp --resource --controller
When the commands finish, the project layout looks like this:
webapp-operator/
├── api/
│ └── v1alpha1/
│ ├── groupversion_info.go
│ ├── webapp_types.go # CRD type definitions
│ └── zz_generated.deepcopy.go # generated DeepCopy methods
├── cmd/
│ └── main.go # manager entry point
├── config/
│ ├── crd/ # CRD YAML
│ ├── rbac/ # RBAC manifests
│ └── manager/ # manager Deployment manifests
├── internal/
│ └── controller/
│ └── webapp_controller.go # reconciliation logic
└── Makefile
Defining the CRD (Custom Resource Definition)
The CRD is the soul of an Operator: it defines what kind of resource users can create. Let's design a WebApp CRD:
// api/v1alpha1/webapp_types.go
package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// WebAppSpec defines the desired state of WebApp
type WebAppSpec struct {
//+kubebuilder:validation:Required
//+kubebuilder:validation:MinLength=1
// Application name
Name string `json:"name"`
//+kubebuilder:validation:Required
//+kubebuilder:validation:Pattern=`^[a-z0-9.-]+(/[a-z0-9.-]+)*(:[a-z0-9.-]+)?$`
// Container image reference
Image string `json:"image"`
//+kubebuilder:validation:Minimum=1
//+kubebuilder:validation:Maximum=100
//+kubebuilder:default=1
// Number of replicas
Replicas int32 `json:"replicas,omitempty"`
//+kubebuilder:validation:Minimum=1
//+kubebuilder:validation:Maximum=65535
//+kubebuilder:default=8080
// Port the container listens on
Port int32 `json:"port,omitempty"`
//+kubebuilder:validation:Enum=small;medium;large
//+kubebuilder:default=small
// Resource size preset
Size ResourceSize `json:"size,omitempty"`
// Environment variables
Env []corev1.EnvVar `json:"env,omitempty"`
// Database configuration
Database *DatabaseConfig `json:"database,omitempty"`
// Autoscaling configuration
AutoScaling *AutoScalingConfig `json:"autoScaling,omitempty"`
// Ingress configuration
Ingress *IngressConfig `json:"ingress,omitempty"`
}
// ResourceSize is a predefined resource size
//+kubebuilder:validation:Enum=small;medium;large
type ResourceSize string
const (
SmallSize ResourceSize = "small"
MediumSize ResourceSize = "medium"
LargeSize ResourceSize = "large"
)
// DatabaseConfig holds database connection settings
type DatabaseConfig struct {
//+kubebuilder:validation:Required
Host string `json:"host"`
//+kubebuilder:validation:Minimum=1
//+kubebuilder:validation:Maximum=65535
//+kubebuilder:default=5432
Port int32 `json:"port,omitempty"`
//+kubebuilder:validation:Required
Name string `json:"name"`
//+kubebuilder:validation:Required
// Name of the Secret holding the credentials (never store passwords in plain text)
CredentialsSecret string `json:"credentialsSecret"`
}
// AutoScalingConfig configures horizontal Pod autoscaling
type AutoScalingConfig struct {
//+kubebuilder:validation:Minimum=1
MinReplicas int32 `json:"minReplicas"`
//+kubebuilder:validation:Minimum=1
MaxReplicas int32 `json:"maxReplicas"`
//+kubebuilder:validation:Minimum=1
//+kubebuilder:validation:Maximum=100
TargetCPUUtilization int32 `json:"targetCPUUtilization,omitempty"`
}
// IngressConfig configures the Ingress
type IngressConfig struct {
//+kubebuilder:validation:Required
Host string `json:"host"`
//+kubebuilder:default=false
TLS bool `json:"tls,omitempty"`
// Name of the TLS Secret
TLSSecret string `json:"tlsSecret,omitempty"`
}
// WebAppStatus defines the observed state of WebApp
type WebAppStatus struct {
// Number of replicas currently available
AvailableReplicas int32 `json:"availableReplicas"`
// Current phase
//+kubebuilder:validation:Enum=Pending;Running;Failed;Updating
Phase WebAppPhase `json:"phase,omitempty"`
// Service URL
ServiceURL string `json:"serviceURL,omitempty"`
// Time of the most recent deployment
LastDeployed *metav1.Time `json:"lastDeployed,omitempty"`
// Status conditions
Conditions []metav1.Condition `json:"conditions,omitempty"`
}
// WebAppPhase is the lifecycle phase of a WebApp
type WebAppPhase string
const (
PhasePending WebAppPhase = "Pending"
PhaseRunning WebAppPhase = "Running"
PhaseFailed WebAppPhase = "Failed"
PhaseUpdating WebAppPhase = "Updating"
)
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase`
//+kubebuilder:printcolumn:name="Replicas",type=integer,JSONPath=`.status.availableReplicas`
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
//+kubebuilder:resource:shortName=wa
// WebApp is the Schema for the webapps API
type WebApp struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec WebAppSpec `json:"spec,omitempty"`
Status WebAppStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// WebAppList contains a list of WebApp
type WebAppList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []WebApp `json:"items"`
}
func init() {
SchemeBuilder.Register(&WebApp{}, &WebAppList{})
}
Generate the DeepCopy code and the CRD manifests:
# Generate DeepCopy methods
make generate
# Generate the CRD YAML
make manifests
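The generated CRD YAML embeds OpenAPI v3 validation rules (plus the defaults declared with +kubebuilder:default markers), so the API server can reject invalid WebApp resources on its own.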
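It is worth checking what the image Pattern actually accepts. The sketch below runs the exact regex from the marker through Go's regexp package; the sample image names are made up. Note that it rejects some perfectly valid image references: registries with a port, names with underscores, and upper-case tags.

```go
package main

import (
	"fmt"
	"regexp"
)

// imagePattern is copied verbatim from the Pattern marker on WebAppSpec.Image.
var imagePattern = regexp.MustCompile(`^[a-z0-9.-]+(/[a-z0-9.-]+)*(:[a-z0-9.-]+)?$`)

func main() {
	for _, img := range []string{
		"nginx:1.21",
		"registry.example.com/team/app:v1.2",
		"localhost:5000/app:v1", // registry with a port: rejected
		"my_app:v1",             // underscore: rejected
		"nginx:Latest",          // upper-case tag: rejected
	} {
		fmt.Printf("%-36s accepted=%v\n", img, imagePattern.MatchString(img))
	}
}
```

If you need those forms, the pattern has to be loosened; the webhook's validate() reuses the same regex, so both places would need to change together.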
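Implementing the Reconciliation Loop
The reconciliation loop is the heart of an Operator. Despite the name, it is not a busy infinite loop: controller-runtime calls Reconcile whenever a watched object changes or a requeue comes due, and each call compares the current state with the desired state and performs whatever actions close the gap. Because the same object may be reconciled many times, every step must be idempotent.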
// internal/controller/webapp_controller.go
package controller
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
)
const (
webappFinalizer = "webapp.example.com/finalizer"
requeueAfter = 30 * time.Second
)
// WebAppReconciler reconciles WebApp objects
type WebAppReconciler struct {
client.Client
Scheme *runtime.Scheme
}
//+kubebuilder:rbac:groups=webapp.example.com,resources=webapps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=webapp.example.com,resources=webapps/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=webapp.example.com,resources=webapps/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
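//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete
// Reconcile drives the cluster toward the state declared in the WebApp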
func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Info("Reconciling WebApp", "namespace", req.Namespace, "name", req.Name)
// Step 1: fetch the WebApp
webapp := &webappv1alpha1.WebApp{}
if err := r.Get(ctx, req.NamespacedName, webapp); err != nil {
if errors.IsNotFound(err) {
logger.Info("WebApp resource not found, ignoring")
return ctrl.Result{}, nil
}
logger.Error(err, "Failed to get WebApp")
return ctrl.Result{}, err
}
// Step 2: handle the finalizer (cleanup before deletion)
if webapp.ObjectMeta.DeletionTimestamp.IsZero() {
// Not being deleted: make sure our finalizer is present
if !controllerutil.ContainsFinalizer(webapp, webappFinalizer) {
controllerutil.AddFinalizer(webapp, webappFinalizer)
if err := r.Update(ctx, webapp); err != nil {
return ctrl.Result{}, err
}
}
} else {
// The WebApp is being deleted
if controllerutil.ContainsFinalizer(webapp, webappFinalizer) {
// Run the cleanup logic
if err := r.cleanupExternalResources(ctx, webapp); err != nil {
logger.Error(err, "Failed to cleanup external resources")
return ctrl.Result{}, err
}
// Remove the finalizer so the API server can complete the deletion
controllerutil.RemoveFinalizer(webapp, webappFinalizer)
if err := r.Update(ctx, webapp); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
// Step 3: reconcile the Deployment
deploymentResult, err := r.reconcileDeployment(ctx, webapp)
if err != nil {
r.setCondition(ctx, webapp, "DeploymentReady", metav1.ConditionFalse,
"DeploymentFailed", err.Error())
return deploymentResult, err
}
// Step 4: reconcile the Service
serviceResult, err := r.reconcileService(ctx, webapp)
if err != nil {
r.setCondition(ctx, webapp, "ServiceReady", metav1.ConditionFalse,
"ServiceFailed", err.Error())
return serviceResult, err
}
// Step 5: reconcile the Ingress (if configured)
if webapp.Spec.Ingress != nil {
ingressResult, err := r.reconcileIngress(ctx, webapp)
if err != nil {
r.setCondition(ctx, webapp, "IngressReady", metav1.ConditionFalse,
"IngressFailed", err.Error())
return ingressResult, err
}
}
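// Step 6: reconcile the HPA (if autoscaling is configured).
// reconcileHPA is assumed to create or update an autoscaling/v2
// HorizontalPodAutoscaler; its body is not shown in this section.
if webapp.Spec.AutoScaling != nil {
if _, err := r.reconcileHPA(ctx, webapp); err != nil {
// An HPA failure does not block the main flow; it is only logged
logger.Error(err, "Failed to reconcile HPA")
}
}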
// Step 7: update the status
if err := r.updateStatus(ctx, webapp); err != nil {
logger.Error(err, "Failed to update status")
return ctrl.Result{}, err
}
// Record a successful Ready condition
r.setCondition(ctx, webapp, "Ready", metav1.ConditionTrue,
"ReconcileSuccess", "All resources reconciled successfully")
logger.Info("Reconciliation completed successfully")
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
// reconcileDeployment creates or updates the Deployment owned by the WebApp
func (r *WebAppReconciler) reconcileDeployment(ctx context.Context, webapp *webappv1alpha1.WebApp) (ctrl.Result, error) {
logger := log.FromContext(ctx)
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: webapp.Name,
Namespace: webapp.Namespace,
},
}
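// controllerutil.CreateOrUpdate (from controller-runtime) fetches the object:
// if it does not exist, it runs the mutate function and creates it; otherwise
// it runs mutate on the fetched copy and updates only if something changed.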
op, err := controllerutil.CreateOrUpdate(ctx, r.Client, deployment, func() error {
// Fill in the Deployment's desired state
r.mutateDeployment(deployment, webapp)
// Set the OwnerReference (so the garbage collector deletes the Deployment when the WebApp is deleted)
return controllerutil.SetControllerReference(webapp, deployment, r.Scheme)
})
if err != nil {
logger.Error(err, "Failed to reconcile Deployment")
return ctrl.Result{}, err
}
logger.Info("Deployment reconciled", "operation", op)
return ctrl.Result{}, nil
}
// mutateDeployment writes the desired Deployment state derived from the WebApp
func (r *WebAppReconciler) mutateDeployment(deployment *appsv1.Deployment, webapp *webappv1alpha1.WebApp) {
labels := map[string]string{
"app": webapp.Name,
"app.kubernetes.io/name": webapp.Name,
"app.kubernetes.io/managed-by": "webapp-operator",
}
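replicas := webapp.Spec.Replicas
if webapp.Spec.AutoScaling != nil {
// Once an HPA owns scaling, keep the replica count it has chosen and fall
// back to minReplicas only when the Deployment is first created; otherwise
// every reconcile would undo the HPA's decision.
if deployment.Spec.Replicas != nil {
replicas = *deployment.Spec.Replicas
} else {
replicas = webapp.Spec.AutoScaling.MinReplicas
}
}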
resources := r.getResourcesForSize(webapp.Spec.Size)
deployment.Spec = appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: webapp.Name,
Image: webapp.Spec.Image,
Resources: resources,
Ports: []corev1.ContainerPort{
{
Name: "http",
ContainerPort: webapp.Spec.Port,
Protocol: corev1.ProtocolTCP,
},
},
Env: webapp.Spec.Env,
// Health checks (assumes the application serves /health on its HTTP port)
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/health",
Port: intstr.FromInt(int(webapp.Spec.Port)),
},
},
InitialDelaySeconds: 30,
PeriodSeconds: 10,
FailureThreshold: 3,
},
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/health",
Port: intstr.FromInt(int(webapp.Spec.Port)),
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 5,
FailureThreshold: 3,
},
},
},
},
},
}
// Inject database settings; the password comes from the "password" key of the referenced Secret
if webapp.Spec.Database != nil {
deployment.Spec.Template.Spec.Containers[0].Env = append(
deployment.Spec.Template.Spec.Containers[0].Env,
corev1.EnvVar{
Name: "DATABASE_HOST",
Value: webapp.Spec.Database.Host,
},
corev1.EnvVar{
Name: "DATABASE_PORT",
Value: fmt.Sprintf("%d", webapp.Spec.Database.Port),
},
corev1.EnvVar{
Name: "DATABASE_NAME",
Value: webapp.Spec.Database.Name,
},
corev1.EnvVar{
Name: "DATABASE_PASSWORD",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: webapp.Spec.Database.CredentialsSecret,
},
Key: "password",
},
},
},
)
}
}
// getResourcesForSize maps a size preset to CPU/memory requests and limits
func (r *WebAppReconciler) getResourcesForSize(size webappv1alpha1.ResourceSize) corev1.ResourceRequirements {
switch size {
case webappv1alpha1.MediumSize:
return corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("250m"),
corev1.ResourceMemory: resource.MustParse("256Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("512Mi"),
},
}
case webappv1alpha1.LargeSize:
return corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("512Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
}
default: // Small
return corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
corev1.ResourceMemory: resource.MustParse("128Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("200m"),
corev1.ResourceMemory: resource.MustParse("256Mi"),
},
}
}
}
// reconcileService creates or updates the ClusterIP Service in front of the Pods
func (r *WebAppReconciler) reconcileService(ctx context.Context, webapp *webappv1alpha1.WebApp) (ctrl.Result, error) {
logger := log.FromContext(ctx)
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: webapp.Name,
Namespace: webapp.Namespace,
},
}
op, err := controllerutil.CreateOrUpdate(ctx, r.Client, service, func() error {
service.Spec = corev1.ServiceSpec{
Selector: map[string]string{
"app": webapp.Name,
},
Ports: []corev1.ServicePort{
{
Name: "http",
Port: 80,
TargetPort: intstr.FromInt(int(webapp.Spec.Port)),
Protocol: corev1.ProtocolTCP,
},
},
Type: corev1.ServiceTypeClusterIP,
}
return controllerutil.SetControllerReference(webapp, service, r.Scheme)
})
if err != nil {
logger.Error(err, "Failed to reconcile Service")
return ctrl.Result{}, err
}
logger.Info("Service reconciled", "operation", op)
return ctrl.Result{}, nil
}
// reconcileIngress creates or updates the Ingress that exposes the Service
func (r *WebAppReconciler) reconcileIngress(ctx context.Context, webapp *webappv1alpha1.WebApp) (ctrl.Result, error) {
logger := log.FromContext(ctx)
ingress := &networkingv1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: webapp.Name,
Namespace: webapp.Namespace,
},
}
op, err := controllerutil.CreateOrUpdate(ctx, r.Client, ingress, func() error {
pathType := networkingv1.PathTypePrefix
ingress.Spec = networkingv1.IngressSpec{
Rules: []networkingv1.IngressRule{
{
Host: webapp.Spec.Ingress.Host,
IngressRuleValue: networkingv1.IngressRuleValue{
HTTP: &networkingv1.HTTPIngressRuleValue{
Paths: []networkingv1.HTTPIngressPath{
{
Path: "/",
PathType: &pathType,
Backend: networkingv1.IngressBackend{
Service: &networkingv1.IngressServiceBackend{
Name: webapp.Name,
Port: networkingv1.ServiceBackendPort{
Number: 80,
},
},
},
},
},
},
},
},
},
}
// Configure TLS (only when both tls and tlsSecret are set)
if webapp.Spec.Ingress.TLS && webapp.Spec.Ingress.TLSSecret != "" {
ingress.Spec.TLS = []networkingv1.IngressTLS{
{
Hosts: []string{webapp.Spec.Ingress.Host},
SecretName: webapp.Spec.Ingress.TLSSecret,
},
}
}
return controllerutil.SetControllerReference(webapp, ingress, r.Scheme)
})
if err != nil {
logger.Error(err, "Failed to reconcile Ingress")
return ctrl.Result{}, err
}
logger.Info("Ingress reconciled", "operation", op)
return ctrl.Result{}, nil
}
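controllerutil.CreateOrUpdate is easier to reason about with a model in hand. The stdlib-only sketch below is a simplified, in-memory imitation; the Deployment type, Result type and createOrUpdate function are hypothetical. It shows the three outcomes behind the "operation" field logged by reconcileDeployment.

```go
package main

import "fmt"

// Deployment is a toy object; only the fields the mutate function sets matter here.
type Deployment struct {
	Name     string
	Image    string
	Replicas int32
}

// Result loosely mirrors the values of controllerutil.OperationResult.
type Result string

const (
	Created   Result = "created"
	Updated   Result = "updated"
	Unchanged Result = "unchanged"
)

// createOrUpdate is a simplified, in-memory model of controllerutil.CreateOrUpdate.
func createOrUpdate(store map[string]Deployment, obj *Deployment, mutate func(*Deployment)) Result {
	existing, found := store[obj.Name]
	if !found {
		// Not found: build the desired state from scratch and create it.
		mutate(obj)
		store[obj.Name] = *obj
		return Created
	}
	// Found: start from the stored copy and apply the desired state on top...
	*obj = existing
	mutate(obj)
	// ...then write back only if mutate actually changed something.
	if *obj == existing {
		return Unchanged
	}
	store[obj.Name] = *obj
	return Updated
}

func main() {
	store := map[string]Deployment{}
	desired := func(d *Deployment) {
		d.Image = "nginx:1.21"
		d.Replicas = 2
	}

	fmt.Println(createOrUpdate(store, &Deployment{Name: "web"}, desired)) // created
	fmt.Println(createOrUpdate(store, &Deployment{Name: "web"}, desired)) // unchanged
	fmt.Println(createOrUpdate(store, &Deployment{Name: "web"}, func(d *Deployment) {
		d.Image = "nginx:1.25"
	})) // updated
}
```

One practical consequence: in the real helper the comparison runs against the server's copy, which includes fields the API server defaulted. A mutate function that rebuilds the whole spec, as mutateDeployment does with deployment.Spec = appsv1.DeploymentSpec{...}, drops those defaults and can therefore look changed on every pass; setting individual fields avoids the redundant Update calls.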
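Status Management and Conditions
Status is the Operator's window for reporting back to users. Well-designed status lets whoever runs the cluster judge a resource's health at a glance: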
// updateStatus refreshes the WebApp status from the owned Deployment
func (r *WebAppReconciler) updateStatus(ctx context.Context, webapp *webappv1alpha1.WebApp) error {
logger := log.FromContext(ctx)
// Fetch the Deployment owned by this WebApp
deployment := &appsv1.Deployment{}
err := r.Get(ctx, types.NamespacedName{
Name: webapp.Name,
Namespace: webapp.Namespace,
}, deployment)
if err != nil {
if errors.IsNotFound(err) {
webapp.Status.Phase = webappv1alpha1.PhasePending
webapp.Status.AvailableReplicas = 0
} else {
return err
}
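} else {
webapp.Status.AvailableReplicas = deployment.Status.AvailableReplicas
// Spec.Replicas is defaulted by the API server, but guard against nil anyway
desired := int32(1)
if deployment.Spec.Replicas != nil {
desired = *deployment.Spec.Replicas
}
previousPhase := webapp.Status.Phase
// Derive the phase from the Deployment's status
if deployment.Status.AvailableReplicas == desired {
webapp.Status.Phase = webappv1alpha1.PhaseRunning
// Assumes the default cluster domain (cluster.local)
webapp.Status.ServiceURL = fmt.Sprintf("http://%s.%s.svc.cluster.local",
webapp.Name, webapp.Namespace)
} else if deployment.Status.UpdatedReplicas < desired {
webapp.Status.Phase = webappv1alpha1.PhaseUpdating
} else {
webapp.Status.Phase = webappv1alpha1.PhasePending
}
// Record the deploy time only when the app becomes Running. Writing
// metav1.Now() on every pass changes the status each time, and with the
// default watch setup (no event-filter predicate) that status write
// triggers yet another reconcile, so the controller would never go idle.
if webapp.Status.Phase == webappv1alpha1.PhaseRunning && previousPhase != webappv1alpha1.PhaseRunning {
now := metav1.Now()
webapp.Status.LastDeployed = &now
}
}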
// Persist through the status subresource
if err := r.Status().Update(ctx, webapp); err != nil {
logger.Error(err, "Failed to update WebApp status")
return err
}
return nil
}
// setCondition records a standard metav1.Condition and persists it
func (r *WebAppReconciler) setCondition(ctx context.Context, webapp *webappv1alpha1.WebApp,
condType string, status metav1.ConditionStatus, reason, message string) {
condition := metav1.Condition{
Type: condType,
Status: status,
ObservedGeneration: webapp.Generation,
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
}
meta.SetStatusCondition(&webapp.Status.Conditions, condition)
// Persist the status; failures are only logged here
if err := r.Status().Update(ctx, webapp); err != nil {
log.FromContext(ctx).Error(err, "Failed to update condition")
}
}
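Finalizers: Cleaning Up Gracefully
A finalizer is Kubernetes' "delete interceptor". When an object that carries finalizers is deleted, the API server only sets its metadata.deletionTimestamp and keeps the object until every entry in metadata.finalizers has been removed. The controller therefore runs its cleanup first and removes its own finalizer only once that cleanup succeeds: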
// cleanupExternalResources releases resources that live outside the cluster
func (r *WebAppReconciler) cleanupExternalResources(ctx context.Context, webapp *webappv1alpha1.WebApp) error {
logger := log.FromContext(ctx)
logger.Info("Cleaning up external resources for WebApp", "name", webapp.Name)
// Example: clean up related records in an external database
if webapp.Spec.Database != nil {
logger.Info("Cleaning up database records",
"host", webapp.Spec.Database.Host,
"database", webapp.Spec.Database.Name,
)
// Call an external API here to clean up the database...
}
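// Example: delete an external storage bucket
logger.Info("Cleaning up storage bucket", "name", webapp.Name)
// Call your cloud provider's API here to delete the bucket...
// Example: notify the monitoring system
logger.Info("Removing monitoring alerts", "name", webapp.Name)
// Remove the related alert rules here (Prometheus' HTTP API only reads rules, so this usually means editing rule files or deleting a PrometheusRule object)...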
return nil
}
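Webhooks: Validation and Defaulting
Admission webhooks let you run custom logic when a resource is created, updated, or deleted. There are two kinds:
- Mutating webhook: modifies the resource (for example, to inject default values); mutating webhooks run first
- Validating webhook: checks whether the resource, after mutation, is valid, and rejects it if not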
// api/v1alpha1/webapp_webhook.go
package v1alpha1
import (
"fmt"
"regexp"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
var webapplog = logf.Log.WithName("webapp-resource")
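// SetupWebhookWithManager registers the webhooks with the manager.
// Because WebApp implements the legacy webhook.Defaulter and webhook.Validator
// interfaces below, For(r) alone is enough: the builder detects them.
// WithDefaulter/WithValidator expect admission.CustomDefaulter and
// admission.CustomValidator implementations instead, which *WebApp is not.
func (r *WebApp) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(r).
Complete()
}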
//+kubebuilder:webhook:path=/mutate-webapp-example-com-v1alpha1-webapp,mutating=true,failurePolicy=fail,sideEffects=None,groups=webapp.example.com,resources=webapps,verbs=create;update,versions=v1alpha1,name=mwebapp.kb.io,admissionReviewVersions=v1
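// webhook.Defaulter and webhook.Validator are controller-runtime's legacy webhook
// interfaces. Newer releases deprecate them in favor of admission.CustomDefaulter
// and admission.CustomValidator, so this code assumes an older controller-runtime.
var _ webhook.Defaulter = &WebApp{}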
// Default fills in default values (mutating webhook)
func (r *WebApp) Default() {
webapplog.Info("default", "name", r.Name)
// Default replica count
if r.Spec.Replicas == 0 {
r.Spec.Replicas = 1
}
// Default port
if r.Spec.Port == 0 {
r.Spec.Port = 8080
}
// Default size preset
if r.Spec.Size == "" {
r.Spec.Size = SmallSize
}
// Default database port
if r.Spec.Database != nil && r.Spec.Database.Port == 0 {
r.Spec.Database.Port = 5432
}
// Autoscaling defaults
if r.Spec.AutoScaling != nil {
if r.Spec.AutoScaling.MinReplicas == 0 {
r.Spec.AutoScaling.MinReplicas = 1
}
if r.Spec.AutoScaling.TargetCPUUtilization == 0 {
r.Spec.AutoScaling.TargetCPUUtilization = 80
}
}
}
//+kubebuilder:webhook:path=/validate-webapp-example-com-v1alpha1-webapp,mutating=false,failurePolicy=fail,sideEffects=None,groups=webapp.example.com,resources=webapps,verbs=create;update,versions=v1alpha1,name=vwebapp.kb.io,admissionReviewVersions=v1
var _ webhook.Validator = &WebApp{}
// ValidateCreate validates a newly created WebApp
func (r *WebApp) ValidateCreate() (admission.Warnings, error) {
webapplog.Info("validate create", "name", r.Name)
return nil, r.validate()
}
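// ValidateUpdate validates changes to an existing WebApp
func (r *WebApp) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
webapplog.Info("validate update", "name", r.Name)
oldWebApp, ok := old.(*WebApp)
if !ok {
return nil, fmt.Errorf("expected a *WebApp but got %T", old)
}
// Reject changes to immutable fields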
if oldWebApp.Spec.Database != nil && r.Spec.Database != nil {
if oldWebApp.Spec.Database.Host != r.Spec.Database.Host {
return nil, fmt.Errorf("database host is immutable, cannot change from %s to %s",
oldWebApp.Spec.Database.Host, r.Spec.Database.Host)
}
}
return nil, r.validate()
}
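// ValidateDelete runs on deletion. Note that the webhook marker above only
// registers create;update, so the API server will not call this for deletes
// unless delete is added to its verbs.
func (r *WebApp) ValidateDelete() (admission.Warnings, error) {
webapplog.Info("validate delete", "name", r.Name)
// Could check here whether other resources still depend on this WebApp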
return nil, nil
}
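// validate holds the checks shared by create and update
func (r *WebApp) validate() error {
var allErrs field.ErrorList
// Validate the image reference (same regex as the Pattern marker on Spec.Image)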
imagePattern := regexp.MustCompile(`^[a-z0-9.-]+(/[a-z0-9.-]+)*(:[a-z0-9.-]+)?$`)
if !imagePattern.MatchString(r.Spec.Image) {
allErrs = append(allErrs, field.Invalid(
field.NewPath("spec").Child("image"),
r.Spec.Image,
"invalid image format",
))
}
// Validate the autoscaling bounds
if r.Spec.AutoScaling != nil {
if r.Spec.AutoScaling.MinReplicas > r.Spec.AutoScaling.MaxReplicas {
allErrs = append(allErrs, field.Invalid(
field.NewPath("spec").Child("autoScaling").Child("minReplicas"),
r.Spec.AutoScaling.MinReplicas,
"minReplicas cannot be greater than maxReplicas",
))
}
}
// Validate the TLS settings
if r.Spec.Ingress != nil && r.Spec.Ingress.TLS {
if r.Spec.Ingress.TLSSecret == "" {
allErrs = append(allErrs, field.Required(
field.NewPath("spec").Child("ingress").Child("tlsSecret"),
"tlsSecret is required when TLS is enabled",
))
}
}
if len(allErrs) == 0 {
return nil
}
return fmt.Errorf("validation failed: %v", allErrs)
}
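The order in which the two webhook kinds run matters: the API server calls mutating webhooks first, so validation sees the defaulted object. The stdlib-only sketch below models that pipeline with a pared-down spec. applyDefaults, validate and admit are hypothetical stand-ins for WebApp.Default() and WebApp.validate(), and errors.Join (Go 1.20+) plays a role similar to field.ErrorList by reporting every problem at once.

```go
package main

import (
	"errors"
	"fmt"
)

// Toy stand-ins for the WebApp fields the webhooks touch.
type AutoScaling struct{ MinReplicas, MaxReplicas, TargetCPU int32 }

type Spec struct {
	Replicas    int32
	Port        int32
	AutoScaling *AutoScaling
}

// applyDefaults mirrors part of WebApp.Default().
func applyDefaults(s *Spec) {
	if s.Replicas == 0 {
		s.Replicas = 1
	}
	if s.Port == 0 {
		s.Port = 8080
	}
	if a := s.AutoScaling; a != nil {
		if a.MinReplicas == 0 {
			a.MinReplicas = 1
		}
		if a.TargetCPU == 0 {
			a.TargetCPU = 80
		}
	}
}

// validate mirrors part of WebApp.validate(): collect every problem, not just the first.
func validate(s Spec) error {
	var errs []error
	if a := s.AutoScaling; a != nil && a.MinReplicas > a.MaxReplicas {
		errs = append(errs, fmt.Errorf("spec.autoScaling.minReplicas: %d is greater than maxReplicas %d",
			a.MinReplicas, a.MaxReplicas))
	}
	return errors.Join(errs...) // nil when errs is empty
}

// admit models the API server's ordering: mutate first, then validate the result.
func admit(s Spec) (Spec, error) {
	if s.AutoScaling != nil {
		copied := *s.AutoScaling // don't mutate the caller's struct
		s.AutoScaling = &copied
	}
	applyDefaults(&s)
	return s, validate(s)
}

func main() {
	out, err := admit(Spec{AutoScaling: &AutoScaling{MaxReplicas: 5}})
	fmt.Println(out.Replicas, out.Port, out.AutoScaling.MinReplicas, out.AutoScaling.TargetCPU, err) // 1 8080 1 80 <nil>

	_, err = admit(Spec{AutoScaling: &AutoScaling{MinReplicas: 6, MaxReplicas: 5}})
	fmt.Println(err) // spec.autoScaling.minReplicas: 6 is greater than maxReplicas 5
}
```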
To enable the webhooks, register them in main.go:
// cmd/main.go
package main
import (
"flag"
"os"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
"github.com/yourusername/webapp-operator/internal/controller"
)
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(webappv1alpha1.AddToScheme(scheme))
}
func main() {
var metricsAddr string
var enableLeaderElection bool
var probeAddr string
var enableWebhooks bool
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager.")
flag.BoolVar(&enableWebhooks, "enable-webhooks", true, "Enable webhooks.")
opts := zap.Options{
Development: true,
}
opts.BindFlags(flag.CommandLine)
flag.Parse()
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{BindAddress: metricsAddr},
WebhookServer: webhook.NewServer(webhook.Options{Port: 9443}),
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "webapp-operator.example.com",
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
// Register the controller
if err = (&controller.WebAppReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "WebApp")
os.Exit(1)
}
// Register the webhooks
if enableWebhooks {
if err = (&webappv1alpha1.WebApp{}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "WebApp")
os.Exit(1)
}
}
// Health and readiness checks
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
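The flags in main.go come from Go's standard flag package, so their behaviour can be checked without starting a manager. This sketch registers the same flag names and defaults on a private FlagSet; the options struct and parseOptions function are hypothetical. It is handy, for example, when running the manager locally without webhook certificates via --enable-webhooks=false.

```go
package main

import (
	"flag"
	"fmt"
)

// options mirrors the manager flags declared in cmd/main.go.
type options struct {
	metricsAddr    string
	probeAddr      string
	leaderElect    bool
	enableWebhooks bool
}

// parseOptions registers the same flag names and defaults as cmd/main.go on a
// private FlagSet, so parsing can be exercised without touching flag.CommandLine.
func parseOptions(args []string) (options, error) {
	var o options
	fs := flag.NewFlagSet("manager", flag.ContinueOnError)
	fs.StringVar(&o.metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
	fs.StringVar(&o.probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
	fs.BoolVar(&o.leaderElect, "leader-elect", false, "Enable leader election for controller manager.")
	fs.BoolVar(&o.enableWebhooks, "enable-webhooks", true, "Enable webhooks.")
	err := fs.Parse(args)
	return o, err
}

func main() {
	// A local run: no webhook certificates, metrics moved off :8080.
	o, err := parseOptions([]string{"--enable-webhooks=false", "--metrics-bind-address=:9090"})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println("metrics:", o.metricsAddr, "enable-webhooks:", o.enableWebhooks)
}
```

Boolean flags need the --flag=false form: --enable-webhooks false would set the flag to true and stop parsing at the stray "false" argument.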
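Testing the Operator
Testing is an essential part of Operator development. Two complementary approaches, both built on controller-runtime, are shown here:
Unit tests (with the fake client)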
// internal/controller/webapp_controller_test.go
package controller
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
)
func newTestScheme() *runtime.Scheme {
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
_ = webappv1alpha1.AddToScheme(scheme)
return scheme
}
func TestReconcile_CreateDeployment(t *testing.T) {
scheme := newTestScheme()
// The WebApp under test
webapp := &webappv1alpha1.WebApp{
ObjectMeta: metav1.ObjectMeta{
Name: "test-webapp",
Namespace: "default",
},
Spec: webappv1alpha1.WebAppSpec{
Name: "test-webapp",
Image: "nginx:latest",
Replicas: 2,
Port: 8080,
Size: webappv1alpha1.SmallSize,
},
}
// A fake client stands in for the Kubernetes API
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(webapp).
WithStatusSubresource(webapp).
Build()
reconciler := &WebAppReconciler{
Client: fakeClient,
Scheme: scheme,
}
// Run Reconcile
req := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "test-webapp",
Namespace: "default",
},
}
result, err := reconciler.Reconcile(context.Background(), req)
require.NoError(t, err)
assert.False(t, result.Requeue)
// The Deployment should have been created
deployment := &appsv1.Deployment{}
err = fakeClient.Get(context.Background(), req.NamespacedName, deployment)
require.NoError(t, err)
// Check the Deployment's fields
assert.Equal(t, "test-webapp", deployment.Name)
assert.Equal(t, int32(2), *deployment.Spec.Replicas)
assert.Equal(t, "nginx:latest", deployment.Spec.Template.Spec.Containers[0].Image)
assert.Equal(t, int32(8080), deployment.Spec.Template.Spec.Containers[0].Ports[0].ContainerPort)
// Check the OwnerReference
assert.Len(t, deployment.OwnerReferences, 1)
assert.Equal(t, "test-webapp", deployment.OwnerReferences[0].Name)
}
func TestReconcile_CreateService(t *testing.T) {
scheme := newTestScheme()
webapp := &webappv1alpha1.WebApp{
ObjectMeta: metav1.ObjectMeta{
Name: "test-webapp",
Namespace: "default",
},
Spec: webappv1alpha1.WebAppSpec{
Name: "test-webapp",
Image: "nginx:latest",
Replicas: 1,
Port: 8080,
},
}
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(webapp).
WithStatusSubresource(webapp).
Build()
reconciler := &WebAppReconciler{
Client: fakeClient,
Scheme: scheme,
}
req := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "test-webapp",
Namespace: "default",
},
}
_, err := reconciler.Reconcile(context.Background(), req)
require.NoError(t, err)
// The Service should have been created
service := &corev1.Service{}
err = fakeClient.Get(context.Background(), req.NamespacedName, service)
require.NoError(t, err)
assert.Equal(t, "test-webapp", service.Name)
assert.Equal(t, int32(80), service.Spec.Ports[0].Port)
}
func TestReconcile_WebAppNotFound(t *testing.T) {
scheme := newTestScheme()
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
Build()
reconciler := &WebAppReconciler{
Client: fakeClient,
Scheme: scheme,
}
req := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "non-existent",
Namespace: "default",
},
}
result, err := reconciler.Reconcile(context.Background(), req)
require.NoError(t, err)
assert.False(t, result.Requeue) // no requeue when the WebApp does not exist
}
func TestReconcile_UpdateDeployment(t *testing.T) {
scheme := newTestScheme()
// Seed an existing WebApp and Deployment
webapp := &webappv1alpha1.WebApp{
ObjectMeta: metav1.ObjectMeta{
Name: "test-webapp",
Namespace: "default",
},
Spec: webappv1alpha1.WebAppSpec{
Name: "test-webapp",
Image: "nginx:1.19",
Replicas: 1,
Port: 8080,
},
}
existingDeployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test-webapp",
Namespace: "default",
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "test-webapp"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "test-webapp"},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-webapp",
Image: "nginx:1.19",
},
},
},
},
},
}
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(webapp, existingDeployment).
WithStatusSubresource(webapp).
Build()
// Change the WebApp's image and replica count
webapp.Spec.Image = "nginx:1.21"
webapp.Spec.Replicas = 3
require.NoError(t, fakeClient.Update(context.Background(), webapp))
reconciler := &WebAppReconciler{
Client: fakeClient,
Scheme: scheme,
}
req := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "test-webapp",
Namespace: "default",
},
}
_, err := reconciler.Reconcile(context.Background(), req)
require.NoError(t, err)
// The Deployment should reflect the new spec
deployment := &appsv1.Deployment{}
err = fakeClient.Get(context.Background(), req.NamespacedName, deployment)
require.NoError(t, err)
assert.Equal(t, "nginx:1.21", deployment.Spec.Template.Spec.Containers[0].Image)
assert.Equal(t, int32(3), *deployment.Spec.Replicas)
}
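The four tests above repeat the same scheme, client and request setup. The usual Go idiom for that is a table-driven test: one slice of cases, one loop body. The runnable sketch below shows the shape against a hypothetical stdlib mirror of getResourcesForSize (sizeProfile and profileFor use plain strings instead of resource.Quantity); in a real _test.go file, each row would run inside t.Run(tc.name, ...) with its own fake client.

```go
package main

import "fmt"

// sizeProfile is a hypothetical mirror of getResourcesForSize's output.
type sizeProfile struct {
	CPURequest, MemRequest, CPULimit, MemLimit string
}

// profileFor returns the same values getResourcesForSize uses for each preset.
func profileFor(size string) sizeProfile {
	switch size {
	case "medium":
		return sizeProfile{"250m", "256Mi", "500m", "512Mi"}
	case "large":
		return sizeProfile{"500m", "512Mi", "1", "1Gi"}
	default: // "small" and anything unrecognised
		return sizeProfile{"100m", "128Mi", "200m", "256Mi"}
	}
}

func main() {
	// One row per case, one loop body for all of them.
	cases := []struct {
		name string
		size string
		want sizeProfile
	}{
		{"small", "small", sizeProfile{"100m", "128Mi", "200m", "256Mi"}},
		{"medium", "medium", sizeProfile{"250m", "256Mi", "500m", "512Mi"}},
		{"large", "large", sizeProfile{"500m", "512Mi", "1", "1Gi"}},
		{"empty falls back to small", "", sizeProfile{"100m", "128Mi", "200m", "256Mi"}},
	}
	for _, tc := range cases {
		got := profileFor(tc.size)
		fmt.Printf("%-26s ok=%v\n", tc.name, got == tc.want)
	}
}
```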
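Integration tests (with envtest)
envtest starts a real Kubernetes control plane (etcd plus kube-apiserver) for more realistic integration tests. It does not run kube-controller-manager, the scheduler, or kubelets, so Deployments never get Pods and owner-reference garbage collection never happens; assert on the objects your controller writes instead. The etcd and kube-apiserver binaries are usually fetched with setup-envtest, which the test target of the kubebuilder-generated Makefile typically invokes: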
// internal/controller/suite_test.go
package controller
import (
"context"
"path/filepath"
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
)
var (
cfg *rest.Config
k8sClient client.Client
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
)
func TestControllers(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Controller Suite")
}
var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
ctx, cancel = context.WithCancel(context.TODO())
By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
ErrorIfCRDPathMissing: true,
}
var err error
cfg, err = testEnv.Start()
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())
err = webappv1alpha1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())
// Start the controller manager
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme.Scheme,
})
Expect(err).ToNot(HaveOccurred())
err = (&WebAppReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr)
Expect(err).ToNot(HaveOccurred())
go func() {
defer GinkgoRecover()
err = mgr.Start(ctx)
Expect(err).ToNot(HaveOccurred())
}()
})
var _ = AfterSuite(func() {
cancel()
By("tearing down the test environment")
err := testEnv.Stop()
Expect(err).NotTo(HaveOccurred())
})
The integration test cases:
// internal/controller/webapp_integration_test.go
package controller
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
)
// Ordered: the Contexts below share one WebApp object and must run in sequence
var _ = Describe("WebApp Controller", Ordered, func() {
const (
timeout = time.Second * 30
interval = time.Millisecond * 250
)
Context("When creating a WebApp", func() {
It("Should create Deployment and Service", func() {
webapp := &webappv1alpha1.WebApp{
ObjectMeta: metav1.ObjectMeta{
Name: "test-webapp",
Namespace: "default",
},
Spec: webappv1alpha1.WebAppSpec{
Name: "test-webapp",
Image: "nginx:latest",
Replicas: 2,
Port: 8080,
Size: webappv1alpha1.MediumSize,
},
}
Expect(k8sClient.Create(ctx, webapp)).Should(Succeed())
// Wait for the controller to create the Deployment
deployment := &appsv1.Deployment{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "test-webapp",
Namespace: "default",
}, deployment)
}, timeout, interval).Should(Succeed())
// Verify the Deployment's fields
Expect(*deployment.Spec.Replicas).To(Equal(int32(2)))
Expect(deployment.Spec.Template.Spec.Containers[0].Image).To(Equal("nginx:latest"))
// Wait for the controller to create the Service
service := &corev1.Service{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "test-webapp",
Namespace: "default",
}, service)
}, timeout, interval).Should(Succeed())
Expect(service.Spec.Ports[0].Port).To(Equal(int32(80)))
})
})
Context("When updating a WebApp", func() {
It("Should update the Deployment", func() {
key := types.NamespacedName{Name: "test-webapp", Namespace: "default"}
// Change the replica count. Retry the Get/Update pair, because the controller may
// write the object (for example its status) in between and cause a conflict
Eventually(func() error {
webapp := &webappv1alpha1.WebApp{}
if err := k8sClient.Get(ctx, key, webapp); err != nil {
return err
}
webapp.Spec.Replicas = 5
return k8sClient.Update(ctx, webapp)
}, timeout, interval).Should(Succeed())
// Wait until the controller has propagated the change to the Deployment
deployment := &appsv1.Deployment{}
Eventually(func() int32 {
if err := k8sClient.Get(ctx, key, deployment); err != nil {
return 0
}
return *deployment.Spec.Replicas
}, timeout, interval).Should(Equal(int32(5)))
})
})
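Context("When deleting a WebApp", func() {
It("Should set an OwnerReference on the Deployment and delete the WebApp", func() {
key := types.NamespacedName{Name: "test-webapp", Namespace: "default"}
webapp := &webappv1alpha1.WebApp{}
Expect(k8sClient.Get(ctx, key, webapp)).Should(Succeed())
// envtest runs no kube-controller-manager and therefore no garbage collector,
// so the Deployment is NOT removed here. Check the OwnerReference instead:
// in a real cluster, that is what makes the deletion cascade
deployment := &appsv1.Deployment{}
Expect(k8sClient.Get(ctx, key, deployment)).Should(Succeed())
Expect(deployment.OwnerReferences).To(ContainElement(HaveField("UID", webapp.UID)))
Expect(k8sClient.Delete(ctx, webapp)).Should(Succeed())
// The WebApp itself disappears once its finalizers (if any) have run
Eventually(func() error {
return k8sClient.Get(ctx, key, &webappv1alpha1.WebApp{})
}, timeout, interval).ShouldNot(Succeed())
})
})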
})
部署 Operator
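The Makefile scaffolded by kubebuilder provides a target for each deployment step:
# Install the CRDs into the cluster
make install
# Run the controller locally (for development and debugging)
make run
# Build and push the image, then deploy the Operator to the cluster
make docker-build docker-push IMG=yourregistry/webapp-operator:v0.1.0
make deploy IMG=yourregistry/webapp-operator:v0.1.0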
A sample WebApp CR for testing:
# config/samples/webapp_v1alpha1_webapp.yaml
apiVersion: webapp.example.com/v1alpha1
kind: WebApp
metadata:
name: my-webapp
namespace: default
spec:
name: my-webapp
image: nginx:1.21
replicas: 3
port: 8080
size: medium
env:
- name: ENV
value: production
database:
host: postgres.default.svc.cluster.local
port: 5432
name: mydb
credentialsSecret: db-credentials
autoScaling:
minReplicas: 2
maxReplicas: 10
targetCPUUtilization: 75
ingress:
host: my-webapp.example.com
tls: true
tlsSecret: my-webapp-tls
Apply the CR:
kubectl apply -f config/samples/webapp_v1alpha1_webapp.yaml
# Check the WebApp status
kubectl get webapps
# NAME PHASE REPLICAS AGE
# my-webapp Running 3 5m
# Show the detailed status
kubectl describe webapp my-webapp
# List the generated resources
kubectl get deployments,services,ingresses
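Advanced Operator patterns
1. Multi-version API support
As your Operator evolves, you will likely need to serve several API versions at once. controller-runtime converts between versions through a hub. One version (here v1beta1) is marked as the hub with a Hub() method. Every other version implements ConvertTo and ConvertFrom against the hub, and a conversion webhook serves these functions to the API server: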
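// api/v1beta1/webapp_types.go (new version, used as the conversion hub)
package v1beta1
type WebAppSpec struct {
// New field
IngressClassName string `json:"ingressClassName,omitempty"`
// Fields carried over from v1alpha1 (the remaining fields are omitted here)
Name string `json:"name"`
Image string `json:"image"`
Replicas int32 `json:"replicas"`
}
// WebApp, WebAppList and the status type are declared as in v1alpha1 (omitted)
// Hub marks v1beta1 as the version all other versions convert through
func (*WebApp) Hub() {}
// api/v1alpha1/webapp_conversion.go (v1alpha1 is a spoke)
package v1alpha1
import (
"sigs.k8s.io/controller-runtime/pkg/conversion"
"github.com/yourusername/webapp-operator/api/v1beta1"
)
// ConvertTo converts this v1alpha1 WebApp to the v1beta1 hub
func (src *WebApp) ConvertTo(dstRaw conversion.Hub) error {
dst := dstRaw.(*v1beta1.WebApp)
dst.ObjectMeta = src.ObjectMeta
dst.Spec.Name = src.Spec.Name
dst.Spec.Image = src.Spec.Image
dst.Spec.Replicas = src.Spec.Replicas
// IngressClassName has no v1alpha1 equivalent and stays empty
return nil
}
// ConvertFrom converts the v1beta1 hub to this v1alpha1 WebApp
func (dst *WebApp) ConvertFrom(srcRaw conversion.Hub) error {
src := srcRaw.(*v1beta1.WebApp)
dst.ObjectMeta = src.ObjectMeta
dst.Spec.Name = src.Spec.Name
dst.Spec.Image = src.Spec.Image
dst.Spec.Replicas = src.Spec.Replicas
// src.Spec.IngressClassName is dropped here, so this direction is lossy
return nil
}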
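v1alpha1 has no ingressClassName field. A v1beta1 object converted to v1alpha1 and back would therefore lose that value. A common remedy is to stash fields the older version cannot represent in an annotation during conversion, then restore them on the way back. The sketch below models this with plain structs so it runs without controller-runtime. The AlphaWebApp and BetaWebApp types and the webapp.example.com/ingress-class-name annotation key are hypothetical:

```go
package main

import "fmt"

// Simplified stand-ins for the two API versions (not the generated types).
type AlphaWebApp struct {
	Annotations map[string]string
	Name        string
	Image       string
	Replicas    int32
}

type BetaWebApp struct {
	Annotations      map[string]string
	Name             string
	Image            string
	Replicas         int32
	IngressClassName string
}

// Hypothetical annotation that carries the field v1alpha1 lacks.
const ingressClassAnnotation = "webapp.example.com/ingress-class-name"

func copyMap(in map[string]string) map[string]string {
	out := make(map[string]string, len(in))
	for k, v := range in {
		out[k] = v
	}
	return out
}

// betaToAlpha converts hub -> spoke and stashes the new field in an annotation.
func betaToAlpha(b BetaWebApp) AlphaWebApp {
	ann := copyMap(b.Annotations)
	if b.IngressClassName != "" {
		ann[ingressClassAnnotation] = b.IngressClassName
	}
	return AlphaWebApp{Annotations: ann, Name: b.Name, Image: b.Image, Replicas: b.Replicas}
}

// alphaToBeta converts spoke -> hub, restores the stashed field and removes the annotation.
func alphaToBeta(a AlphaWebApp) BetaWebApp {
	ann := copyMap(a.Annotations)
	class := ann[ingressClassAnnotation]
	delete(ann, ingressClassAnnotation)
	return BetaWebApp{Annotations: ann, Name: a.Name, Image: a.Image, Replicas: a.Replicas, IngressClassName: class}
}

func main() {
	orig := BetaWebApp{Name: "my-webapp", Image: "nginx:1.21", Replicas: 3, IngressClassName: "nginx"}
	back := alphaToBeta(betaToAlpha(orig))
	fmt.Printf("%+v\n", back)
}
```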
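2. Multi-cluster management
If your Operator needs to manage resources in other clusters, build one client per remote cluster from that cluster's kubeconfig: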
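import (
"context"
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
webappv1alpha1 "github.com/yourusername/webapp-operator/api/v1alpha1"
)
// getRemoteClient builds a client for another cluster from its kubeconfig file
func getRemoteClient(kubeconfigPath string) (client.Client, error) {
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return nil, err
}
scheme := runtime.NewScheme()
if err := clientgoscheme.AddToScheme(scheme); err != nil {
return nil, err
}
if err := webappv1alpha1.AddToScheme(scheme); err != nil {
return nil, err
}
return client.New(config, client.Options{Scheme: scheme})
}
// MultiClusterReconciler watches WebApps locally and places workloads in remote clusters
type MultiClusterReconciler struct {
client.Client
Scheme *runtime.Scheme
RemoteClients map[string]client.Client // cluster name -> client
}
func (r *MultiClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
webapp := &webappv1alpha1.WebApp{}
if err := r.Get(ctx, req.NamespacedName, webapp); err != nil {
// A deleted WebApp is not an error
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// The "target-cluster" annotation on the WebApp selects the cluster
targetCluster := webapp.Annotations["target-cluster"]
remoteClient, ok := r.RemoteClients[targetCluster]
if !ok {
return ctrl.Result{}, fmt.Errorf("cluster %q not found", targetCluster)
}
// Create the Deployment in the remote cluster. OwnerReferences cannot point across
// clusters, so cleaning up remote resources requires a finalizer on the WebApp
deployment := buildDeployment(webapp)
if err := remoteClient.Create(ctx, deployment); err != nil && !apierrors.IsAlreadyExists(err) {
return ctrl.Result{}, err
}
// Tolerating AlreadyExists keeps repeated reconciles idempotent; propagating spec
// changes would additionally need an update (e.g. CreateOrUpdate with remoteClient)
return ctrl.Result{}, nil
}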
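3. Leader election
When the Operator runs with several replicas, only one instance should reconcile at a time. With leader election enabled, the replicas compete for a Lease object. The holder reconciles, and the others stand by until it stops renewing: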
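// cmd/main.go (ptr.To comes from k8s.io/utils/ptr)
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
LeaderElection: true, // enable leader election
LeaderElectionID: "webapp-operator-leader",
LeaderElectionNamespace: "webapp-operator-system",
// The three timings below match controller-runtime's defaults
LeaseDuration: ptr.To(15 * time.Second), // how long standbys wait before taking over
RenewDeadline: ptr.To(10 * time.Second), // how long the leader retries renewing before giving up
RetryPeriod: ptr.To(2 * time.Second), // wait between acquire/renew attempts
})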
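4. Default resource requests and limits
Give the containers the Operator creates default resource requests and limits, and let users override them: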
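func (r *WebAppReconciler) mutateDeployment(deployment *appsv1.Deployment, webapp *webappv1alpha1.WebApp) {
// Assumes the container has already been added to the Pod template;
// guard against an empty list rather than panicking on index 0
containers := deployment.Spec.Template.Spec.Containers
if len(containers) == 0 {
return
}
// Defaults used when the user specifies nothing
resources := corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
corev1.ResourceMemory: resource.MustParse("128Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("512Mi"),
},
}
// Let the user override the defaults. Each map is replaced as a whole and no upper
// bound is enforced here (use a LimitRange or ResourceQuota for that). Requests that
// exceed the limits in effect are rejected by the API server
if webapp.Spec.Resources != nil {
if webapp.Spec.Resources.Requests != nil {
resources.Requests = webapp.Spec.Resources.Requests
}
if webapp.Spec.Resources.Limits != nil {
resources.Limits = webapp.Spec.Resources.Limits
}
}
containers[0].Resources = resources
}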
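Defaults often depend on a size setting, such as the size: medium field in the sample CR. Note also that the override logic above replaces whole maps: a user who sets only a CPU request loses the default memory request. The stdlib sketch below addresses both with a per-size preset table and a per-key merge of user overrides. The preset values and the resources/resolve names are hypothetical. In the controller, the values would be resource.Quantity entries in a corev1.ResourceRequirements:

```go
package main

import "fmt"

// resources holds CPU/memory quantities as strings, e.g. "250m", "256Mi".
type resources struct {
	Requests map[string]string
	Limits   map[string]string
}

// sizePresets maps a size to default resources. The numbers are illustrative only.
var sizePresets = map[string]resources{
	"small":  {Requests: map[string]string{"cpu": "100m", "memory": "128Mi"}, Limits: map[string]string{"cpu": "500m", "memory": "512Mi"}},
	"medium": {Requests: map[string]string{"cpu": "250m", "memory": "256Mi"}, Limits: map[string]string{"cpu": "1", "memory": "1Gi"}},
	"large":  {Requests: map[string]string{"cpu": "500m", "memory": "512Mi"}, Limits: map[string]string{"cpu": "2", "memory": "2Gi"}},
}

// merge copies src into dst key by key.
func merge(dst, src map[string]string) {
	for k, v := range src {
		dst[k] = v
	}
}

// resolve starts from the preset for size (falling back to "small") and lets
// user-supplied values override individual keys instead of whole maps.
// It builds fresh maps so the shared presets are never mutated.
func resolve(size string, override resources) resources {
	preset, ok := sizePresets[size]
	if !ok {
		preset = sizePresets["small"]
	}
	out := resources{Requests: map[string]string{}, Limits: map[string]string{}}
	merge(out.Requests, preset.Requests)
	merge(out.Limits, preset.Limits)
	merge(out.Requests, override.Requests)
	merge(out.Limits, override.Limits)
	return out
}

func main() {
	r := resolve("medium", resources{Limits: map[string]string{"memory": "2Gi"}})
	fmt.Println(r.Requests["cpu"], r.Limits["cpu"], r.Limits["memory"]) // 250m 1 2Gi
}
```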
5. Graceful error handling and retries
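// Imports assumed by this snippet: the standard library "errors" and
// apierrors "k8s.io/apimachinery/pkg/api/errors"
func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
webapp := &webappv1alpha1.WebApp{}
if err := r.Get(ctx, req.NamespacedName, webapp); err != nil {
if apierrors.IsNotFound(err) {
// The resource was deleted; nothing to do
return ctrl.Result{}, nil
}
// Transient read error: returning it lets the work queue retry with exponential backoff
logger.Error(err, "Failed to get WebApp")
return ctrl.Result{}, err
}
if err := r.reconcileDeployment(ctx, webapp); err != nil {
logger.Error(err, "Failed to reconcile Deployment")
// Record the failure in the status
r.setCondition(ctx, webapp, "Ready", metav1.ConditionFalse,
"ReconcileFailed", err.Error())
// Choose the retry strategy based on the error type
if isTransientError(err) {
// Transient error: return it so the rate limiter applies exponential backoff
return ctrl.Result{}, err
}
// Permanent error: stop retrying and wait for a spec change or manual intervention
return ctrl.Result{}, nil
}
// Success: resync periodically to catch drift
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
// isTransientError reports whether err is likely to go away on its own
// (timeouts, throttling, optimistic-concurrency conflicts)
func isTransientError(err error) bool {
if errors.Is(err, context.DeadlineExceeded) {
return true
}
return apierrors.IsServerTimeout(err) ||
apierrors.IsTimeout(err) ||
apierrors.IsTooManyRequests(err) ||
apierrors.IsConflict(err)
}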
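Case study: a database Operator
Let's look at a more realistic scenario: an Operator that manages PostgreSQL databases. It provisions database instances through a cloud provider, publishes the connection details in a Secret, and runs periodic backups. The spec also lets users declare database users and their roles, although the controller shown below does not reconcile them.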
Database CRD definition
// api/v1alpha1/database_types.go
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// DatabaseSpec defines the desired state of a database
type DatabaseSpec struct {
//+kubebuilder:validation:Required
//+kubebuilder:validation:Enum=postgres;mysql
// Database engine
Engine string `json:"engine"`
//+kubebuilder:validation:Required
//+kubebuilder:validation:Pattern=`^\d+\.\d+$`
// Engine version, e.g. "15.3"
Version string `json:"version"`
//+kubebuilder:validation:Required
//+kubebuilder:validation:MinLength=1
// Name of the database to create
DatabaseName string `json:"databaseName"`
//+kubebuilder:validation:Minimum=1
//+kubebuilder:validation:Maximum=100
//+kubebuilder:default=1
// Storage size in GB
StorageGB int32 `json:"storageGB,omitempty"`
//+kubebuilder:validation:Enum=small;medium;large
//+kubebuilder:default=small
// Instance size
InstanceSize string `json:"instanceSize,omitempty"`
//+optional
//+kubebuilder:default=true
// Whether to enable high availability. No omitempty: with it, a Go client could never
// send false and the API server would always apply the default (the trade-off is that
// a Go client that leaves the field unset now sends false)
HighAvailability bool `json:"highAvailability"`
// Database users
Users []DatabaseUser `json:"users,omitempty"`
// Backup configuration
Backup *BackupConfig `json:"backup,omitempty"`
}
// DatabaseUser describes a database user
type DatabaseUser struct {
//+kubebuilder:validation:Required
Username string `json:"username"`
//+kubebuilder:validation:Enum=readonly;readwrite;admin
//+kubebuilder:default=readonly
Role string `json:"role,omitempty"`
// Name of the Secret holding the user's password
PasswordSecret string `json:"passwordSecret"`
}
// BackupConfig configures periodic backups
type BackupConfig struct {
//+optional
//+kubebuilder:default=true
// No omitempty: otherwise a Go client could never send false and the default would always win
Enabled bool `json:"enabled"`
//+kubebuilder:validation:Pattern=`^(\d+h|\d+d)$`
//+kubebuilder:default="24h"
// Backup interval, e.g. "6h" or "1d"
Interval string `json:"interval,omitempty"`
//+kubebuilder:validation:Minimum=1
//+kubebuilder:validation:Maximum=30
//+kubebuilder:default=7
// Number of days to keep backups
RetentionDays int32 `json:"retentionDays,omitempty"`
// Backup storage location (S3 path)
StoragePath string `json:"storagePath,omitempty"`
}
// DatabaseStatus defines the observed state of a database
type DatabaseStatus struct {
//+kubebuilder:validation:Enum=Pending;Provisioning;Running;Failed;Deleting
Phase string `json:"phase,omitempty"`
// Connection endpoint
Endpoint string `json:"endpoint,omitempty"`
// Port
Port int32 `json:"port,omitempty"`
// Storage actually in use
StorageUsed string `json:"storageUsed,omitempty"`
// Time of the last backup
LastBackup *metav1.Time `json:"lastBackup,omitempty"`
// Conditions
Conditions []metav1.Condition `json:"conditions,omitempty"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Engine",type=string,JSONPath=`.spec.engine`
//+kubebuilder:printcolumn:name="Version",type=string,JSONPath=`.spec.version`
//+kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase`
//+kubebuilder:printcolumn:name="Endpoint",type=string,JSONPath=`.status.endpoint`
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
//+kubebuilder:resource:shortName=db
// Database is the Schema for the databases API
type Database struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec DatabaseSpec `json:"spec,omitempty"`
Status DatabaseStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// DatabaseList contains a list of Database
type DatabaseList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Database `json:"items"`
}
func init() {
SchemeBuilder.Register(&Database{}, &DatabaseList{})
}
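Combining omitempty with +kubebuilder:default=true on a plain bool is a classic trap. An explicit false is dropped during JSON encoding, so the API server sees the field as missing and applies the default. The stdlib sketch below shows what each field shape actually sends over the wire. The type names are hypothetical. Dropping omitempty (together with +optional) or switching to *bool both let an explicit false through. Only *bool also preserves "unset" from a Go client:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// withOmitempty mirrors `HighAvailability bool json:"highAvailability,omitempty"`.
type withOmitempty struct {
	HighAvailability bool `json:"highAvailability,omitempty"`
}

// withoutOmitempty always serializes the field, even when it is false.
type withoutOmitempty struct {
	HighAvailability bool `json:"highAvailability"`
}

// withPointer distinguishes "unset" (nil) from an explicit false.
type withPointer struct {
	HighAvailability *bool `json:"highAvailability,omitempty"`
}

func mustJSON(v any) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	f := false
	fmt.Println(mustJSON(withOmitempty{}))                   // {}
	fmt.Println(mustJSON(withoutOmitempty{}))                // {"highAvailability":false}
	fmt.Println(mustJSON(withPointer{}))                     // {}
	fmt.Println(mustJSON(withPointer{HighAvailability: &f})) // {"highAvailability":false}
}
```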
Database controller core logic
// internal/controller/database_controller.go
package controller
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
dbv1alpha1 "github.com/yourusername/db-operator/api/v1alpha1"
)
const dbFinalizer = "db.example.com/finalizer"
// DatabaseReconciler reconciles Database resources
type DatabaseReconciler struct {
client.Client
Scheme *runtime.Scheme
CloudProvider CloudProvider // cloud vendor API
}
// CloudProvider abstracts the cloud vendor's database API
type CloudProvider interface {
CreateInstance(ctx context.Context, spec *dbv1alpha1.DatabaseSpec) (*InstanceInfo, error)
DeleteInstance(ctx context.Context, instanceID string) error
GetInstanceStatus(ctx context.Context, instanceID string) (string, error)
CreateBackup(ctx context.Context, instanceID, storagePath string) error
}
// InstanceInfo describes a provisioned database instance
type InstanceInfo struct {
Endpoint string
Port int32
ID string
}
func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Info("Reconciling Database")
// Fetch the Database resource
db := &dbv1alpha1.Database{}
if err := r.Get(ctx, req.NamespacedName, db); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
// Handle deletion
if !db.ObjectMeta.DeletionTimestamp.IsZero() {
return r.handleDeletion(ctx, db)
}
// Add the finalizer
if !controllerutil.ContainsFinalizer(db, dbFinalizer) {
controllerutil.AddFinalizer(db, dbFinalizer)
if err := r.Update(ctx, db); err != nil {
return ctrl.Result{}, err
}
}
// Mark a new resource as Provisioning
if db.Status.Phase == "" {
db.Status.Phase = "Provisioning"
if err := r.Status().Update(ctx, db); err != nil {
return ctrl.Result{}, err
}
}
// The instance ID is stored in an annotation once the instance exists
instanceID := db.Annotations["instance-id"]
if instanceID == "" {
// Create a new instance. If the annotation update below fails, the next pass
// creates a second instance, so in production the provider call should be
// idempotent (for example, keyed by namespace/name)
info, err := r.CloudProvider.CreateInstance(ctx, &db.Spec)
if err != nil {
logger.Error(err, "Failed to create database instance")
db.Status.Phase = "Failed"
if statusErr := r.Status().Update(ctx, db); statusErr != nil {
logger.Error(statusErr, "Failed to update status")
}
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
// Save the instance ID (metadata is written with Update)
if db.Annotations == nil {
db.Annotations = make(map[string]string)
}
db.Annotations["instance-id"] = info.ID
if err := r.Update(ctx, db); err != nil {
return ctrl.Result{}, err
}
instanceID = info.ID
// Set status fields only after Update: Update ignores status changes and refreshes
// db from the server's copy. Status().Update at the end persists them
db.Status.Endpoint = info.Endpoint
db.Status.Port = info.Port
}
// Check the instance status
status, err := r.CloudProvider.GetInstanceStatus(ctx, instanceID)
if err != nil {
logger.Error(err, "Failed to get instance status")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
if status == "running" {
db.Status.Phase = "Running"
} else {
db.Status.Phase = "Provisioning"
}
// Create or update the connection Secret
if err := r.reconcileConnectionSecret(ctx, db); err != nil {
logger.Error(err, "Failed to reconcile connection secret")
return ctrl.Result{}, err
}
// Handle backups
if db.Spec.Backup != nil && db.Spec.Backup.Enabled {
if err := r.handleBackup(ctx, db, instanceID); err != nil {
// A failed backup is logged but does not block the main flow
logger.Error(err, "Failed to handle backup")
}
}
// Persist the status
if err := r.Status().Update(ctx, db); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}
// reconcileConnectionSecret creates or updates the Secret holding the connection details
func (r *DatabaseReconciler) reconcileConnectionSecret(ctx context.Context, db *dbv1alpha1.Database) error {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: db.Name + "-connection",
Namespace: db.Namespace,
},
}
op, err := controllerutil.CreateOrUpdate(ctx, r.Client, secret, func() error {
// Write Data rather than StringData: StringData is write-only, so CreateOrUpdate
// would see a difference on every pass and issue a needless Update
secret.Data = map[string][]byte{
"host": []byte(db.Status.Endpoint),
"port": []byte(fmt.Sprintf("%d", db.Status.Port)),
"database": []byte(db.Spec.DatabaseName),
"engine": []byte(db.Spec.Engine),
}
// Connection URL template for PostgreSQL. Kubernetes does not expand $(USERNAME)
// or $(PASSWORD) inside Secret values, so the consumer must substitute them
if db.Spec.Engine == "postgres" {
secret.Data["url"] = []byte(fmt.Sprintf(
"postgresql://$(USERNAME):$(PASSWORD)@%s:%d/%s",
db.Status.Endpoint, db.Status.Port, db.Spec.DatabaseName,
))
}
return controllerutil.SetControllerReference(db, secret, r.Scheme)
})
if err != nil {
return err
}
log.FromContext(ctx).Info("Connection secret reconciled", "operation", op)
return nil
}
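// handleBackup triggers a backup once the configured interval has elapsed
func (r *DatabaseReconciler) handleBackup(ctx context.Context, db *dbv1alpha1.Database, instanceID string) error {
logger := log.FromContext(ctx)
// Skip if the last backup is more recent than the interval
if db.Status.LastBackup != nil {
interval, err := parseBackupInterval(db.Spec.Backup.Interval)
if err != nil {
return err
}
if time.Since(db.Status.LastBackup.Time) < interval {
return nil // not due yet
}
}
// Run the backup
if err := r.CloudProvider.CreateBackup(ctx, instanceID, db.Spec.Backup.StoragePath); err != nil {
return err
}
// Record the backup time; Reconcile persists it with Status().Update
now := metav1.Now()
db.Status.LastBackup = &now
logger.Info("Backup completed successfully")
return nil
}
// parseBackupInterval accepts the CRD's "<n>h" and "<n>d" formats. time.ParseDuration
// has no "d" unit, so day values are converted by hand
func parseBackupInterval(s string) (time.Duration, error) {
if n := len(s); n > 1 && s[n-1] == 'd' {
var days int
if _, err := fmt.Sscanf(s[:n-1], "%d", &days); err != nil {
return 0, fmt.Errorf("invalid backup interval %q: %w", s, err)
}
return time.Duration(days) * 24 * time.Hour, nil
}
return time.ParseDuration(s)
}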
// handleDeletion deletes the cloud instance, then removes the finalizer
func (r *DatabaseReconciler) handleDeletion(ctx context.Context, db *dbv1alpha1.Database) (ctrl.Result, error) {
logger := log.FromContext(ctx)
if controllerutil.ContainsFinalizer(db, dbFinalizer) {
instanceID := db.Annotations["instance-id"]
if instanceID != "" {
logger.Info("Deleting database instance", "instanceID", instanceID)
if err := r.CloudProvider.DeleteInstance(ctx, instanceID); err != nil {
logger.Error(err, "Failed to delete instance")
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
}
controllerutil.RemoveFinalizer(db, dbFinalizer)
if err := r.Update(ctx, db); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
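The phase handling in Reconcile is easier to test when the phase is derived from what the current pass observed, not from the previous phase. The stdlib sketch below expresses that as a pure function. observation and nextPhase are hypothetical names. Reporting Deleting is an assumption, since the controller above removes the finalizer without setting that phase:

```go
package main

import "fmt"

// Phases allowed by DatabaseStatus.Phase.
const (
	PhasePending      = "Pending"
	PhaseProvisioning = "Provisioning"
	PhaseRunning      = "Running"
	PhaseFailed       = "Failed"
	PhaseDeleting     = "Deleting"
)

// observation is what one reconcile pass learned about the world.
type observation struct {
	Deleting       bool   // DeletionTimestamp is set
	CreateFailed   bool   // CreateInstance returned an error
	ProviderStatus string // value from GetInstanceStatus, e.g. "running"
}

// nextPhase depends only on the observation, so repeated reconciles of the
// same state always converge on the same phase.
func nextPhase(o observation) string {
	switch {
	case o.Deleting:
		return PhaseDeleting
	case o.CreateFailed:
		return PhaseFailed
	case o.ProviderStatus == "running":
		return PhaseRunning
	default:
		return PhaseProvisioning
	}
}

func main() {
	fmt.Println(nextPhase(observation{ProviderStatus: "creating"}))                // Provisioning
	fmt.Println(nextPhase(observation{ProviderStatus: "running"}))                 // Running
	fmt.Println(nextPhase(observation{Deleting: true, ProviderStatus: "running"})) // Deleting
}
```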
Example usage:
# database-sample.yaml
apiVersion: db.example.com/v1alpha1
kind: Database
metadata:
name: my-app-db
spec:
engine: postgres
version: "15.3"
databaseName: myapp
storageGB: 10
instanceSize: medium
highAvailability: true
users:
- username: app_user
role: readwrite
passwordSecret: db-app-password
- username: readonly_user
role: readonly
passwordSecret: db-readonly-password
backup:
enabled: true
interval: "6h"
retentionDays: 14
storagePath: s3://my-backups/databases/my-app-db
Applying the Database CR
# Create the Secrets holding the database users' passwords
kubectl create secret generic db-app-password \
--from-literal=password=super-secret-password
kubectl create secret generic db-readonly-password \
--from-literal=password=readonly-password
# Create the Database resource
kubectl apply -f database-sample.yaml
# Check the Database status
kubectl get databases
# NAME ENGINE VERSION PHASE ENDPOINT AGE
# my-app-db postgres 15.3 Running my-app-db.abc123.us-west-2.rds.amazonaws.com 5m
# Show details
kubectl describe database my-app-db
# Inspect the connection Secret the Operator created
kubectl get secret my-app-db-connection -o yaml
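Testing the database connection
# Run a one-off psql Pod; the password is read from the Secret on your machine and inlined into the URL
kubectl run psql-test --rm -it --image postgres:15 -- \
psql "postgresql://app_user:$(kubectl get secret db-app-password -o jsonpath='{.data.password}' | base64 -d)@my-app-db.abc123.us-west-2.rds.amazonaws.com:5432/myapp"
# Or create a long-running client Pod that reads the Secrets through environment variables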
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
name: db-client
spec:
containers:
- name: psql
image: postgres:15
command: ["sleep", "infinity"]
env:
- name: PGPASSWORD
valueFrom:
secretKeyRef:
name: db-app-password
key: password
- name: PGHOST
valueFrom:
secretKeyRef:
name: my-app-db-connection
key: host
- name: PGDATABASE
valueFrom:
secretKeyRef:
name: my-app-db-connection
key: database
EOF
# Run a test query with psql inside the Pod
kubectl exec -it db-client -- psql -U app_user -c "SELECT version();"
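Operator design patterns and best practices
1. Idempotency
An Operator's Reconcile function must be idempotent: however many times it runs for the same input, the result must be the same:
// ✅ Good: use CreateOrUpdate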
func (r *WebAppReconciler) reconcileDeployment(ctx context.Context, webapp *webappv1alpha1.WebApp) error {
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: webapp.Name,
Namespace: webapp.Namespace,
},
}
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, deployment, func() error {
r.mutateDeployment(deployment, webapp)
return controllerutil.SetControllerReference(webapp, deployment, r.Scheme)
})
return err
}
// ❌ Bad: a plain Create (every call after the first returns an error)
func (r *WebAppReconciler) reconcileDeploymentBad(ctx context.Context, webapp *webappv1alpha1.WebApp) error {
deployment := buildDeployment(webapp)
return r.Create(ctx, deployment) // fails with AlreadyExists once the Deployment exists
}
2. Status management
Update the Status subresource deliberately; frequent full updates lead to write conflicts:
// Update Status with a Patch instead of an Update
func (r *WebAppReconciler) updateStatus(ctx context.Context, webapp *webappv1alpha1.WebApp, newStatus webappv1alpha1.WebAppStatus) error {
patch := client.MergeFrom(webapp.DeepCopy())
webapp.Status = newStatus
return r.Status().Patch(ctx, webapp, patch)
}
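// Or use Server-Side Apply. The applied object must set apiVersion/kind and should carry
// only the fields this manager owns; status fields without omitempty would be sent as
// zero values and claimed as well
func (r *WebAppReconciler) updateStatusWithSSA(ctx context.Context, webapp *webappv1alpha1.WebApp) error {
applied := &webappv1alpha1.WebApp{
TypeMeta: metav1.TypeMeta{APIVersion: webappv1alpha1.GroupVersion.String(), Kind: "WebApp"},
ObjectMeta: metav1.ObjectMeta{Name: webapp.Name, Namespace: webapp.Namespace},
Status: webappv1alpha1.WebAppStatus{Phase: webappv1alpha1.PhaseRunning},
}
return r.Status().Patch(ctx, applied, client.Apply, client.FieldOwner("webapp-operator"), client.ForceOwnership)
}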
3. Error handling and retry strategy
func (r *WebAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
webapp := &webappv1alpha1.WebApp{}
if err := r.Get(ctx, req.NamespacedName, webapp); err != nil {
if errors.IsNotFound(err) {
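// The resource was deleted; no retry needed
return ctrl.Result{}, nil
}
// API server error: return it so the work queue retries with exponential backoff
return ctrl.Result{}, err
}
// Business error: record it in Status rather than retrying immediately
if err := r.reconcileResources(ctx, webapp); err != nil {
log.FromContext(ctx).Error(err, "Failed to reconcile")
// Set the phase to Failed
webapp.Status.Phase = webappv1alpha1.PhaseFailed
if statusErr := r.Status().Update(ctx, webapp); statusErr != nil {
log.FromContext(ctx).Error(statusErr, "Failed to update status")
}
// Return a nil error: a non-nil error would keep the object in the backoff retry loop.
// RequeueAfter still re-checks it periodically
return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
}
// Success: resync periodically to keep the state consistent
return ctrl.Result{RequeueAfter: 10 * time.Minute}, nil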
}
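Debugging and troubleshooting
Debugging is an everyday task when developing an Operator. Here are a few common approaches:
Running locally
# Install the CRDs into the cluster of your current kubeconfig context
make install
# Run the Operator locally and follow its logs live
# (ENABLE_WEBHOOKS=false skips the webhook server, which needs TLS certificates)
make run ENABLE_WEBHOOKS=false
# In another terminal, create a test resource
kubectl apply -f config/samples/webapp_v1alpha1_webapp.yaml
# Watch the Operator's output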
# 2025-06-20T10:45:00+08:00 INFO Reconciling WebApp {"namespace": "default", "name": "my-webapp"}
# 2025-06-20T10:45:00+08:00 INFO Deployment reconciled {"operation": "created"}
# 2025-06-20T10:45:00+08:00 INFO Service reconciled {"operation": "created"}
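Remote debugging with Telepresence
When a problem only reproduces inside the cluster, Telepresence connects your machine to the cluster network (cluster DNS and Service IPs) and can intercept traffic sent to a workload, such as metrics scrapes or webhook calls. A controller's watches go through the API server either way. Keep in mind that the in-cluster manager keeps running during an intercept, so both instances may reconcile the same objects unless leader election ensures only one of them is active:
# Install Telepresence
brew install telepresence
# Connect to the cluster
telepresence connect
# Intercept traffic sent to the manager on port 8080 and save the workload's environment
telepresence intercept webapp-operator-controller-manager \
--port 8080:8080 \
--env-file /tmp/operator.env
# Run the Operator locally; intercepted traffic is now routed to this process
go run cmd/main.go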
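Troubleshooting checklist
| Symptom | Likely cause | How to check |
|---|---|---|
| Reconcile is never triggered | Missing RBAC permissions, or the resource is not watched | Check the Operator ServiceAccount's permissions and the watches set up in SetupWithManager |
| Deployment creation fails | Namespace resource quota exhausted | Inspect the namespace's ResourceQuota |
| Status doesn't update | Status written through the wrong client | Use r.Status().Update() instead of r.Update() |
| Webhook errors | Certificate misconfiguration | Check that cert-manager is running normally |
| Finalizer stuck | Cleanup logic keeps failing or times out | Check the Operator logs; increase the timeout or run the cleanup asynchronously |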
Viewing Operator logs
# Tail the Operator Pod's logs
kubectl logs -n webapp-operator-system \
-l control-plane=controller-manager \
--tail=100 -f
# Show the Events recorded for a specific resource
kubectl describe webapp my-webapp
# Events:
# Type Reason Age From Message
# ---- ------ ---- ---- -------
# Normal Reconciled 5m webapp-operator Successfully reconciled WebApp
# Normal DeploymentReady 4m webapp-operator Deployment is ready
# List what the Operator's ServiceAccount is allowed to do (RBAC)
kubectl auth can-i --list --as=system:serviceaccount:webapp-operator-system:webapp-operator-controller-manager
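Monitoring the Operator
The Operator itself needs monitoring too. controller-runtime already exposes Prometheus metrics for every controller (reconcile counts and durations, work-queue depth and more) on the manager's metrics endpoint, and you can register custom metrics on the same registry:
// Custom metrics
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)
var (
// The namespace/name labels create one time series per WebApp;
// drop them if the Operator manages a very large number of objects
reconcileTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "webapp_reconcile_total",
Help: "Total number of reconciliations",
},
[]string{"namespace", "name", "result"},
)
reconcileDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "webapp_reconcile_duration_seconds",
Help: "Duration of reconciliation in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"namespace", "name"},
)
)
func init() {
// Register on controller-runtime's registry so these are served alongside the built-in metrics
metrics.Registry.MustRegister(reconcileTotal, reconcileDuration)
}
// observeReconcile records one reconcile pass. One way to call it, assuming Reconcile
// declares a named error result (err error):
// start := time.Now()
// defer func() { observeReconcile(req.Namespace, req.Name, start, err) }()
func observeReconcile(namespace, name string, start time.Time, err error) {
result := "success"
if err != nil {
result = "error"
}
reconcileTotal.WithLabelValues(namespace, name, result).Inc()
reconcileDuration.WithLabelValues(namespace, name).Observe(time.Since(start).Seconds())
}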
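Performance tuning
When an Operator manages a large number of resources, performance needs attention. Here are the key strategies:
Use the cache to reduce API calls: the controller-runtime client serves Get and List from a shared informer cache, so routine reads do not reach the API server. You can shrink that cache further by caching only specific namespaces or by filtering objects with label selectors, which also reduces memory use and watch traffic.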
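Limit concurrent reconciles: by default each controller runs a single reconcile worker (MaxConcurrentReconciles defaults to 1), so objects are reconciled one at a time. Raising MaxConcurrentReconciles increases throughput, but keep it bounded: every worker makes its own API calls, and too many workers can overload the API server.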
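Control call rates with a rate limiter: the client-go work queue behind each controller uses rate limiters that keep a single object from triggering a burst of API calls. With exponential backoff, the retry delay grows after every consecutive failure, which keeps a failing object from snowballing into a retry storm.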
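Streamline the Reconcile logic: avoid unnecessary API calls in Reconcile. For example, compare the current state with the desired state before issuing an Update, use Server-Side Apply instead of the traditional Get-then-Update pattern, and group related changes into as few writes as possible.
Monitor Operator performance: use controller-runtime's built-in Prometheus metrics to track reconcile duration, queue depth, error rates and other key indicators. When you find a bottleneck, adjust the concurrency, the cache settings or the code itself.
Keep in mind that performance tuning is an ongoing process: as the number of managed resources grows, revisit these settings so the Operator keeps running reliably and efficiently.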
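Summary
Congratulations on completing this tour of Kubernetes Operators! Let's recap the key points:
- CRD definitions: use kubebuilder markers to describe the resource schema and generate validation rules
- Reconciliation loop: continuously compare the desired and actual state and correct any drift
- OwnerReference: link parent and child resources so that deletion cascades
- Finalizer: run cleanup logic before a resource is deleted
- Status management: report resource health through Status and Conditions
- Webhooks: implement mutating (default injection) and validating (admission checks) logic
- Testing: unit tests with the fake client, integration tests with envtest
- Deployment: run locally during development or deploy into the cluster
The Operator pattern is Kubernetes' core extension mechanism. Once you master it, you can teach Kubernetes to "understand" almost any application and automate its operations, which fits naturally into a GitOps workflow.
I hope this article helps you build Kubernetes Operators of your own. In the next article, we'll dig into domain-driven design (DDD) and see how to build complex business systems in Go!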