leader选举在kubernetes controller中是如何实现的

描述

在 Kubernetes 的 kube-controller-manager , kube-scheduler, 以及使用 Operator 的底层实现 controller-rumtime 都支持高可用系统中的 leader 选举,本文将以理解 controller-rumtime (底层的实现是 client-go) 中的 leader 选举以在 kubernetes controller 中是如何实现的。

Background

在运行 kube-controller-manager 时,是有一些参数提供给 cm 进行 leader 选举使用的,可以参考官方文档提供的 参数来了解相关参数。
 

--leader-elect                               Default: true
--leader-elect-renew-deadline duration       Default: 10s
--leader-elect-resource-lock string          Default: "leases"
--leader-elect-resource-name string       Default: "kube-controller-manager"
--leader-elect-resource-namespace string     Default: "kube-system"
--leader-elect-retry-period duration         Default: 2s
...

本身以为这些组件的选举动作时通过 etcd 进行的,但是后面对 controller-runtime 学习时,发现并没有配置其相关的 etcd 相关参数,这就引起了对选举机制的好奇。怀着这种好奇心搜索了下有关于 kubernetes 的选举,发现官网是这么介绍的,下面是对官方的说明进行一个通俗总结。simple leader election with kubernetes

通过阅读文章得知,kubernetes API 提供了一中选举机制,只要运行在集群内的容器,都是可以实现选举功能的。

Kubernetes API 通过提供了两个属性来完成选举动作的

ResourceVersions:每个 API 对象唯一一个 ResourceVersion

Annotations:每个 API 对象都可以对这些 key 进行注释

注:这种选举会增加 APIServer 的压力。也就对 etcd 会产生影响

那么有了这些信息之后,我们来看一下,在 Kubernetes 集群中,谁是 cm 的 leader(我们提供的集群只有一个节点,所以本节点就是 leader)。

在 Kubernetes 中所有启用了 leader 选举的服务都会生成一个 EndPoint ,在这个 EndPoint 中会有上面提到的 label(Annotations)来标识谁是 leader。

$ kubectl get ep -n kube-system
NAME                      ENDPOINTS   AGE
kube-controller-manager         3d4h
kube-dns                              3d4h
kube-scheduler                  3d4h

这里以 kube-controller-manager 为例,来看下这个 EndPoint 有什么信息

[root@master-machine ~]# kubectl describe ep kube-controller-manager -n kube-system
Name:         kube-controller-manager
Namespace:    kube-system
Labels:       
Annotations:  control-plane.alpha.kubernetes.io/leader:
                {"holderIdentity":"master-machine_06730140-a503-487d-850b-1fe1619f1fe1","leaseDurationSeconds":15,"acquireTime":"2022-06-27T1546Z","re...
Subsets:
Events:
  Type    Reason          Age    From                     Message
  ----    ------          ----   ----                     -------
  Normal  LeaderElection  2d22h  kube-controller-manager  master-machine_76aabcb5-49ff-45ff-bd18-4afa61fbc5af became leader
  Normal  LeaderElection  9m     kube-controller-manager  master-machine_06730140-a503-487d-850b-1fe1619f1fe1 became leader

 

可以看出 Annotations: control-plane.alpha.kubernetes.io/leader: 标出了哪个 node 是 leader。

election in controller-runtime

controller-runtime 有关 leader 选举的部分在 pkg/leaderelection下面,总共 100 行代码,我们来看下做了些什么?

可以看到,这里只提供了创建资源锁的一些选项

type Options struct {
 // 在manager启动时,决定是否进行选举
 LeaderElection bool
 // 使用那种资源锁 默认为租用 lease
 LeaderElectionResourceLock string
 // 选举发生的名称空间
 LeaderElectionNamespace string
 // 该属性将决定持有leader锁资源的名称
 LeaderElectionID string
}

通过 NewResourceLock 可以看到,这里是走的 client-go/tools/leaderelection下面,而这个 leaderelection 也有一个 example来学习如何使用它。

通过 example 可以看到,进入选举的入口是一个 RunOrDie() 的函数

 

// 这里使用了一个lease锁,注释中说愿意为集群中存在lease的监听较少
lock := &resourcelock.LeaseLock{
    LeaseMeta: metav1.ObjectMeta{
        Name:      leaseLockName,
        Namespace: leaseLockNamespace,
    },
    Client: client.CoordinationV1(),
    LockConfig: resourcelock.ResourceLockConfig{
        Identity: id,
    },
}

// 开启选举循环
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    Lock: lock,
    // 这里必须保证拥有的租约在调用cancel()前终止,否则会仍有一个loop在运行
    ReleaseOnCancel: true,
    LeaseDuration:   60 * time.Second,
    RenewDeadline:   15 * time.Second,
    RetryPeriod:     5 * time.Second,
    Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: func(ctx context.Context) {
            // 这里填写你的代码,
            // usually put your code
            run(ctx)
        },
        OnStoppedLeading: func() {
            // 这里清理你的lease
            klog.Infof("leader lost: %s", id)
            os.Exit(0)
        },
        OnNewLeader: func(identity string) {
            // we're notified when new leader elected
            if identity == id {
                // I just got the lock
                return
            }
            klog.Infof("new leader elected: %s", identity)
        },
    },
})

 

