Kubernetes Source Code Reading Notes: Controller Manager (Part 2)
Published: 2019-06-11


In the previous article we walked through the basic running logic of the Controller Manager, but several questions were left open. This article analyzes them.

1. ListAndWatch

First, the Informer. As noted in the previous article, starting an Informer essentially calls the Run method of the controller's reflector. Let's step into the reflector's Run method:

k8s.io/client-go/tools/cache/reflector.go

func (r *Reflector) Run(stopCh <-chan struct{}) {
    glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

As we can see, the method uses wait.Until to execute ListAndWatch once every period.
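For readers unfamiliar with wait.Until, here is a minimal, self-contained sketch of the pattern; the task and timings are made up for illustration. The function runs immediately and then once per period until the stop channel is closed, which is exactly how Run keeps re-invoking ListAndWatch.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})
    go func() {
        time.Sleep(3 * time.Second)
        close(stopCh) // closing the stop channel ends the loop, like the reflector's stopCh
    }()

    // Runs the function immediately, then every second, until stopCh is closed.
    wait.Until(func() {
        fmt.Println("one ListAndWatch-style iteration")
    }, time.Second, stopCh)
}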

Step into the ListAndWatch method:

k8s.io/client-go/tools/cache/reflector.go

// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
    var resourceVersion string

    // Explicitly set "0" as resource version - it's fine for the List()
    // to be served from cache and potentially be delayed relative to
    // etcd contents. Reflector framework will catch up via Watch() eventually.
    options := metav1.ListOptions{ResourceVersion: "0"}
    r.metrics.numberOfLists.Inc()
    start := r.clock.Now()
    list, err := r.listerWatcher.List(options)
    if err != nil {
        return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
    }
    r.metrics.listDuration.Observe(time.Since(start).Seconds())
    listMetaInterface, err := meta.ListAccessor(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
    }
    resourceVersion = listMetaInterface.GetResourceVersion()
    items, err := meta.ExtractList(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
    }
    r.metrics.numberOfItemsInList.Observe(float64(len(items)))
    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
    }
    r.setLastSyncResourceVersion(resourceVersion)

    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            if r.ShouldResync == nil || r.ShouldResync() {
                glog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
        }

        r.metrics.numberOfWatches.Inc()
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            switch err {
            case io.EOF:
                // watch closed normally
            case io.ErrUnexpectedEOF:
                glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
            }
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case wait and resend watch request.
            if urlError, ok := err.(*url.Error); ok {
                if opError, ok := urlError.Err.(*net.OpError); ok {
                    if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
                        time.Sleep(time.Second)
                        continue
                    }
                }
            }
            return nil
        }

        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
            }
            return nil
        }
    }
}

The method's own comment already explains the flow. First, it calls List with the default ResourceVersion "0" to list the resources, and uses meta.ListAccessor to access and validate the list result.

Second, it calls GetResourceVersion to obtain the actual ResourceVersion of the resources, and calls syncWith to synchronize.

Third, the large block inside go func() handles resyncing the reflector's cache.

Finally, inside a for loop it calls the Watch method to watch for resource changes, and calls watchHandler to process those changes accordingly.

The Watch method essentially calls the WatchFunc of the ListWatch. This field is added to the informer when the concrete controller object is created, which we will cover in detail later. In short, the Watch call is no different from a user connecting to the API Server with client-go.
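As a rough illustration of that point, the following standalone sketch watches Deployments directly with client-go, which is essentially what the reflector's listerWatcher.Watch(options) boils down to. The kubeconfig path and namespace are placeholders, and the Watch signature shown matches the older client-go used in this article (newer versions also take a context.Context).

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig") // placeholder path
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    // Roughly what the informer's WatchFunc does for Deployments.
    w, err := clientset.AppsV1().Deployments("default").Watch(metav1.ListOptions{ResourceVersion: "0"})
    if err != nil {
        panic(err)
    }
    defer w.Stop()

    for event := range w.ResultChan() {
        fmt.Printf("event: %s, object type: %T\n", event.Type, event.Object)
    }
}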

Step into the watchHandler method:

k8s.io/client-go/tools/cache/reflector.go

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    start := r.clock.Now()
    eventCount := 0

    defer w.Stop()
    // update metrics
    ...

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrs.FromObject(event.Object)
            }
            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                continue
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    watchDuration := r.clock.Now().Sub(start)
    ...
    return nil
}

 

The heart of this method is the switch over the event type. For each watched event, depending on whether it is Added, Modified, or Deleted, the corresponding store Add, Update, or Delete method is called to update the cache.

That is the general logic of the Informer: through the list-watch mechanism it connects to the API Server, listens for resource changes, and updates the local cache.

2. StartControllers

As mentioned in the previous article, the Controller Manager starts all controllers by calling their start functions inside the StartControllers method. Let's look at StartControllers first:

cmd/kube-controller-manager/app/controllermanager.go

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
    ...
    for controllerName, initFn := range controllers {
        if !ctx.IsControllerEnabled(controllerName) {
            klog.Warningf("%q is disabled", controllerName)
            continue
        }

        time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

        klog.V(1).Infof("Starting %q", controllerName)
        debugHandler, started, err := initFn(ctx)
        if err != nil {
            klog.Errorf("Error starting %q", controllerName)
            return err
        }
        if !started {
            klog.Warningf("Skipping %q", controllerName)
            continue
        }
        ...
    }
    return nil
}

We can see that the method runs the start functions by calling each element's own initFn: every controller is stored in the map as a [string]function pair, so starting it is just a matter of invoking its function.
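A self-contained analogue of this [string]function pattern, with made-up controller names and no-op init functions, may make the loop above easier to picture:

package main

import "fmt"

// initFunc mirrors the idea of the real InitFunc: start a controller, report whether it ran.
type initFunc func() (started bool, err error)

func main() {
    // In kube-controller-manager the equivalent map is built by NewControllerInitializers.
    controllers := map[string]initFunc{
        "deployment": func() (bool, error) { fmt.Println("starting deployment controller"); return true, nil },
        "replicaset": func() (bool, error) { fmt.Println("starting replicaset controller"); return true, nil },
    }

    for name, initFn := range controllers {
        started, err := initFn()
        if err != nil {
            fmt.Printf("error starting %q: %v\n", name, err)
            return
        }
        if !started {
            fmt.Printf("skipping %q\n", name)
        }
    }
}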

Now let's step into one controller's start function, taking deployment as an example. In NewControllerInitializers we find controllers["deployment"] = startDeploymentController. Following startDeploymentController, we see the method lives in app/apps.go:

cmd/kube-controller-manager/app/apps.go

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
        return nil, false, nil
    }
    dc, err := deployment.NewDeploymentController(
        ctx.InformerFactory.Apps().V1().Deployments(),
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
    }
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    return nil, true, nil
}

We can see that the method calls NewDeploymentController to create a DeploymentController, and then runs it by calling its Run method in a goroutine.

Let's look at the NewDeploymentController method first.

3. NewDeploymentController

pkg/controller/deployment/deployment_controller.go

func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

    if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
        if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
            return nil, err
        }
    }
    dc := &DeploymentController{
        client:        client,
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
    }
    dc.rsControl = controller.RealRSControl{
        KubeClient: client,
        Recorder:   dc.eventRecorder,
    }

    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
        DeleteFunc: dc.deleteDeployment,
    })
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: dc.deletePod,
    })

    dc.syncHandler = dc.syncDeployment
    dc.enqueueDeployment = dc.enqueue

    dc.dLister = dInformer.Lister()
    dc.rsLister = rsInformer.Lister()
    dc.podLister = podInformer.Lister()
    dc.dListerSynced = dInformer.Informer().HasSynced
    dc.rsListerSynced = rsInformer.Informer().HasSynced
    dc.podListerSynced = podInformer.Informer().HasSynced
    return dc, nil
}

We can see that this method does roughly three things.

(1) It creates a Broadcaster, which handles Kubernetes event resources.

(2) It creates the DeploymentController.

As we know, a Deployment does not manipulate the cluster's pods directly; it does so indirectly by manipulating ReplicaSets. Accordingly, the DeploymentController struct contains an rsControl field for operating on ReplicaSets. The queue field maintains a queue of deployments, into which deployments whose status needs updating are placed; this is covered later (see the workqueue sketch after this list).

(3) It registers callback functions.

The informers' AddEventHandler method registers callbacks on all of the deployment controller's informers, so that additions, updates, and deletions of deployments and ReplicaSets are handled accordingly.
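Before digging into the callbacks, here is a minimal sketch of the workqueue mentioned in (2). The constructor, queue name, and "namespace/name" key format match what the deployment controller uses, but the single Add/Get round-trip is only illustrative; the real controller runs several workers that loop over Get.

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // Same constructor as the queue field in DeploymentController.
    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment")

    // enqueueDeployment ultimately adds a "namespace/name" key like this one.
    queue.Add("default/nginx-deployment")

    key, shutdown := queue.Get()
    if shutdown {
        return
    }
    defer queue.Done(key)

    fmt.Printf("syncing deployment %v\n", key) // a worker would call syncDeployment here
    queue.Forget(key)                          // clear rate-limit backoff after a successful sync
}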

Let's analyze the callback registration in detail, in three parts: first, how the informers themselves are created; second, the logic by which the callbacks get invoked; and third, the callbacks themselves.

4. Informer()

The Deployment Controller involves three informers, for deployments, ReplicaSets, and pods; each is created by its own Informer() method.

Take the dInformer.Informer() call as an example. The method essentially calls InformerFor:

k8s.io/client-go/informers/apps/v1/deployment.go

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&apps_v1.Deployment{}, f.defaultInformer)
}

InformerFor is fairly straightforward: it calls defaultInformer and stores the resulting informer in the factory's informers field, returning the cached instance on subsequent calls.
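A small runnable check of that caching behaviour, using the fake clientset so it needs no cluster: asking the factory for the Deployments informer twice returns the same instance, because InformerFor stored it the first time.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
)

func main() {
    clientset := fake.NewSimpleClientset()
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)

    first := factory.Apps().V1().Deployments().Informer()
    second := factory.Apps().V1().Deployments().Informer()

    // InformerFor memoizes by object type, so both calls yield the same shared informer.
    fmt.Println(first == second) // true
}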

defaultInformer, in turn, calls NewFilteredDeploymentInformer:

k8s.io/client-go/informers/apps/v1/deployment.go
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).List(options)
            },
            WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).Watch(options)
            },
        },
        &apps_v1.Deployment{},
        resyncPeriod,
        indexers,
    )
}

This method simply returns a SharedIndexInformer built by cache.NewSharedIndexInformer. Inside it, the informer's List and Watch functions are defined, and they are no different from what an ordinary user would write when connecting to the API Server through client-go.

5. AddEventHandler

First, the invocation logic. Besides registering the callbacks on the informer, the AddEventHandler path also determines how those callbacks get called. AddEventHandler itself is a single line: it calls AddEventHandlerWithResyncPeriod.

Step into AddEventHandlerWithResyncPeriod:

k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    if s.stopped {
        glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
        return
    }

    if resyncPeriod > 0 {
        ...
    }

    listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

    if !s.started {
        s.processor.addListener(listener)
        return
    }

    // in order to safely join, we have to
    // 1. stop sending add/update/delete notifications
    // 2. do a list against the store
    // 3. send synthetic "Add" events to the new handler
    // 4. unblock
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    s.processor.addListener(listener)
    for _, item := range s.indexer.List() {
        listener.add(addNotification{newObj: item})
    }
}

Here the concept of a listener appears. A listener is the mechanism an informer uses to pull items out of its cache and process them. By calling addListener, the listener is both added to the informer and run.

6. addListener

Both adding and running a listener go through the addListener method. Step into addListener:

k8s.io/client-go/tools/cache/shared_informer.go

func (p *sharedProcessor) addListener(listener *processorListener) {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()

    p.addListenerLocked(listener)
    if p.listenersStarted {
        p.wg.Start(listener.run)
        p.wg.Start(listener.pop)
    }
}

addListener runs the listener by starting its run and pop methods in goroutines.

Let's look at the run and pop methods in turn:

k8s.io/client-go/tools/cache/shared_informer.go

func (p *processorListener) run() {
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

 

k8s.io/client-go/tools/cache/shared_informer.go

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

The pop method juggles several channels, but in essence it takes one notification off a channel so that run can process it. In run, a type switch over the notification dispatches to OnUpdate, OnAdd, or OnDelete, and those three methods in turn call the functions that were passed into AddEventHandler earlier, namely:

pkg/controller/deployment/deployment_controller.go

func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    ...
    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        DeleteFunc: dc.deleteDeployment,
    })
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: dc.deletePod,
    })
    ...
}
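A quick, self-contained check of the bridge described above: cache.ResourceEventHandlerFuncs satisfies the ResourceEventHandler interface, and its OnAdd/OnUpdate/OnDelete methods simply delegate to the AddFunc/UpdateFunc/DeleteFunc fields when they are set. The signatures shown are those of the client-go vintage discussed in this article.

package main

import (
    "fmt"

    "k8s.io/client-go/tools/cache"
)

func main() {
    var handler cache.ResourceEventHandler = cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { fmt.Println("AddFunc got:", obj) },
        UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("UpdateFunc got:", oldObj, "->", newObj) },
        // DeleteFunc left nil: OnDelete becomes a no-op.
    }

    // These are the calls processorListener.run() makes for the three notification types.
    handler.OnAdd("some-object")
    handler.OnUpdate("old", "new")
    handler.OnDelete("gone") // no panic even though DeleteFunc is nil
}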

So where do these notifications come from? The answer: they are added through the listener's add method, i.e. the last statement of AddEventHandlerWithResyncPeriod:

k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
    ...
    for _, item := range s.indexer.List() {
        listener.add(addNotification{newObj: item})
    }
}

The add method is a single line: it pushes its argument into the listener's channel. The argument itself is obtained from the informer's cache via the indexer's List method.

At this point the informer's logic is clear. First, ListAndWatch stores the watched resource state changes in the cache. Then, once the controller is started, the factory produces the corresponding informers, and AddEventHandler registers the callbacks. AddEventHandler attaches a listener to the informer, which pulls items out of the cache and invokes the appropriate callback according to the event type.
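To tie the pieces together, here is a hedged end-to-end sketch of the same machinery from a client-go user's point of view: build a deployment informer through the factory, register callbacks, start the factory (which runs each informer's reflector and its ListAndWatch), wait for the cache to sync, and then read from the cache via the lister. The kubeconfig path and namespace are placeholders, and the API signatures match the older client-go discussed in this article.

package main

import (
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig") // placeholder path
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    dInformer := factory.Apps().V1().Deployments()

    // Register callbacks, just as NewDeploymentController does.
    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            d := obj.(*appsv1.Deployment)
            fmt.Printf("deployment added: %s/%s\n", d.Namespace, d.Name)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            d := newObj.(*appsv1.Deployment)
            fmt.Printf("deployment updated: %s/%s\n", d.Namespace, d.Name)
        },
        DeleteFunc: func(obj interface{}) {
            fmt.Println("deployment deleted")
        },
    })

    stopCh := make(chan struct{})
    defer close(stopCh)

    factory.Start(stopCh)                                          // starts each informer's reflector (ListAndWatch)
    cache.WaitForCacheSync(stopCh, dInformer.Informer().HasSynced) // wait for the initial List to fill the cache

    // Reads are now served from the local cache, not the API Server.
    deployments, err := dInformer.Lister().Deployments("default").List(labels.Everything())
    if err == nil {
        fmt.Printf("%d deployments in the cache\n", len(deployments))
    }

    // Keep the informers running for a while to receive watch events.
    time.Sleep(time.Minute)
}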

The remaining content will be analyzed in the next article.

Reprinted from: https://www.cnblogs.com/00986014w/p/10273738.html
