Reflector watches a specified resource and causes all changes to be reflected in the given store.
K8s 对于 Reflector 的描述十分干练:Reflector 是监听某项资源并将 “变更” “落库” 的组件。
除了 Reflector 本身,需要额外关注的便是 Store 这个接口,这里我们只简单看下 Store 的接口定义,了解一下这个东西大体是个什么东西:
// Store is a generic object storage and processing interface. A
// Store holds a map from string keys to accumulators, and has
// operations to add, update, and delete a given object to/from the
// accumulator currently associated with a given key. A Store also
// knows how to extract the key from a given object, so many operations
// are given only the object.
//
// In the simplest Store implementations each accumulator is simply
// the last given object, or empty after Delete, and thus the Store's
// behavior is simple storage.
//
// Reflector knows how to watch a server and update a Store. This
// package provides a variety of implementations of Store.
type Store interface {
// Add adds the given object to the accumulator associated with the given object's key
Add(obj interface{}) error
// Update updates the given object in the accumulator associated with the given object's key
Update(obj interface{}) error
// Delete deletes the given object from the accumulator associated with the given object's key
Delete(obj interface{}) error
// List returns a list of all the currently non-empty accumulators
List() []interface{}
// ListKeys returns a list of all the keys currently associated with non-empty accumulators
ListKeys() []string
// Get returns the accumulator associated with the given object's key
Get(obj interface{}) (item interface{}, exists bool, err error)
// GetByKey returns the accumulator associated with the given key
GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error
// Resync is meaningless in the terms appearing here but has
// meaning in some implementations that have non-trivial
// additional behavior (e.g., DeltaFIFO).
Resync() error
}
通俗解释一下,就是 Store 是一个通用对象存储和处理的接口,持有 keys 到 accumulators 的映射,可以增删改查。
以最简单的一种 case 来说,这个 accumulator 可以是一个对象,存储的行为也可以是一个简单存储,比如 Indexer client-go/tools/cache/store#cache
的实现,就是维护了内置了一个 map(严格意义来说是 client-go/tools/cache/thread_safe_store#goThreadSafeStore
,基于 map 实现了并发安全、索引)。
那么回到正题,Reflector 最主要的运转逻辑就是 ListAndWatch,资源的 watch 和 store 的更新是如何实现的呢,先从启动入口开始,启动的入口如下:
// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.backoffManager, true, stopCh)
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
这里的核心逻辑可以看到是 r.ListAndWatch()
,外面套了一层 Backoff 的管理器,用于崩溃时退避与恢复,这里不作赘述。
ListAndWatch 的代码结构如下,从注释也很容易看出,这里先进行了一次全量的 list,然后根据资源版本去做 watch。可以看到一开始的全量 list,这里由 Pager 代理了 listerWatcher 的 list 逻辑,在拿到结果后,调用 r.syncWith
,将结果存储到 Store 中:
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
if err := func() error {
... // 有删减
var list runtime.Object
go func() {
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
list, paginatedResult, err = pager.List(context.Background(), options)
close(listCh)
}
listMetaInterface, err := meta.ListAccessor(list)
resourceVersion = listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
r.setLastSyncResourceVersion(resourceVersion)
... // 有删减
}(); err != nil {
return err
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {...Block1...}()
for {...Block2...}
}
r.syncWith
逻辑很简单,本质上就是调用了 store.Replace
:
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
一步步来,首先是最里层的执行者,r.listerWatcher.List(opts)
,它有很多个实现,本质上就是如何去进行首次拉取,我们这里拿两个例子可以很直观的了解它做了什么:
第一个是测试用例中,如何声明 listerWatcher
的实现,ListFunc 就是很简单的返回一个列表对象,WatchFunc 则是简单的声明了一个基于 chan 的实现:
func TestReflectorResync(t *testing.T) {
iteration := 0
stopCh := make(chan struct{})
rerr := errors.New("expected resync reached")
s := &FakeCustomStore{
ResyncFunc: func() error {
iteration++
if iteration == 2 {
return rerr
}
return nil
},
}
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "0"}}, nil
},
}
resyncPeriod := 1 * time.Millisecond
r := NewReflector(lw, &v1.Pod{}, s, resyncPeriod)
if err := r.ListAndWatch(stopCh); err != nil {
// error from Resync is not propaged up to here.
t.Errorf("expected error %v", err)
}
if iteration != 2 {
t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
}
}
//
// fw:
//
func (f *FakeWatcher) ResultChan() <-chan Event {
return f.result
}
// Add sends an add event.
func (f *FakeWatcher) Add(obj runtime.Object) {
f.result <- Event{Added, obj}
}
另外则是看一下通常的 NewDeploymentInformer
是如何声明的,可以看到,就是依靠基础的 client 来完成的:
// NewFilteredDeploymentInformer constructs a new informer for Deployment type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.ExtensionsV1beta1().Deployments(namespace).List(context.TODO(), options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.ExtensionsV1beta1().Deployments(namespace).Watch(context.TODO(), options)
},
},
&extensionsv1beta1.Deployment{},
resyncPeriod,
indexers,
)
}
了解了 listerWatcher
的职责,另一快则是 Pager。Pager 本质上是控制 List 这个动作是否分批执行,以及如何分批执行,很容易联想到一个 for 循环不断去捞取数据。
代码也确实如此,这个这里不做赘述。值得注意的是进行分页前的一些配置代码,如 page.PageSize
、options.ResourceVersion
和 Continue
,这些配置可以渗透到 pager 的设置中,在 Pager 启动前,可以看到:
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
// We got a paginated result initially. Assume this resource and server honor
// paging requests (i.e. watch cache is probably disabled) and leave the default
// pager size set.
case options.ResourceVersion != "" && options.ResourceVersion != "0":
// User didn't explicitly request pagination.
//
// With ResourceVersion != "", we have a possibility to list from watch cache,
// but we do that (for ResourceVersion != "0") only if Limit is unset.
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
// switch off pagination to force listing from watch cache (if enabled).
// With the existing semantic of RV (result is at least as fresh as provided RV),
// this is correct and doesn't lead to going back in time.
//
// We also don't turn off pagination for ResourceVersion="0", since watch cache
// is ignoring Limit in that case anyway, and if watch cache is not enabled
// we don't introduce regression.
pager.PageSize = 0
}
大体围绕着要不要进行分页,以及进行多大的分页来进行。
先看看分页的描述,这里提到了,使用分页应是一个谨慎的操作,它将直接由 etcd 提供,可能会导致性能问题,也就是第一个 case,我们应尽量避免。分页由 Continue 来进行提供,:
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
// it will turn off pagination to allow serving them from watch cache.
// NOTE: It should be used carefully as paginated lists are always served directly from
// etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems.
WatchListPageSize int64
再看下服务端(ApiServer)的代码,具体的代码在 apiserver/pkg/storage/cacher/cacher.go
中,可以看到,启用了 APIListChunking
的前提下,使用 Continue 或 不指定 RV 或(使用 Limit 且 RV > 0)都将导致压力传导到 etcd 中。
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
if resourceVersion == "" || hasContinuation || hasLimit {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well.
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero.
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
}
所以综合下来,通常情况的优解就是 RV >= 0,且不使用 Limit 这个 case 。我们看看官方文档中对 RV 的描述:
对于 get 和 list 而言,resourceVersion
的语义为:
get:
resourceVersion 未设置 | resourceVersion="0" | resourceVersion="<非零值>" |
---|---|---|
最新版本 | 任何版本 | 不老于给定版本 |
list:
除非你对一致性有着非常强烈的需求,使用 resourceVersionMatch=NotOlderThan
同时为 resourceVersion
设定一个已知值是优选的交互方式,因为与不设置 resourceVersion
和 resourceVersionMatch
相比,这种配置可以取得更好的 集群性能和可扩缩性。后者需要提供带票选能力的读操作。
Pager 拿到数据之后,要做的就是三件事情:
这里先看前两件事:
listMetaInterface, err := meta.ListAccessor(list)
resourceVersion = listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
r.setLastSyncResourceVersion(resourceVersion)
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
【RV】 本质上是 MVCC 中基础的设计,是一个逻辑时间戳,对应 Etcd 中的 revision。为每个操作附带一个递增的逻辑时间戳,来达到整个系统的串行化。当然它还有其他的功能,如隔离级别、一致性视图等等,K8s 中最常接触到的是使用 RV 来做并发控制。
更新一个资源的时候,怎么知道其他人在这期间有没有修改过此资源?靠的就是 RV 的对比。
拿到 list 的结果后,可以从中获取到 RV,Watch 实际上就是基于这个 RV 来进行增量 watch:
... // 有删减
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
w, err := r.listerWatcher.Watch(options)
r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh);
... // 有删减
r.listerWatcher.Watch(options)
和前面提到的 list 是一样的,本质上都是借助基础的 client 来实现的。
watch 的实现这里不多讲,核心就是 Transfer-Encoding: chunked + 长连接。内容简单,往 body 不断塞入以下内容即可,长度开头,换行后为内容,再次换行来结束。灵活,简单,高效:
25
This is the data in the first chunk
1C
and this is the second one
3
con
8
sequence
类似的流式技术有很多,我也没想太明白为什么是这个,大概是因为简单吧。有了上面简单的认知后,就很好理解 client.watch 在做什么了,先发起一个 Http 长连接,根据 mediaType 拿到相应的解码器,使用 frameReader 来读取流:
func (r *Request) Watch(ctx context.Context) (watch.Interface, error)
resp, err := client.Do(req)
contentType := resp.Header.Get("Content-Type")
mediaType, params, err := mime.ParseMediaType(contentType)
objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
frameReader := framer.NewFrameReader(resp.Body)
watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
return watch.NewStreamWatcher(
restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
// use 500 to indicate that the cause of the error is unknown - other error codes
// are more specific to HTTP interactions, and set a reason
errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
), nil
这个 NewStreamWatcher 启了一个协程,不断从 Reader 中读取内容,我们可以看到内容由两部分构成,一个叫 action,另一个则是 runtime.Object。解码 + 反序列化后,将内容投入 channel 中:
// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
sw := &StreamWatcher{
source: d,
reporter: r,
// It's easy for a consumer to add buffering via an extra
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan Event),
}
go sw.receive()
return sw
}
const (
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
Bookmark EventType = "BOOKMARK"
Error EventType = "ERROR"
DefaultChanSize int32 = 100
)
// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
defer close(sw.result)
defer sw.Stop()
defer utilruntime.HandleCrash()
for {
action, obj, err := sw.source.Decode()
if err != nil {
// Ignore expected error.
if sw.stopping() {
return
}
switch err {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
default:
if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
} else {
sw.result <- Event{
Type: Error,
Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
}
}
}
return
}
sw.result <- Event{
Type: action,
Object: obj,
}
}
}
前面说到,Watcher 负责把 HTTP 长连接的内容进行解码、反序列化,拿到的内容投入 channel,那么消费 channel 的地方,在 Reflector 中叫做 WatchHandler ,整体的逻辑清晰简单,就是根据 Event,去调用不同的 store 实现,代码看起来多,本质上很简单:
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
continue
}
newResourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
watchDuration := r.clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
return nil
}
经过前面的分析,我们已经了解到,Reflector 的核心逻辑是先进行了一次 List
再进行 watch
那么这里还剩下一个疑点没解,就是 Store 里面究竟做了什么?首先 Store 和 listWatcher 一样,都是很开放的实现,我们看看在 informer 的实现中,Store 是如何工作的。
代码很多,但是只需要关注 Store 这块内容即可,在初始化 informer 的过程中,new 了一个 DeltaFifo,注入到 Config 中,然后在运行 informer 的时候,将这个 DeltaFifo 当作 Store 传入了 Reflector 的构造函数。
// newInformer returns a controller for populating the store while also
// providing event notifications.
//
// Parameters
// * lw is list and watch functions for the source of the resource you want to
// be informed of.
// * objType is an object of the type that you expect to receive.
// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
// calls, even if nothing changed). Otherwise, re-list will be delayed as
// long as possible (until the upstream source closes the watch or times out,
// or you stop the controller).
// * h is the object you want notifications sent to.
// * clientState is the store you want to populate
//
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
transformer TransformFunc,
) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, transformer, deltas)
}
return errors.New("object given as Process argument is not Deltas")
},
}
return New(cfg)
}
// New makes a new Controller from the given Config.
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
我们来关注一下 Informer 中的 Store,也就是 DeltaFIFO 承担了什么角色。先看看 DeltaFIFO 的设计。本质上 DeltaFIFO 像 FIFIO 一样,但是有两个不同:
用于解决以下问题:
实现的方式有很多种,来看看 K8s 是怎么实现这套逻辑的,主要声明的成员变量:
其中 keyFunc 是最简单的,默认会取这个对象的 meta.GetNamespace() + "/" + meta.GetName()
,或者 meta.GetName()
,这里不细说,本质上就是怎么唯一去区分一个对象。
knownObjects 本质上是一个 Indexer( infomer 背景下),在本篇文章的最开头,就介绍了这个 Indexer,DeltaFIFO 和 Indexer 都实现了 Store 接口。可以这么理解,Indexer 是 DeltaFIFO 中实际进行对象存储的成员,而 DeltaFIFO 则补充了期间的增量变化。这里也不多说,这个东西太简单了,有兴趣的可以去看下 client-go/tools/cache/store#cache
。
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
// accumulator associated with a given object's key is not that object
// but rather a Deltas, which is a slice of Delta values for that
// object. Applying an object to a Deltas means to append a Delta
// except when the potentially appended Delta is a Deleted and the
// Deltas already ends with a Deleted. In that case the Deltas does
// not grow, although the terminal Deleted will be replaced by the new
// Deleted if the older Deleted's object is a
// DeletedFinalStateUnknown.
//
// The other difference is that DeltaFIFO has an additional way that
// an object can be applied to an accumulator, called Sync.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
// * You want to process every object change (delta) at most once.
// * When you process an object, you want to see everything
// that's happened to it since you last processed it.
// * You want to process the deletion of some of the objects.
// * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas.
//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key. The objects in
// question are called "known objects" and this set of objects
// modifies the behavior of the Delete, Replace, and Resync methods
// (each in a different way).
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
items map[string]Deltas
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc
// knownObjects list keys that are "known" --- affecting Delete(),
// Replace(), and Resync()
knownObjects KeyListerGetter
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool
}
先纵览一下 增删改,发现它的核心干活逻辑都是 queueActionLocked,其中删除多了一点点逻辑,本质上只是做了下重入,在删除操作发生的时候,如果 Indexer 和 Queue 中都没有了,就不做任何处理了。其他的都是通过 queueActionLocked 来完成:
// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Added, obj)
}
// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Updated, obj)
}
// Delete is just like Add, but makes a Deleted Delta. If the given
// object does not already exist, it will be ignored. (It may have
// already been deleted by a Replace (re-list), for example.) In this
// method `f.knownObjects`, if not nil, provides (via GetByKey)
// _additional_ objects that are considered to already exist.
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
if f.knownObjects == nil {
if _, exists := f.items[id]; !exists {
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
return nil
}
} else {
// We only want to skip the "deletion" action if the object doesn't
// exist in knownObjects and it doesn't have corresponding item in items.
// Note that even if there is a "deletion" action in items, we can ignore it,
// because it will be deduped automatically in "queueActionLocked"
_, exists, err := f.knownObjects.GetByKey(id)
_, itemsExist := f.items[id]
if err == nil && !exists && !itemsExist {
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
return nil
}
}
return f.queueActionLocked(Deleted, obj)
}
所以目光只需要专注于 queueActionLocked 即可,来看看它是如何完成的,先是根据唯一标识找到了 Delta 数组中的对象,将此次的 Object 与 ActionType,一起 append 到原有的 Delta 数组中。可以看到,不管是增、删还是改,都会以一条记录的形式,塞入到 Delta 数组中:
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
// But if somehow it ever does return an empty list, then
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete(f.items, id)
}
return nil
}
Reflector 有一个定期从 Api 重新拉取数据的 case,比较简单,这里不细说。值得注意的是,DeltaFIFO 的 Replace 操作,也就是说,触发了数据重新拉取之后,DeltaFIFO 将怎么处理。
重新同步拉取的列表,将全量变成新的 Event 塞入 DeltaFIFO,而同步过来时,本地有(Indexer),服务端却没有的数据,则会以占位符 DeletedFinalStateUnknown
的形式,塞入队列中。这样,在这种数据不一致场景出现的情况下,将模拟出一条删除事件供上层处理:
... // 有删减
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
// keep backwards compat for old clients
action := Sync
if f.emitDeltaTypeReplaced {
action = Replaced
}
// Add Sync/Replaced action for each new item.
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// Detect deletions not already in the queue.
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
return nil
}
看到了这里,还是有些没头没尾,这里似乎没有 Indexer 什么事,那么数据是什么时候落入 Indexer 的呢?DeltaFIFO 的消费者又在哪里?实际上 Reflector 并没有定义这些逻辑,它只负责生产消息,如何消费则非常开放。
在第四节中,我们提到了 Store 的定义者 informer(实体类叫 controller),负责管理 Reflector 的生命周期。它定义了 Reflector 中的组件,如 DeltaFIFO,它同时也定义了消息的消费逻辑:
// Multiplexes updates in the form of a list of Deltas into a Store, and informs
// a given handler of events OnUpdate, OnAdd, OnDelete
func processDeltas(
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
handler.OnDelete(obj)
}
}
return nil
}
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
transformer TransformFunc,
) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, transformer, deltas)
}
return errors.New("object given as Process argument is not Deltas")
},
}
return New(cfg)
}
// New makes a new Controller from the given Config.
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
可以看到,informer 启动了一个协程,不断的从 DeltaFIFO 中 Pop 出 Event,并交给 PopProcessFunc,也就是消费逻辑中处理。
在 Informer 的消费逻辑中,我们看到了 Indexer(clientState) 的更新逻辑,每当有新的事件进来,process 逻辑中便会从旧到新,遍历 Event 中的事件与当时的对象,分别对本地存储进行操作,并调用对应注册进来的 handler。
这里算是阅读源码的一些思考,其实很早就读过这部分代码,只是没有细看,细看之后产生了一些疑问,同时感谢同事解答。有一些可能我目前也不是太清楚,也欢迎大佬解答。
这块自己也是先入为主、以偏概全了,其实不然,有不少的控制器是需要知道每次变更的,比如 Deployment 的 Revision,Rollback 等机制,后需要能清楚处理每一次的变更。
先入为主 +1,从实现(Indexer)去倒推了抽象,认为 Indexer 的操作不会报错。实际上 Store 可以是任意实现,如果是其他的实现,则无法保证落库一定成功。
?
|