到这里,我们了解了锁的概念和如何启动一个锁,下面看下,client-go 都提供了那些锁。

在代码 tools/leaderelection/resourcelock/interface.go[6] 定义了一个锁抽象,interface 提供了一个通用接口,用于锁定 leader 选举中使用的资源。

type Interface interface {
 // Get 返回选举记录
 Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

 // Create 创建一个LeaderElectionRecord
 Create(ctx context.Context, ler LeaderElectionRecord) error

 // Update will update and existing LeaderElectionRecord
 Update(ctx context.Context, ler LeaderElectionRecord) error

 // RecordEvent is used to record events
 RecordEvent(string)

 // Identity 返回锁的标识
 Identity() string

 // Describe is used to convert details on current resource lock into a string
 Describe() string
}

那么实现这个抽象接口的就是,实现的资源锁,我们可以看到,client-go 提供了四种资源锁

leaselock

configmaplock

multilock

endpointlock

leaselock

Lease 是 kubernetes 控制平面中的通过 ETCD 来实现的一个 Leases 的资源,主要为了提供分布式租约的一种控制机制。相关对这个 API 的描述可以参考于:Lease 。

在 Kubernetes 集群中,我们可以使用如下命令来查看对应的 lease

 

$ kubectl get leases -A
NAMESPACE         NAME                      HOLDER                                                AGE
kube-node-lease   master-machine            master-machine                                        3d19h
kube-system       kube-controller-manager   master-machine_06730140-a503-487d-850b-1fe1619f1fe1   3d19h
kube-system       kube-scheduler            master-machine_1724e2d9-c19c-48d7-ae47-ee4217b27073   3d19h

$ kubectl describe leases kube-controller-manager -n kube-system
Name:         kube-controller-manager
Namespace:    kube-system
Labels:       
Annotations:  
API Version:  coordination.k8s.io/v1
Kind:         Lease
Metadata:
  Creation Timestamp:  2022-06-24T1151Z
  Managed Fields:
    API Version:  coordination.k8s.io/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f
        f
        f
        f
        f
        f
    Manager:         kube-controller-manager
    Operation:       Update
    Time:            2022-06-24T1151Z
  Resource Version:  56012
  Self Link:         /apis/coordination.k8s.io/v1/namespaces/kube-system/leases/kube-controller-manager
  UID:               851a32d2-25dc-49b6-a3f7-7a76f152f071
Spec:
  Acquire Time:            2022-06-27T1546.000000Z
  Holder Identity:         master-machine_06730140-a503-487d-850b-1fe1619f1fe1
  Lease Duration Seconds:  15
  Lease Transitions:       2
  Renew Time:              2022-06-28T0626.837773Z
Events:                    

 

下面来看下 leaselock 的实现,leaselock 会实现了作为资源锁的抽象

 

type LeaseLock struct {
 // LeaseMeta 就是类似于其他资源类型的属性,包含name ns 以及其他关于lease的属性
 LeaseMeta  metav1.ObjectMeta
 Client     coordinationv1client.LeasesGetter // Client 就是提供了informer中的功能
 // lockconfig包含上面通过 describe 看到的 Identity与recoder用于记录资源锁的更改
    LockConfig ResourceLockConfig
    // lease 就是 API中的Lease资源,可以参考下上面给出的这个API的使用
 lease      *coordinationv1.Lease
}

 

下面来看下 leaselock 实现了那些方法?

Get

Get是从 spec 中返回选举的记录

 

