
K8s Core Component Reflector: A Source Code Analysis (Based on the Informer Implementation)

2022-08-24 19:00 https://my.oschina.net/anur/blog/5569314 Anur

1. What Is a Reflector

Reflector watches a specified resource and causes all changes to be reflected in the given store.

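K8s describes Reflector very concisely: a Reflector is the component that watches a resource and "persists" its "changes" into a store.

Besides the Reflector itself, the other thing to pay attention to is the Store interface. Here we only take a quick look at its definition to get a rough idea of what it is: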

// Store is a generic object storage and processing interface.  A
// Store holds a map from string keys to accumulators, and has
// operations to add, update, and delete a given object to/from the
// accumulator currently associated with a given key.  A Store also
// knows how to extract the key from a given object, so many operations
// are given only the object.
//
// In the simplest Store implementations each accumulator is simply
// the last given object, or empty after Delete, and thus the Store's
// behavior is simple storage.
//
// Reflector knows how to watch a server and update a Store.  This
// package provides a variety of implementations of Store.
type Store interface {

	// Add adds the given object to the accumulator associated with the given object's key
	Add(obj interface{}) error

	// Update updates the given object in the accumulator associated with the given object's key
	Update(obj interface{}) error

	// Delete deletes the given object from the accumulator associated with the given object's key
	Delete(obj interface{}) error

	// List returns a list of all the currently non-empty accumulators
	List() []interface{}

	// ListKeys returns a list of all the keys currently associated with non-empty accumulators
	ListKeys() []string

	// Get returns the accumulator associated with the given object's key
	Get(obj interface{}) (item interface{}, exists bool, err error)

	// GetByKey returns the accumulator associated with the given key
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error

	// Resync is meaningless in the terms appearing here but has
	// meaning in some implementations that have non-trivial
	// additional behavior (e.g., DeltaFIFO).
	Resync() error
}

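Put simply, Store is a generic interface for storing and processing objects: it holds a mapping from string keys to accumulators and supports adding, deleting, updating, and looking them up.

In the simplest case, the accumulator is just a single object and storing it is plain storage. For example, the Indexer implementation client-go/tools/cache/store#cache simply maintains a built-in map (strictly speaking, client-go/tools/cache/thread_safe_store#ThreadSafeStore, which builds thread safety and indexing on top of a map).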
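To make the "simple storage" case concrete, here is a minimal sketch of a map-backed store in the spirit of the Store interface above. It is not client-go code: `Object`, `keyFunc`, and `mapStore` are hypothetical names, the object is a tiny struct instead of `interface{}`, and only a subset of the methods is shown. The key follows the namespace/name convention that DeltaFIFO's keyFunc also uses.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Object is a hypothetical stand-in for an API object's identifying metadata.
type Object struct {
	Namespace, Name, Spec string
}

// keyFunc follows the namespace/name convention: "ns/name", or just "name"
// when the object has no namespace.
func keyFunc(o Object) string {
	if o.Namespace == "" {
		return o.Name
	}
	return o.Namespace + "/" + o.Name
}

// mapStore is a simplified, thread-safe store in which each accumulator is
// just the last object seen for its key (the "simple storage" case).
type mapStore struct {
	mu    sync.RWMutex
	items map[string]Object
}

func newMapStore() *mapStore { return &mapStore{items: map[string]Object{}} }

func (s *mapStore) Add(o Object) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[keyFunc(o)] = o
	return nil
}

// Update is the same as Add when the accumulator is the latest object.
func (s *mapStore) Update(o Object) error { return s.Add(o) }

func (s *mapStore) Delete(o Object) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.items, keyFunc(o))
	return nil
}

func (s *mapStore) GetByKey(key string) (Object, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	o, ok := s.items[key]
	return o, ok
}

// ListKeys returns the keys sorted, so the output is deterministic.
func (s *mapStore) ListKeys() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	keys := make([]string, 0, len(s.items))
	for k := range s.items {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// Replace throws away the current contents and loads the given list,
// which is what Reflector's syncWith relies on after a full List.
func (s *mapStore) Replace(list []Object) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items = make(map[string]Object, len(list))
	for _, o := range list {
		s.items[keyFunc(o)] = o
	}
	return nil
}

func main() {
	s := newMapStore()
	_ = s.Add(Object{Namespace: "default", Name: "web", Spec: "v1"})
	_ = s.Update(Object{Namespace: "default", Name: "web", Spec: "v2"})
	_ = s.Add(Object{Name: "node-1"})
	o, _ := s.GetByKey("default/web")
	fmt.Println(s.ListKeys(), o.Spec) // [default/web node-1] v2
}
```

The real implementation keeps the same shape (a keyed map behind a lock) but stores `interface{}` values and adds indexes on top.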

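2. Preparation: List

Back to the main topic. Reflector's core driving logic is ListAndWatch. How are resource watching and Store updates implemented? Let's start from the entry point, which looks like this: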

// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

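The core logic here is r.ListAndWatch(). It is wrapped in a backoff manager (r.backoffManager) that spaces out retries and restarts it after a failure; we won't go into that here.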

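The structure of ListAndWatch is shown below. As its comment says, it first does a full list and then watches starting from the resource version that list returned. For the initial full list, a Pager wraps the listerWatcher's List logic; once the result is in, r.syncWith stores it in the Store: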

// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
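func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string

	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

	if err := func() error {
		var list runtime.Object
		var paginatedResult bool
		var err error
		listCh := make(chan struct{}, 1)
		... // abridged

		go func() {
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			... // abridged: choosing the page size (see 2.2)
			list, paginatedResult, err = pager.List(context.Background(), options)
			close(listCh)
		}()
		// Wait for the list to finish, or give up if we are asked to stop.
		select {
		case <-stopCh:
			return nil
		case <-listCh:
		}
		if err != nil {
			return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
		}
		... // abridged

		listMetaInterface, err := meta.ListAccessor(list)
		if err != nil {
			return fmt.Errorf("unable to understand list result %#v: %v", list, err)
		}
		resourceVersion = listMetaInterface.GetResourceVersion()
		items, err := meta.ExtractList(list)
		if err != nil {
			return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
		}
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}

		r.setLastSyncResourceVersion(resourceVersion)
		return nil
	}(); err != nil {
		return err
	}

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() { /* ... Block1: periodic resync via r.store.Resync() ... */ }()

	for { /* ... Block2: the watch loop, see section 3 ... */ }
}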

r.syncWith is simple; in essence it just calls store.Replace:

// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
   found := make([]interface{}, 0, len(items))
   for _, item := range items {
      found = append(found, item)
   }
   return r.store.Replace(found, resourceVersion)
}

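2.1 ListerWatcher

Let's go step by step, starting with the innermost executor, r.listerWatcher.List(opts). It has many implementations; essentially it defines how the initial fetch is done. Two examples make it easy to see what it does.

The first is how a test case declares its listerWatcher: ListFunc simply returns a list object, and WatchFunc returns a simple channel-based fake watcher: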

func TestReflectorResync(t *testing.T) {
	iteration := 0
	stopCh := make(chan struct{})
	rerr := errors.New("expected resync reached")
	s := &FakeCustomStore{
		ResyncFunc: func() error {
			iteration++
			if iteration == 2 {
				return rerr
			}
			return nil
		},
	}

	lw := &testLW{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			fw := watch.NewFake()
			return fw, nil
		},
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "0"}}, nil
		},
	}
	resyncPeriod := 1 * time.Millisecond
	r := NewReflector(lw, &v1.Pod{}, s, resyncPeriod)
	if err := r.ListAndWatch(stopCh); err != nil {
		// error from Resync is not propaged up to here.
		t.Errorf("expected error %v", err)
	}
	if iteration != 2 {
		t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
	}
}

//
// fw:
//

func (f *FakeWatcher) ResultChan() <-chan Event {
	return f.result
}

// Add sends an add event.
func (f *FakeWatcher) Add(obj runtime.Object) {
	f.result <- Event{Added, obj}
}
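Stripped of client-go types, the test's listerWatcher boils down to two functions plus a channel. The sketch below is an approximation under that assumption: `listerWatcher`, `fakeWatcher`, and the string objects are hypothetical simplifications of ListerWatcher, watch.FakeWatcher, and runtime.Object.

```go
package main

import "fmt"

// EventType and Event loosely mirror watch.EventType and watch.Event; the
// object is a plain string here instead of a runtime.Object.
type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
)

type Event struct {
	Type   EventType
	Object string
}

// fakeWatcher plays the role of watch.FakeWatcher: events are pushed into an
// unbuffered channel, which the consumer drains via ResultChan().
type fakeWatcher struct{ result chan Event }

func newFakeWatcher() *fakeWatcher { return &fakeWatcher{result: make(chan Event)} }

func (f *fakeWatcher) ResultChan() <-chan Event { return f.result }

// Add blocks until a consumer receives the event.
func (f *fakeWatcher) Add(obj string) { f.result <- Event{Added, obj} }

// Stop closes the channel, which ends the consumer's range loop.
func (f *fakeWatcher) Stop() { close(f.result) }

// listerWatcher is a hypothetical, trimmed-down version of the ListerWatcher
// idea: one function for the initial full list, one for the incremental watch.
type listerWatcher struct {
	ListFunc  func() (items []string, resourceVersion string)
	WatchFunc func(resourceVersion string) *fakeWatcher
}

func main() {
	fw := newFakeWatcher()
	lw := listerWatcher{
		ListFunc:  func() ([]string, string) { return []string{"pod-a"}, "10" },
		WatchFunc: func(string) *fakeWatcher { return fw },
	}
	items, rv := lw.ListFunc() // full list first...
	w := lw.WatchFunc(rv)      // ...then watch from its resourceVersion
	go func() { fw.Add("pod-b"); fw.Stop() }()
	for ev := range w.ResultChan() {
		items = append(items, ev.Object)
	}
	fmt.Println(items, rv) // [pod-a pod-b] 10
}
```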

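The other example is how the usual NewDeploymentInformer is declared (it delegates to NewFilteredDeploymentInformer below). As you can see, it relies on the basic typed client: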

// NewFilteredDeploymentInformer constructs a new informer for Deployment type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.ExtensionsV1beta1().Deployments(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.ExtensionsV1beta1().Deployments(namespace).Watch(context.TODO(), options)
			},
		},
		&extensionsv1beta1.Deployment{},
		resyncPeriod,
		indexers,
	)
}

2.2 Pager

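With listerWatcher's responsibility understood, the other piece is the Pager. The Pager essentially controls whether the List call is split into batches and how, which naturally brings to mind a for loop that keeps fetching data.

The code is indeed like that, so we won't repeat it here. What's worth noting is the configuration done before paging, such as pager.PageSize, options.ResourceVersion, and Continue; these settings feed into the pager. Right before the Pager starts, we can see: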

			// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
			// list request will return the full response.
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			switch {
			case r.WatchListPageSize != 0:
				pager.PageSize = r.WatchListPageSize
			case r.paginatedResult:
				// We got a paginated result initially. Assume this resource and server honor
				// paging requests (i.e. watch cache is probably disabled) and leave the default
				// pager size set.
			case options.ResourceVersion != "" && options.ResourceVersion != "0":
				// User didn't explicitly request pagination.
				//
				// With ResourceVersion != "", we have a possibility to list from watch cache,
				// but we do that (for ResourceVersion != "0") only if Limit is unset.
				// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
				// switch off pagination to force listing from watch cache (if enabled).
				// With the existing semantic of RV (result is at least as fresh as provided RV),
				// this is correct and doesn't lead to going back in time.
				//
				// We also don't turn off pagination for ResourceVersion="0", since watch cache
				// is ignoring Limit in that case anyway, and if watch cache is not enabled
				// we don't introduce regression.
				pager.PageSize = 0
			}

It mostly revolves around whether to paginate and, if so, how large each page should be.
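The "for loop that keeps fetching" idea behind the Pager can be sketched as follows. This is a hypothetical stdlib-only model, not the client-go pager package: `listOptions`, `listResult`, `listAll`, and `fakeServer` are invented for illustration, and the fake server encodes the next offset as its continue token, whereas a real API server returns an opaque token.

```go
package main

import "fmt"

// listOptions and listResult are hypothetical, minimal stand-ins for
// metav1.ListOptions and a list object's ListMeta.
type listOptions struct {
	Limit    int64
	Continue string
}

type listResult struct {
	Items    []string
	Continue string // empty when there are no more pages
}

// pageFn is whatever actually talks to the server (r.listerWatcher.List).
type pageFn func(opts listOptions) listResult

// listAll keeps calling fn, passing back the continue token, until the
// server reports no more pages. pageSize 0 means "don't paginate".
func listAll(fn pageFn, pageSize int64) (items []string, pages int) {
	opts := listOptions{Limit: pageSize}
	for {
		res := fn(opts)
		pages++
		items = append(items, res.Items...)
		if res.Continue == "" {
			return items, pages
		}
		opts.Continue = res.Continue
	}
}

// fakeServer serves data in chunks of opts.Limit, using the next offset as
// its continue token.
func fakeServer(data []string) pageFn {
	return func(opts listOptions) listResult {
		start := 0
		if opts.Continue != "" {
			fmt.Sscanf(opts.Continue, "%d", &start)
		}
		if opts.Limit == 0 || start+int(opts.Limit) >= len(data) {
			return listResult{Items: data[start:]}
		}
		end := start + int(opts.Limit)
		return listResult{Items: data[start:end], Continue: fmt.Sprint(end)}
	}
}

func main() {
	srv := fakeServer([]string{"a", "b", "c", "d", "e"})
	items, pages := listAll(srv, 2)
	fmt.Println(items, pages) // [a b c d e] 3
	items, pages = listAll(srv, 0)
	fmt.Println(items, pages) // [a b c d e] 1
}
```

Setting the page size to 0 collapses the loop into a single request, which is exactly what the `pager.PageSize = 0` branch above relies on to let the request be served from the watch cache.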

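First, look at the field's description. It says pagination should be used carefully: paginated lists are always served directly from etcd, which can lead to performance problems. That is the first case above (an explicitly set WatchListPageSize), which we should avoid where possible. Pagination itself is driven by the Continue token: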

	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
	// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
	// it will turn off pagination to allow serving them from watch cache.
	// NOTE: It should be used carefully as paginated lists are always served directly from
	// etcd, which is significantly less efficient and may lead to serious performance and
	// scalability problems.
	WatchListPageSize int64

Now look at the server side (the ApiServer); the code lives in apiserver/pkg/storage/cacher/cacher.go. With APIListChunking enabled, any of the following pushes the load down to etcd: using Continue, not specifying an RV, or using Limit with a non-zero RV.

	pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
	hasContinuation := pagingEnabled && len(pred.Continue) > 0
	hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
	if resourceVersion == "" || hasContinuation || hasLimit {
		// If resourceVersion is not specified, serve it from underlying
		// storage (for backward compatibility). If a continuation is
		// requested, serve it from the underlying storage as well.
		// Limits are only sent to storage when resourceVersion is non-zero
		// since the watch cache isn't able to perform continuations, and
		// limits are ignored when resource version is zero.
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	}
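The cacher condition above can be restated as a pure function to check which combinations reach etcd. This is a hypothetical helper that only mirrors the three booleans in the snippet; the real code reads these values from the request and the APIListChunking feature gate.

```go
package main

import "fmt"

// servedFromEtcd mirrors the condition in cacher.go: true means the list
// bypasses the watch cache and goes to the underlying storage (etcd).
func servedFromEtcd(resourceVersion string, limit int64, continueToken string, pagingEnabled bool) bool {
	hasContinuation := pagingEnabled && len(continueToken) > 0
	hasLimit := pagingEnabled && limit > 0 && resourceVersion != "0"
	return resourceVersion == "" || hasContinuation || hasLimit
}

func main() {
	cases := []struct {
		rv    string
		limit int64
	}{
		{"", 0},      // no RV: served from etcd
		{"0", 500},   // RV="0": Limit is ignored, watch cache
		{"123", 0},   // specific RV, no Limit: watch cache
		{"123", 500}, // specific RV plus Limit: etcd
	}
	for _, c := range cases {
		fmt.Printf("rv=%q limit=%d -> etcd=%v\n", c.rv, c.limit, servedFromEtcd(c.rv, c.limit, "", true))
	}
}
```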

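So overall, the usual best option is to set an RV (either "0" or a specific value) and use neither Limit nor Continue, so the request can be served from the watch cache. Here is how the official documentation describes RV:


For get and list, resourceVersion has the following semantics:

get:

  • resourceVersion unset: most recent
  • resourceVersion="0": any
  • resourceVersion="<non-zero value>": not older than the given version

list:

Unless you have strong consistency requirements, using resourceVersionMatch=NotOlderThan together with a known resourceVersion is preferable, because it can achieve better cluster performance and scalability than leaving resourceVersion and resourceVersionMatch unset, which requires a quorum read to be served.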


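2.3 Store and RV

Once the Pager has the data, three things remain to be done:

  • store the data
  • record the data's latest version (RV)
  • use that latest version to start the watch

Let's look at the first two here: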

		listMetaInterface, err := meta.ListAccessor(list)
		resourceVersion = listMetaInterface.GetResourceVersion()

		items, err := meta.ExtractList(list)

		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}

		r.setLastSyncResourceVersion(resourceVersion)
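  • [Store] Storing the data is easy to understand and looks "simple": it calls Reflector's important member Store and puts the fetched data into it. The Store in Reflector is actually fairly complex and deserves a long discussion of its own; for now, think of it as an in-memory KV mapping: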
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}
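  • [RV] The RV is a basic building block of MVCC: a logical timestamp, corresponding to the revision in etcd. Attaching a monotonically increasing logical timestamp to every operation serializes the whole system. It has other uses too, such as isolation levels and consistent snapshots; in K8s, the use you run into most often is concurrency control.

    When you update a resource, how do you know whether someone else modified it in the meantime? By comparing RVs.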

3. Running Logic: Watch

3.1 Watch

Once the list result is in, the RV can be read from it; the watch is then an incremental watch starting from that RV:

		... // abridged
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations of hanging watchers. Stop any wachers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}

		// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
		start := r.clock.Now()
		w, err := r.listerWatcher.Watch(options)
		... // abridged: handling of err

		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
			... // abridged: logging
			// Returning lets Run's BackoffUntil call ListAndWatch again.
			return nil
		}
		... // abridged

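r.listerWatcher.Watch(options) works just like the List discussed earlier: both are implemented on top of the basic client.

We won't go deep into how watch is implemented; at its core it is Transfer-Encoding: chunked over a long-lived connection. The format is simple: the server keeps appending chunks to the body, each consisting of the chunk size in hexadecimal, a line break, the data, and another line break; a zero-size chunk ends the body. Flexible, simple, and efficient: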

25
This is the data in the first chunk

1C
and this is the second one

3
con

8
sequence

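There are many similar streaming techniques, and I'm not entirely sure why this one was chosen; probably because it is simple. With that basic picture in mind, it's easy to understand what client.Watch does: it opens a long-lived HTTP connection, picks the matching decoder based on the response's mediaType, and reads the stream through a frameReader: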

func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
	... // abridged
	resp, err := client.Do(req)
	... // abridged: error and status-code handling

	contentType := resp.Header.Get("Content-Type")
	mediaType, params, err := mime.ParseMediaType(contentType)
	... // abridged
	objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
	... // abridged

	frameReader := framer.NewFrameReader(resp.Body)
	watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)

	return watch.NewStreamWatcher(
		restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
		// use 500 to indicate that the cause of the error is unknown - other error codes
		// are more specific to HTTP interactions, and set a reason
		errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
	), nil
}

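NewStreamWatcher starts a goroutine that keeps reading from the Reader. Each decoded item has two parts: an action (the event type) and a runtime.Object. After decoding and deserialization, the result is pushed into a channel: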

// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
	sw := &StreamWatcher{
		source:   d,
		reporter: r,
		// It's easy for a consumer to add buffering via an extra
		// goroutine/channel, but impossible for them to remove it,
		// so nonbuffered is better.
		result: make(chan Event),
	}
	go sw.receive()
	return sw
}

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Bookmark EventType = "BOOKMARK"
	Error    EventType = "ERROR"

	DefaultChanSize int32 = 100
)

// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
	defer close(sw.result)
	defer sw.Stop()
	defer utilruntime.HandleCrash()
	for {
		action, obj, err := sw.source.Decode()
		if err != nil {
			// Ignore expected error.
			if sw.stopping() {
				return
			}
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
			default:
				if net.IsProbableEOF(err) || net.IsTimeout(err) {
					klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
				} else {
					sw.result <- Event{
						Type:   Error,
						Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
					}
				}
			}
			return
		}
		sw.result <- Event{
			Type:   action,
			Object: obj,
		}
	}
}

3.2 WatchHandler

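As mentioned above, the watcher decodes and deserializes the content of the long-lived HTTP connection and pushes the results into a channel. In Reflector, the consumer of that channel is watchHandler. Its overall logic is clear: depending on the event type, it calls the corresponding Store method. The code looks long, but it is essentially simple: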

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			if r.expectedType != nil {
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil {
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}

	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}
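Stripped of type checks and logging, watchHandler is a type switch followed by an RV update. The sketch below models that under simplifying assumptions: a map stands in for the Store, and `event` and `handle` are hypothetical names. As in the original, BOOKMARK only advances the RV, unknown types are logged but still advance the RV, and an ERROR event ends the watch.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a hypothetical simplified watch event.
type event struct {
	Type            string
	Key             string
	ResourceVersion string
}

// handle applies each event to the store according to its type, then
// records the event's resourceVersion as the last synced one.
func handle(events <-chan event, store map[string]string, rv *string) error {
	for ev := range events {
		switch ev.Type {
		case "ADDED", "MODIFIED":
			store[ev.Key] = ev.ResourceVersion
		case "DELETED":
			delete(store, ev.Key)
		case "BOOKMARK":
			// nothing to store; it only tells us how far we have synced
		case "ERROR":
			return errors.New("watch error event")
		default:
			fmt.Println("unable to understand watch event:", ev.Type)
		}
		*rv = ev.ResourceVersion
	}
	return nil // channel closed: the caller starts a new watch from *rv
}

func main() {
	ch := make(chan event, 4)
	ch <- event{"ADDED", "default/a", "11"}
	ch <- event{"MODIFIED", "default/a", "12"}
	ch <- event{"BOOKMARK", "", "20"}
	ch <- event{"DELETED", "default/a", "21"}
	close(ch)

	store := map[string]string{}
	rv := "10" // RV obtained from the initial List
	err := handle(ch, store, &rv)
	fmt.Println(err, len(store), rv) // <nil> 0 21
}
```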

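4. DeltaFIFO

From the analysis so far, we know that Reflector's core logic is to first do a List:

  • use the Pager together with listerWatcher#ListFunc to fetch the full data set
  • store the data in the Store (Replace)

and then Watch:

  • request the ApiServer through listerWatcher#WatchFunc and keep parsing the body into Events
  • in watchHandler, handle each Event by type, calling the Store's Add, Update, or Delete

One question remains open: what exactly happens inside the Store? Like listerWatcher, Store is an open-ended interface with many possible implementations, so let's look at how the Store works in the informer implementation.

There's a lot of code, but only the Store part matters. While the informer is being initialized, it creates a DeltaFIFO and puts it into the Config; when the informer runs, that DeltaFIFO is passed to the Reflector's constructor as its Store.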

// newInformer returns a controller for populating the store while also
// providing event notifications.
//
// Parameters
//  * lw is list and watch functions for the source of the resource you want to
//    be informed of.
//  * objType is an object of the type that you expect to receive.
//  * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
//    calls, even if nothing changed). Otherwise, re-list will be delayed as
//    long as possible (until the upstream source closes the watch or times out,
//    or you stop the controller).
//  * h is the object you want notifications sent to.
//  * clientState is the store you want to populate
//
func newInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	clientState Store,
	transformer TransformFunc,
) Controller {
	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          clientState,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			if deltas, ok := obj.(Deltas); ok {
				return processDeltas(h, clientState, transformer, deltas)
			}
			return errors.New("object given as Process argument is not Deltas")
		},
	}
	return New(cfg)
}

// New makes a new Controller from the given Config.
func New(c *Config) Controller {
	ctlr := &controller{
		config: *c,
		clock:  &clock.RealClock{},
	}
	return ctlr
}

// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

4.1 DeltaFIFO

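Let's focus on the Store in the informer, namely DeltaFIFO, and the role it plays. First, its design. DeltaFIFO is essentially like a FIFO, with two differences:

  • first, the V in the K -> V mapping is stored as Deltas, a wrapper recording the increments (changes) rather than just the object
  • second, it has a Sync mechanism

It addresses the following needs:

  • every object change (delta) is processed, at most once
  • when processing a V, you can see every operation that happened to it since it was last processed
  • deletions are visible and can be processed
  • objects can be periodically resynced

There are many ways to implement this; let's see how K8s does it. The main fields are:

  • the KV mapping: items
  • the queue: queue
  • how an object is turned into a key: keyFunc
  • how to get an object by key and maintain the set of all keys: knownObjects

keyFunc is the simplest: by default it takes the object's meta.GetNamespace() + "/" + meta.GetName(), or just meta.GetName() when there is no namespace. We won't go into it; it is simply how an object is uniquely identified.

In the informer context, knownObjects is essentially an Indexer, introduced at the very beginning of this article; both DeltaFIFO and Indexer implement the Store interface. Think of it this way: the Indexer is the DeltaFIFO member holding the actual object state (DeltaFIFO only reads from it), while DeltaFIFO adds the incremental changes in between. We won't elaborate, since it's quite simple; if you're interested, see client-go/tools/cache/store#cache.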

// DeltaFIFO is like FIFO, but differs in two ways.  One is that the
// accumulator associated with a given object's key is not that object
// but rather a Deltas, which is a slice of Delta values for that
// object.  Applying an object to a Deltas means to append a Delta
// except when the potentially appended Delta is a Deleted and the
// Deltas already ends with a Deleted.  In that case the Deltas does
// not grow, although the terminal Deleted will be replaced by the new
// Deleted if the older Deleted's object is a
// DeletedFinalStateUnknown.
//
// The other difference is that DeltaFIFO has an additional way that
// an object can be applied to an accumulator, called Sync.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//  * You want to process every object change (delta) at most once.
//  * When you process an object, you want to see everything
//    that's happened to it since you last processed it.
//  * You want to process the deletion of some of the objects.
//  * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas.
//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key.  The objects in
// question are called "known objects" and this set of objects
// modifies the behavior of the Delete, Replace, and Resync methods
// (each in a different way).
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known" --- affecting Delete(),
	// Replace(), and Resync()
	knownObjects KeyListerGetter

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool
}

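4.2 How Deltas Work, Seen Through Add/Update/Delete

Looking over Add, Update, and Delete, the core work in all three is done by queueActionLocked. Delete has a little extra logic, which is essentially a duplicate check: if the object exists neither in knownObjects (the Indexer) nor in items, the deletion has presumably already been reported (for example by a relist), so nothing is done. Everything else goes through queueActionLocked: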

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Updated, obj)
}

// Delete is just like Add, but makes a Deleted Delta. If the given
// object does not already exist, it will be ignored. (It may have
// already been deleted by a Replace (re-list), for example.)  In this
// method `f.knownObjects`, if not nil, provides (via GetByKey)
// _additional_ objects that are considered to already exist.
func (f *DeltaFIFO) Delete(obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	if f.knownObjects == nil {
		if _, exists := f.items[id]; !exists {
			// Presumably, this was deleted when a relist happened.
			// Don't provide a second report of the same deletion.
			return nil
		}
	} else {
		// We only want to skip the "deletion" action if the object doesn't
		// exist in knownObjects and it doesn't have corresponding item in items.
		// Note that even if there is a "deletion" action in items, we can ignore it,
		// because it will be deduped automatically in "queueActionLocked"
		_, exists, err := f.knownObjects.GetByKey(id)
		_, itemsExist := f.items[id]
		if err == nil && !exists && !itemsExist {
			// Presumably, this was deleted when a relist happened.
			// Don't provide a second report of the same deletion.
			return nil
		}
	}

	return f.queueActionLocked(Deleted, obj)
}

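So we only need to focus on queueActionLocked. It computes the object's key, appends a Delta made of the action type and the object to the existing Deltas for that key, and then dedupes (dedupDeltas only collapses two consecutive Deleted deltas). The key is pushed onto the queue only if it was not already in items. In other words, every add, delete, or update becomes one record in the key's Deltas slice: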

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		// This never happens, because dedupDeltas never returns an empty list
		// when given a non-empty list (as it is here).
		// But if somehow it ever does return an empty list, then
		// We need to remove this from our map (extra items in the queue are
		// ignored if they are not in the map).
		delete(f.items, id)
	}
	return nil
}
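The items/queue pair and queueActionLocked can be modeled in a few lines. This is a hypothetical simplification, not the real DeltaFIFO: there is no lock or condition variable, keys and objects are plain strings, and the dedup step only handles the "two trailing Deleted deltas" case described in the DeltaFIFO comment.

```go
package main

import "fmt"

type deltaType string

const (
	added   deltaType = "Added"
	updated deltaType = "Updated"
	deleted deltaType = "Deleted"
)

type delta struct {
	Type   deltaType
	Object string
}

// miniFIFO: items maps a key to its accumulated deltas, and queue holds each
// key once, in first-enqueued order.
type miniFIFO struct {
	items map[string][]delta
	queue []string
}

func newMiniFIFO() *miniFIFO { return &miniFIFO{items: map[string][]delta{}} }

// queueAction mirrors queueActionLocked: append, dedupe, and push the key
// onto the queue only the first time it appears.
func (f *miniFIFO) queueAction(t deltaType, key, obj string) {
	deltas := append(f.items[key], delta{t, obj})
	// Collapse two trailing Deleted deltas, keeping the older one (the real
	// code prefers the newer one only if the older is a tombstone).
	if n := len(deltas); n >= 2 && deltas[n-1].Type == deleted && deltas[n-2].Type == deleted {
		deltas = deltas[:n-1]
	}
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = deltas
}

// pop removes the oldest key and returns all deltas gathered for it.
func (f *miniFIFO) pop() (string, []delta, bool) {
	if len(f.queue) == 0 {
		return "", nil, false
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	d := f.items[key]
	delete(f.items, key)
	return key, d, true
}

func main() {
	f := newMiniFIFO()
	f.queueAction(added, "default/a", "a@v1")
	f.queueAction(added, "default/b", "b@v1")
	f.queueAction(updated, "default/a", "a@v2")
	f.queueAction(deleted, "default/a", "a@v2")
	f.queueAction(deleted, "default/a", "a@v2")
	for key, ds, ok := f.pop(); ok; key, ds, ok = f.pop() {
		fmt.Println(key, ds)
	}
	// default/a [{Added a@v1} {Updated a@v2} {Deleted a@v2}]
	// default/b [{Added b@v1}]
}
```

Note that the consumer of "default/a" sees its whole history in one pop, which is the "see everything since you last processed it" property from 4.1.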

4.3 DeltaFIFO Replace

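Reflector re-lists from the API server whenever ListAndWatch starts again (for example after the watch ends with an error, since Run's BackoffUntil calls it again), and every list result is handed to the Store through syncWith, i.e. Replace. That mechanism is simple, so we won't detail it; what deserves attention is DeltaFIFO's Replace, that is, how DeltaFIFO handles the data after a re-list.

Every item in the re-listed result becomes a new event (Replaced, or Sync for old clients) pushed into DeltaFIFO. Items that exist locally (in the Indexer) but no longer exist on the server are enqueued as Deleted deltas wrapped in the placeholder DeletedFinalStateUnknown. So when local and server state diverge like this, a synthetic delete event is produced for the upper layer to handle: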

func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	keys := make(sets.String, len(list))

	// keep backwards compat for old clients
	action := Sync
	if f.emitDeltaTypeReplaced {
		action = Replaced
	}

	// Add Sync/Replaced action for each new item.
	for _, item := range list {
		key, err := f.KeyOf(item)
		if err != nil {
			return KeyError{item, err}
		}
		keys.Insert(key)
		if err := f.queueActionLocked(action, item); err != nil {
			return fmt.Errorf("couldn't enqueue object: %v", err)
		}
	}

	... // abridged: when f.knownObjects == nil, deletions are detected against f.items instead

	// Detect deletions not already in the queue.
	knownKeys := f.knownObjects.ListKeys()
	queuedDeletions := 0
	for _, k := range knownKeys {
		if keys.Has(k) {
			continue
		}

		deletedObj, exists, err := f.knownObjects.GetByKey(k)
		if err != nil {
			deletedObj = nil
			klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
		} else if !exists {
			deletedObj = nil
			klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
		}
		queuedDeletions++
		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
			return err
		}
	}

	// Remember how many items the first Replace produced (used by HasSynced).
	if !f.populated {
		f.populated = true
		f.initialPopulationCount = keys.Len() + queuedDeletions
	}

	return nil
}
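The deletion-detection half of Replace reduces to a set difference between the known keys and the re-listed keys. The sketch below shows just that; `tombstone` is a hypothetical stand-in for DeletedFinalStateUnknown, and plain maps replace knownObjects.

```go
package main

import (
	"fmt"
	"sort"
)

// tombstone carries the key plus the last state we knew about, which may be
// stale: we never saw the actual delete event.
type tombstone struct {
	Key string
	Obj string
}

// replaceDeletions returns a tombstone for every key that is known locally
// but missing from the fresh list, i.e. deleted while we were not watching.
func replaceDeletions(fresh []string, known map[string]string) []tombstone {
	inList := make(map[string]bool, len(fresh))
	for _, k := range fresh {
		inList[k] = true
	}
	var out []tombstone
	for k, obj := range known {
		if !inList[k] {
			out = append(out, tombstone{Key: k, Obj: obj})
		}
	}
	// Map iteration order is random; sort for stable output.
	sort.Slice(out, func(i, j int) bool { return out[i].Key < out[j].Key })
	return out
}

func main() {
	known := map[string]string{"default/a": "a@v3", "default/b": "b@v1", "default/c": "c@v7"}
	fresh := []string{"default/a", "default/d"} // the re-list result
	fmt.Println(replaceDeletions(fresh, known))
	// [{default/b b@v1} {default/c c@v7}]
}
```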

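At this point things still feel a bit disconnected: the Indexer doesn't seem to be involved here at all. So when does data land in the Indexer, and where is DeltaFIFO's consumer? In fact Reflector doesn't define that logic; it only produces messages, and how they are consumed is left entirely open.

5. The Message Consumer

In section 4 we saw that the Store is defined by the informer (the concrete type is controller), which manages the Reflector's lifecycle. It defines the Reflector's components, such as the DeltaFIFO, and it also defines how messages are consumed: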

// Multiplexes updates in the form of a list of Deltas into a Store, and informs
// a given handler of events OnUpdate, OnAdd, OnDelete
func processDeltas(
	// Object which receives event notifications from the given deltas
	handler ResourceEventHandler,
	clientState Store,
	transformer TransformFunc,
	deltas Deltas,
) error {
	// from oldest to newest
	for _, d := range deltas {
		obj := d.Object
		if transformer != nil {
			var err error
			obj, err = transformer(obj)
			if err != nil {
				return err
			}
		}

		switch d.Type {
		case Sync, Replaced, Added, Updated:
			if old, exists, err := clientState.Get(obj); err == nil && exists {
				if err := clientState.Update(obj); err != nil {
					return err
				}
				handler.OnUpdate(old, obj)
			} else {
				if err := clientState.Add(obj); err != nil {
					return err
				}
				handler.OnAdd(obj)
			}
		case Deleted:
			if err := clientState.Delete(obj); err != nil {
				return err
			}
			handler.OnDelete(obj)
		}
	}
	return nil
}

// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

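As you can see, the informer's Run keeps running processLoop (via wait.Until), which repeatedly Pops items from the DeltaFIFO and hands each key's Deltas to the PopProcessFunc, i.e. the consumer logic.

In the informer's consumer logic we finally see how the Indexer (clientState) gets updated: for each popped item, processDeltas walks the Deltas from oldest to newest, applies each delta and its object to the local store, and calls the corresponding registered handler.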
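The consumer side can likewise be sketched with maps. This is a hypothetical simplification of processDeltas (no transformer, string objects, a recording `handler` instead of ResourceEventHandler); it shows the key point that OnAdd versus OnUpdate is decided by whether the key already exists in the indexer, not by the delta's own type.

```go
package main

import "fmt"

type delta struct {
	Type string // "Sync", "Replaced", "Added", "Updated" or "Deleted"
	Key  string
	Obj  string
}

// handler records calls; it stands in for ResourceEventHandler.
type handler struct{ calls []string }

func (h *handler) OnAdd(obj string)         { h.calls = append(h.calls, "add "+obj) }
func (h *handler) OnUpdate(old, obj string) { h.calls = append(h.calls, "update "+old+"->"+obj) }
func (h *handler) OnDelete(obj string)      { h.calls = append(h.calls, "delete "+obj) }

// processDeltas walks the deltas oldest to newest, writes each one into the
// local indexer, and notifies the handler.
func processDeltas(h *handler, indexer map[string]string, deltas []delta) {
	for _, d := range deltas {
		switch d.Type {
		case "Sync", "Replaced", "Added", "Updated":
			if old, exists := indexer[d.Key]; exists {
				indexer[d.Key] = d.Obj
				h.OnUpdate(old, d.Obj)
			} else {
				indexer[d.Key] = d.Obj
				h.OnAdd(d.Obj)
			}
		case "Deleted":
			delete(indexer, d.Key)
			h.OnDelete(d.Obj)
		}
	}
}

func main() {
	indexer := map[string]string{"default/a": "a@v1"}
	h := &handler{}
	processDeltas(h, indexer, []delta{
		{"Replaced", "default/a", "a@v2"}, // already known -> OnUpdate
		{"Added", "default/b", "b@v1"},
		{"Deleted", "default/a", "a@v2"},
	})
	fmt.Println(h.calls, len(indexer))
	// [update a@v1->a@v2 add b@v1 delete a@v2] 1
}
```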

6. Self-Reflective

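These are some thoughts from reading the source. I read this code a long time ago but not closely; a closer read raised some questions, and thanks to my colleagues for answering them. There are points I may still not fully understand, and answers from more experienced readers are welcome.

Is it necessary to keep the state changes? With declarative resources, isn't a controller only responsible for the "final state"?

Here I was prejudiced and overgeneralizing. That's not the case: quite a few controllers need to know about each change, for example mechanisms such as Deployment revisions and rollback, which need to handle every change explicitly.

Why does processLoop need a requeue mechanism?

Prejudice again: I reasoned backwards from one implementation (the Indexer) to the abstraction and assumed Indexer operations never fail. In fact the Store can be any implementation, and with other implementations a successful write is not guaranteed.

We know pagination puts pressure on etcd, so when should pagination be used, and when not?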

?