func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
 var err error
 ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
 if err != nil {
  return nil, nil, err
 }
 record := LeaseSpecToLeaderElectionRecord(&ll.lease.Spec)
 recordByte, err := json.Marshal(*record)
 if err != nil {
  return nil, nil, err
 }
 return record, recordByte, nil
}

// 可以看出是返回这个资源spec里面填充的值
func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord {
 var r LeaderElectionRecord
 if spec.HolderIdentity != nil {
  r.HolderIdentity = *spec.HolderIdentity
 }
 if spec.LeaseDurationSeconds != nil {
  r.LeaseDurationSeconds = int(*spec.LeaseDurationSeconds)
 }
 if spec.LeaseTransitions != nil {
  r.LeaderTransitions = int(*spec.LeaseTransitions)
 }
 if spec.AcquireTime != nil {
  r.AcquireTime = metav1.Time{spec.AcquireTime.Time}
 }
 if spec.RenewTime != nil {
  r.RenewTime = metav1.Time{spec.RenewTime.Time}
 }
 return &r
}

 

Create

Create是在 kubernetes 集群中尝试去创建一个租约,可以看到,Client 就是 API 提供的对应资源的 REST 客户端,结果会在 Kubernetes 集群中创建这个 Lease

 

func (ll *LeaseLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
 var err error
 ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(ctx, &coordinationv1.Lease{
  ObjectMeta: metav1.ObjectMeta{
   Name:      ll.LeaseMeta.Name,
   Namespace: ll.LeaseMeta.Namespace,
  },
  Spec: LeaderElectionRecordToLeaseSpec(&ler),
 }, metav1.CreateOptions{})
 return err
}

 

Update

Update是更新 Lease 的 spec

 

func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
 if ll.lease == nil {
  return errors.New("lease not initialized, call get or create first")
 }
 ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)

 lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
 if err != nil {
  return err
 }

 ll.lease = lease
 return nil
}

 

RecordEvent

RecordEvent是记录选举时出现的事件,这时候我们回到上部分 在 kubernetes 集群中查看 ep 的信息时可以看到的 event 中存在 became leader 的事件,这里就是将产生的这个 event 添加到 meta-data 中。

 

func (ll *LeaseLock) RecordEvent(s string) {
   if ll.LockConfig.EventRecorder == nil {
      return
   }
   events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s)
   subject := &coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}
   // Populate the type meta, so we don't have to get it from the schema
   subject.Kind = "Lease"
   subject.APIVersion = coordinationv1.SchemeGroupVersion.String()
   ll.LockConfig.EventRecorder.Eventf(subject, corev1.EventTypeNormal, "LeaderElection", events)
}

 

到这里大致上了解了资源锁究竟是什么了,其他种类的资源锁也是相同的实现的方式,这里就不过多阐述了;下面的我们来看看选举的过程。

election workflow

选举的代码入口是在 leaderelection.go,这里会继续上面的 example 向下分析整个选举的过程。

前面我们看到了进入选举的入口是一个 RunOrDie()的函数,那么就继续从这里开始来了解。进入 RunOrDie,看到其实只有几行而已,大致上了解到了 RunOrDie 会使用提供的配置来启动选举的客户端,之后会阻塞,直到 ctx 退出,或停止持有 leader 的租约。

 

func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
 le, err := NewLeaderElector(lec)
 if err != nil {
  panic(err)
 }
 if lec.WatchDog != nil {
  lec.WatchDog.SetLeaderElection(le)
 }
 le.Run(ctx)
}

 

下面看下 NewLeaderElector做了些什么?可以看到,LeaderElector 是一个结构体,这里只是创建他,这个结构体提供了我们选举中所需要的一切(LeaderElector 就是 RunOrDie 创建的选举客户端)。

 

func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
 if lec.LeaseDuration <= lec.RenewDeadline {
  return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
 }
 if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
  return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
 }
 if lec.LeaseDuration < 1 {
  return nil, fmt.Errorf("leaseDuration must be greater than zero")
 }
 if lec.RenewDeadline < 1 {
  return nil, fmt.Errorf("renewDeadline must be greater than zero")
 }
 if lec.RetryPeriod < 1 {
  return nil, fmt.Errorf("retryPeriod must be greater than zero")
 }
 if lec.Callbacks.OnStartedLeading == nil {
  return nil, fmt.Errorf("OnStartedLeading callback must not be nil")
 }
 if lec.Callbacks.OnStoppedLeading == nil {
  return nil, fmt.Errorf("OnStoppedLeading callback must not be nil")
 }

 if lec.Lock == nil {
  return nil, fmt.Errorf("Lock must not be nil.")
 }
 le := LeaderElector{
  config:  lec,
  clock:   clock.RealClock{},
  metrics: globalMetricsFactory.newLeaderMetrics(),
 }
 le.metrics.leaderOff(le.config.Name)
 return &le, nil
}

 

LeaderElector是建立的选举客户端,

 

type LeaderElector struct {
 config LeaderElectionConfig // 这个的配置,包含一些时间参数,健康检查
 // recoder相关属性
 observedRecord    rl.LeaderElectionRecord
 observedRawRecord []byte
 observedTime      time.Time
 // used to implement OnNewLeader(), may lag slightly from the
 // value observedRecord.HolderIdentity if the transition has
 // not yet been reported.
 reportedLeader string
 // clock is wrapper around time to allow for less flaky testing
 clock clock.Clock
 // 锁定 observedRecord
 observedRecordLock sync.Mutex
 metrics leaderMetricsAdapter
}

 

可以看到 Run 实现的选举逻辑就是在初始化客户端时传入的 三个 callback

 

func (le *LeaderElector) Run(ctx context.Context) {
 defer runtime.HandleCrash()
 defer func() { // 退出时执行callbacke的OnStoppedLeading
  le.config.Callbacks.OnStoppedLeading()
 }()

 if !le.acquire(ctx) {
  return
 }
 ctx, cancel := context.WithCancel(ctx)
 defer cancel()
 go le.config.Callbacks.OnStartedLeading(ctx) // 选举时,执行 OnStartedLeading
 le.renew(ctx)
}

 

在 Run 中调用了 acquire,这个是 通过一个 loop 去调用 tryAcquireOrRenew,直到 ctx 传递过来结束信号

 

func (le *LeaderElector) acquire(ctx context.Context) bool {
 ctx, cancel := context.WithCancel(ctx)
 defer cancel()
 succeeded := false
 desc := le.config.Lock.Describe()
 klog.Infof("attempting to acquire leader lease %v...", desc)
    // jitterUntil是执行定时的函数 func() 是定时任务的逻辑
    // RetryPeriod是周期间隔
    // JitterFactor 是重试系数,类似于延迟队列中的系数 (duration + maxFactor * duration)
    // sliding 逻辑是否计算在时间内
    // 上下文传递
 wait.JitterUntil(func() {
  succeeded = le.tryAcquireOrRenew(ctx)
  le.maybeReportTransition()
  if !succeeded {
   klog.V(4).Infof("failed to acquire lease %v", desc)
   return
  }
  le.config.Lock.RecordEvent("became leader")
  le.metrics.leaderOn(le.config.Name)
  klog.Infof("successfully acquired lease %v", desc)
  cancel()
 }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
 return succeeded
}

 

这里实际上选举动作在 tryAcquireOrRenew 中,下面来看下 tryAcquireOrRenew;tryAcquireOrRenew 是尝试获得一个 leader 租约,如果已经获得到了,则更新租约;否则可以得到租约则为 true,反之 false

 

func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 now := metav1.Now() // 时间
 leaderElectionRecord := rl.LeaderElectionRecord{ // 构建一个选举record
  HolderIdentity:       le.config.Lock.Identity(), // 选举人的身份特征,ep与主机名有关
  LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), // 默认15s
  RenewTime:            now, // 重新获取时间
  AcquireTime:          now, // 获得时间
 }

 // 1. 从API获取或创建一个recode,如果可以拿到则已经有租约,反之创建新租约
 oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
 if err != nil {
  if !errors.IsNotFound(err) {
   klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
   return false
  }
  // 创建租约的动作就是新建一个对应的resource,这个lock就是leaderelection提供的四种锁,
  // 看你在runOrDie中初始化传入了什么锁
  if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
   klog.Errorf("error initially creating leader election record: %v", err)
   return false
  }
  // 到了这里就已经拿到或者创建了租约,然后记录其一些属性,LeaderElectionRecord
  le.setObservedRecord(&leaderElectionRecord)

  return true
 }

 // 2. 获取记录检查身份和时间
 if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
  le.setObservedRecord(oldLeaderElectionRecord)

  le.observedRawRecord = oldLeaderElectionRawRecord
 }
 if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
  le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
  !le.IsLeader() { // 不是leader,进行HolderIdentity比较,再加上时间,这个时候没有到竞选其,跳出
  klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
  return false
 }

 // 3.我们将尝试更新。 在这里leaderElectionRecord设置为默认值。让我们在更新之前更正它。
 if le.IsLeader() { // 到这就说明是leader,修正他的时间
  leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
  leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
 } else { // LeaderTransitions 就是指leader调整(转变为其他)了几次,如果是,
  // 则为发生转变,保持原有值
  // 反之,则+1
  leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
 }
 // 完事之后更新APIServer中的锁资源,也就是更新对应的资源的属性信息
 if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
  klog.Errorf("Failed to update lock: %v", err)
  return false
 }
 // setObservedRecord 是通过一个新的record来更新这个锁中的record
 // 操作是安全的,会上锁保证临界区仅可以被一个线程/进程操作
 le.setObservedRecord(&leaderElectionRecord)
 return true
}

 

到这里,已经完整知道利用 kubernetes 进行选举的流程都是什么了;下面简单回顾下,上述 leader 选举所有的步骤:

首选创建的服务就是该服务的 leader,锁可以为 lease , endpoint 等资源进行上锁

已经是 leader 的实例会不断续租,租约的默认值是 15 秒 (leaseDuration);leader 在租约满时更新租约时间(renewTime)。

其他的 follower,会不断检查对应资源锁的存在,如果已经有 leader,那么则检查 renewTime,如果超过了租用时间(),则表明 leader 存在问题需要重新启动选举,直到有 follower 提升为 leader。

而为了避免资源被抢占,Kubernetes API 使用了 ResourceVersion 来避免被重复修改(如果版本号与请求版本号不一致,则表示已经被修改了,那么 APIServer 将返回错误)

利用 Leader 机制实现 HA 应用

下面就通过一个 example 来实现一个,利用 kubernetes 提供的选举机制完成的高可用应用。

代码实现

如果仅仅是使用 Kubernetes 中的锁,实现的代码也只有几行而已。

 

package main

import (
 "context"
 "flag"
 "fmt"
 "os"
 "os/signal"
 "syscall"
 "time"

 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 clientset "k8s.io/client-go/kubernetes"
 "k8s.io/client-go/rest"
 "k8s.io/client-go/tools/clientcmd"
 "k8s.io/client-go/tools/leaderelection"
 "k8s.io/client-go/tools/leaderelection/resourcelock"
 "k8s.io/klog/v2"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
 if kubeconfig != "" {
  cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
  if err != nil {
   return nil, err
  }
  return cfg, nil
 }

 cfg, err := rest.InClusterConfig()
 if err != nil {
  return nil, err
 }
 return cfg, nil
}

func main() {
 klog.InitFlags(nil)

 var kubeconfig string
 var leaseLockName string
 var leaseLockNamespace string
 var id string
 // 初始化客户端的部分
 flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
 flag.StringVar(&id, "id", "", "the holder identity name")
 flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
 flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
 flag.Parse()

 if leaseLockName == "" {
  klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
 }
 if leaseLockNamespace == "" {
  klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
 }
 config, err := buildConfig(kubeconfig)
 if err != nil {
  klog.Fatal(err)
 }
 client := clientset.NewForConfigOrDie(config)

 run := func(ctx context.Context) {
  // 实现的业务逻辑,这里仅仅为实验,就直接打印了
  klog.Info("Controller loop...")

  for {
   fmt.Println("I am leader, I was working.")
   time.Sleep(time.Second * 5)
  }
 }

 // use a Go context so we can tell the leaderelection code when we
 // want to step down
 ctx, cancel := context.WithCancel(context.Background())
 defer cancel()

 // 监听系统中断
 ch := make(chan os.Signal, 1)
 signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
 go func() {
  <-ch
  klog.Info("Received termination, signaling shutdown")
  cancel()
 }()

 // 创建一个资源锁
 lock := &resourcelock.LeaseLock{
  LeaseMeta: metav1.ObjectMeta{
   Name:      leaseLockName,
   Namespace: leaseLockNamespace,
  },
  Client: client.CoordinationV1(),
  LockConfig: resourcelock.ResourceLockConfig{
   Identity: id,
  },
 }

 // 开启一个选举的循环
 leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
  Lock:            lock,
  ReleaseOnCancel: true,
  LeaseDuration:   60 * time.Second,
  RenewDeadline:   15 * time.Second,
  RetryPeriod:     5 * time.Second,
  Callbacks: leaderelection.LeaderCallbacks{
   OnStartedLeading: func(ctx context.Context) {
    // 当选举为leader后所运行的业务逻辑
    run(ctx)
   },
   OnStoppedLeading: func() {
    // we can do cleanup here
    klog.Infof("leader lost: %s", id)
    os.Exit(0)
   },
   OnNewLeader: func(identity string) { // 申请一个选举时的动作
    if identity == id {
     return
    }
    klog.Infof("new leader elected: %s", identity)
   },
  },
 })
}

 

注:这种 lease 锁只能在 in-cluster 模式下运行,如果需要类似二进制部署的程序,可以选择 endpoint 类型的资源锁。

生成镜像

这里已经制作好了镜像并上传到 dockerhub(cylonchau/leaderelection:v0.0.2)上了,如果只要学习运行原理,则忽略此步骤

 

FROM golang:alpine AS builder
MAINTAINER cylon
WORKDIR /election
COPY . /election
ENV GOPROXY https://goproxy.cn,direct
RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o elector main.go

FROM alpine AS runner
WORKDIR /go/elector
COPY --from=builder /election/elector .
VOLUME ["/election"]
ENTRYPOINT ["./elector"]

 

准备资源清单

默认情况下,Kubernetes 运行的 pod 在请求 Kubernetes 集群内资源时,默认的账户是没有权限的,默认服务帐户无权访问协调  API,因此我们需要创建另一个 serviceaccount 并相应地设置  对应的 RBAC 权限绑定;在清单中配置上这个 sa,此时所有的 pod 就会有协调锁的权限了。

 

apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa-leaderelection
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: leaderelection
rules:
  - apiGroups:
      - coordination.k8s.io
    resources:
      - leases
    verbs:
      - '*'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: leaderelection
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: leaderelection
subjects:
  - kind: ServiceAccount
    name: sa-leaderelection
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: leaderelection
  name: leaderelection
  namespace: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: leaderelection
  template:
    metadata:
      labels:
        app: leaderelection
    spec:
      containers:
        - image: cylonchau/leaderelection:v0.0.2
          imagePullPolicy: IfNotPresent
          command: ["./elector"]
          args:
          - "-id=$(POD_NAME)"
          - "-lease-lock-name=test"
          - "-lease-lock-namespace=default"
          env:
          - name: POD_NAME
            valueFrom:
              fieldRef:
                apiVersion: v1
                fieldPath: metadata.name
          name: elector
      serviceAccountName: sa-leaderelection

 

集群中运行

执行完清单后,当 pod 启动后,可以看到会创建出一个 lease。

 

$ kubectl get lease
NAME   HOLDER                            AGE
test   leaderelection-5644c5f84f-frs5n   1s


$ kubectl describe lease
Name:         test
Namespace:    default
Labels:       
Annotations:  
API Version:  coordination.k8s.io/v1
Kind:         Lease
Metadata:
  Creation Timestamp:  2022-06-28T1645Z
  Managed Fields:
    API Version:  coordination.k8s.io/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f
        f
        f
        f
        f
        f
    Manager:         elector
    Operation:       Update
    Time:            2022-06-28T1645Z
  Resource Version:  131693
  Self Link:         /apis/coordination.k8s.io/v1/namespaces/default/leases/test
  UID:               bef2b164-a117-44bd-bad3-3e651c94c97b
Spec:
  Acquire Time:            2022-06-28T1645.931873Z
  Holder Identity:         leaderelection-5644c5f84f-frs5n
  Lease Duration Seconds:  60
  Lease Transitions:       0
  Renew Time:              2022-06-28T1655.963537Z
Events:                    

 

通过其持有者的信息查看对应 pod(因为程序中对 holder Identity 设置的是 pod 的名称),实际上是工作的 pod。

如上实例所述,这是利用 Kubernetes 集群完成的 leader 选举的方案,虽然这不是最完美解决方案,但这是一种简单的方法,因为可以无需在集群上部署更多东西或者进行大量的代码工作就可以利用 Kubernetes 集群来实现一个高可用的 HA 应用。


审核编辑:刘清

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分