Kubernetes 1.5 Source Code Analysis (4): The etcd Interface Implementation for apiServer Resources
Source version
Kubernetes v1.5.0
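Introduction
Every k8s component manipulates resource objects through apiServer, and all of those objects are ultimately persisted in etcd.
k8s implements a generic, REST-compliant set of etcd operation interfaces for all of the RESTful resources it serves; each storage interface handles one Kind of resource object.
These resource objects include pods, bindings, podTemplates, RC (replication controllers), Services, and so on.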
Storage creation
To understand how the etcd operation interfaces are implemented, we first need to look at the Master.GenericAPIServer.storage field:
storage map[string]rest.Storage
The storage variable is a map: the key is the REST API path and the value is a rest.Storage, a generic REST-compliant resource storage interface.
The core group's resource list is built by NewLegacyRESTStorage() in pkg/registry/core/rest/storage_core.go:
Call flow: main --> App.Run --> config.Complete().New() --> m.InstallLegacyAPI() --> NewLegacyRESTStorage()
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter genericapiserver.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	....
	// Create podStorage
	podStorage := podetcd.NewStorage(
		restOptionsGetter(api.Resource("pods")),
		nodeStorage.KubeletConnectionInfo,
		c.ProxyTransport,
		podDisruptionClient,
	)
	...
	// Resource list
	restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
		"pods/attach":      podStorage.Attach,
		"pods/status":      podStorage.Status,
		"pods/log":         podStorage.Log,
		"pods/exec":        podStorage.Exec,
		"pods/portforward": podStorage.PortForward,
		"pods/proxy":       podStorage.Proxy,
		"pods/binding":     podStorage.Binding,
		"bindings":         podStorage.Binding,
		"podTemplates":     podTemplateStorage,
		"replicationControllers":        controllerStorage.Controller,
		"replicationControllers/status": controllerStorage.Status,
		"services":        serviceRest.Service,
		"services/proxy":  serviceRest.Proxy,
		"services/status": serviceStatusStorage,
		"endpoints":       endpointsStorage,
		...
		"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
	}
	if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
		restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
	}
	if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1beta1"}) {
		restStorageMap["pods/eviction"] = podStorage.Eviction
	}
	apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
	return restStorage, apiGroupInfo, nil
}
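To make the shape of restStorageMap concrete, here is a minimal, self-contained sketch of a map from REST path to storage object, with one optional entry guarded by a feature check. The Storage interface, kindStorage type, and buildStorageMap function are hypothetical stand-ins invented for illustration; they are not the Kubernetes types.

```go
package main

import (
	"fmt"
	"sort"
)

// Storage is a hypothetical stand-in for rest.Storage.
type Storage interface {
	Kind() string
}

// kindStorage is a toy implementation that only remembers its Kind.
type kindStorage struct{ kind string }

func (s kindStorage) Kind() string { return s.kind }

// buildStorageMap mirrors the pattern above: fixed entries first,
// then optional entries that depend on an enabled API version.
func buildStorageMap(evictionEnabled bool) map[string]Storage {
	pod := kindStorage{kind: "Pod"}
	m := map[string]Storage{
		"pods":        pod,
		"pods/status": pod,
		"bindings":    kindStorage{kind: "Binding"},
	}
	if evictionEnabled {
		m["pods/eviction"] = kindStorage{kind: "Eviction"}
	}
	return m
}

func main() {
	m := buildStorageMap(true)
	paths := make([]string, 0, len(m))
	for p := range m {
		paths = append(paths, p)
	}
	sort.Strings(paths) // map iteration order is random, so sort for stable output
	for _, p := range paths {
		fmt.Printf("%-14s -> %s\n", p, m[p].Kind())
	}
}
```

Several paths can point at the same storage object, just as "pods/binding" and "bindings" both map to podStorage.Binding in the real code.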
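This function was already covered in part two of this apiServer source analysis, in the discussion of resource registration. Here we focus on how the etcd operation interfaces of the storage backend are implemented.
We use the Pod resource as the example:
Path: pkg/registry/core/pod/etcd/etcd.go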
func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
	// Build the prefix
	prefix := "/" + opts.ResourcePrefix
	newListFunc := func() runtime.Object { return &api.PodList{} }
	// Call the storage decorator; it returns this resource's etcd storage interface and a destroy function.
	// opts is passed in by the caller; see restOptionsFactory.NewFor in master.go one level up.
	storageInterface, dFunc := opts.Decorator(
		opts.StorageConfig,
		// The remaining arguments are only used when the cache is enabled
		cachesize.GetWatchCacheSizeByResource(cachesize.Pods),
		&api.Pod{},
		prefix,
		pod.Strategy,
		newListFunc,
		pod.NodeNameTriggerFunc,
	)
	// Create the Store object
	store := &registry.Store{
		NewFunc:     func() runtime.Object { return &api.Pod{} },
		NewListFunc: newListFunc,
		KeyRootFunc: func(ctx api.Context) string {
			return registry.NamespaceKeyRootFunc(ctx, prefix)
		},
		KeyFunc: func(ctx api.Context, name string) (string, error) {
			return registry.NamespaceKeyFunc(ctx, prefix, name)
		},
		ObjectNameFunc: func(obj runtime.Object) (string, error) {
			return obj.(*api.Pod).Name, nil
		},
		PredicateFunc:           pod.MatchPod,
		QualifiedResource:       api.Resource("pods"),
		EnableGarbageCollection: opts.EnableGarbageCollection,
		DeleteCollectionWorkers: opts.DeleteCollectionWorkers,
		CreateStrategy:          pod.Strategy,
		UpdateStrategy:          pod.Strategy,
		DeleteStrategy:          pod.Strategy,
		ReturnDeletedObject:     true,
		Storage:                 storageInterface,
		DestroyFunc:             dFunc,
	}
	statusStore := *store
	statusStore.UpdateStrategy = pod.StatusStrategy
	return PodStorage{
		Pod:         &REST{store, proxyTransport},
		Binding:     &BindingREST{store: store},
		Eviction:    newEvictionStorage(store, podDisruptionBudgetClient),
		Status:      &StatusREST{store: &statusStore},
		Log:         &podrest.LogREST{Store: store, KubeletConn: k},
		Proxy:       &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
		Exec:        &podrest.ExecREST{Store: store, KubeletConn: k},
		Attach:      &podrest.AttachREST{Store: store, KubeletConn: k},
		PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
	}
}
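One detail in NewStorage() that is easy to miss is `statusStore := *store`: a shallow copy of the struct whose UpdateStrategy is then overridden. The sketch below illustrates that Go idiom with a hypothetical two-field store type (not the real registry.Store): the copy gets its own strategy field but still shares the underlying backend with the original.

```go
package main

import "fmt"

// store is a hypothetical, much smaller stand-in for registry.Store.
type store struct {
	updateStrategy string
	backend        map[string]string // reference type: shared by shallow copies
}

// newStores builds a base store and a status store the same way
// NewStorage() does: copy the struct by value, then override one field.
func newStores() (pod *store, status *store) {
	pod = &store{updateStrategy: "pod.Strategy", backend: map[string]string{}}
	statusCopy := *pod // shallow copy: fields are copied, the map is shared
	statusCopy.updateStrategy = "pod.StatusStrategy"
	return pod, &statusCopy
}

func main() {
	pod, status := newStores()
	pod.backend["/pods/default/nginx"] = "Running"

	// The strategies differ, but both stores see the same backend data.
	fmt.Println(pod.updateStrategy, status.updateStrategy)
	fmt.Println(status.backend["/pods/default/nginx"])
}
```

This is why "pods" and "pods/status" can apply different update rules while reading and writing the same keys.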
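NewStorage() calls opts.Decorator(), which returns the key storage interface together with a destroy function that releases its resources.
To see how that is implemented, we have to start from where opts is created.
restOptionsGetter(api.Resource("pods")) is the step that creates opts; api.Resource("pods") simply builds a GroupResource struct. We therefore need to trace where restOptionsGetter comes from.
Path: pkg/master/master.go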
func (c completedConfig) New() (*Master, error) {
	...
	restOptionsFactory := restOptionsFactory{
		deleteCollectionWorkers: c.DeleteCollectionWorkers,
		enableGarbageCollection: c.GenericConfig.EnableGarbageCollection,
		storageFactory:          c.StorageFactory,
	}
	// Check whether the cache used for Watch is enabled.
	// A different implementation is assigned depending on whether the cache is on.
	// restOptionsFactory.storageDecorator decorates each resource's REST interface (CRUD).
	// NewStorage() calls it later and gets back the CRUD interface and a destroy function.
	// See NewStorage() in pkg/registry/core/pod/etcd/etcd.go.
	// The difference is simple: with the cache, the returned interface operates on the cache;
	// without it, the returned interface operates on etcd directly.
	if c.EnableWatchCache {
		restOptionsFactory.storageDecorator = registry.StorageWithCacher
	} else {
		restOptionsFactory.storageDecorator = generic.UndecoratedStorage
	}
	// install legacy rest storage
	if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
			StorageFactory:       c.StorageFactory,
			ProxyTransport:       c.ProxyTransport,
			KubeletClientConfig:  c.KubeletClientConfig,
			EventTTL:             c.EventTTL,
			ServiceIPRange:       c.ServiceIPRange,
			ServiceNodePortRange: c.ServiceNodePortRange,
			LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
		}
		m.InstallLegacyAPI(c.Config, restOptionsFactory.NewFor, legacyRESTStorageProvider)
	}
	...
}
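This function initializes a restOptionsFactory value holding the number of workers used for delete-collection operations, whether garbage collection is enabled, and the storageFactory. Depending on whether the watch cache is enabled, it also assigns the decorator that NewStorage() later calls.
restOptionsFactory.NewFor is passed down the call chain until NewLegacyRESTStorage() calls it to create opts. Its implementation:
Path: pkg/master/master.go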
type restOptionsFactory struct {
	deleteCollectionWorkers int
	enableGarbageCollection bool
	storageFactory          genericapiserver.StorageFactory
	storageDecorator        generic.StorageDecorator
}
func (f restOptionsFactory) NewFor(resource unversioned.GroupResource) generic.RESTOptions {
	// Create the storage config for this resource
	storageConfig, err := f.storageFactory.NewConfig(resource)
	if err != nil {
		glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
	}
	// The return value is a RESTOptions, the type of the opts seen earlier.
	// Pay attention to where f.storageDecorator comes from.
	return generic.RESTOptions{
		// Config used to build the Storage
		StorageConfig:           storageConfig,
		Decorator:               f.storageDecorator,
		DeleteCollectionWorkers: f.deleteCollectionWorkers,
		EnableGarbageCollection: f.enableGarbageCollection,
		ResourcePrefix:          f.storageFactory.ResourcePrefix(resource),
	}
}
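This function is simple: it initializes a generic.RESTOptions value, which is opts. To find where opts.Decorator comes from, look back at the EnableWatchCache check in the previous function.
opts.Decorator ultimately returns the storage interface and a destroy function. The cached and uncached implementations necessarily differ, so the two cases have to be distinguished:
registry.StorageWithCacher: returns an interface that operates on the cache, plus a function that tears the cache down.
generic.UndecoratedStorage: returns an etcd operation interface chosen by the configured backend type (etcd2, etcd3, and so on). In effect it creates a connection to etcd for the resource objects, sends commands over that connection, and also returns a function that closes the connection.
So the two implementations are completely different: one operates on the cache, the other on the actual etcd.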
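The selection between the two decorators can be sketched as a function value chosen by a boolean flag. Everything below (Interface, Decorator, rawStore, cacher, chooseDecorator) is a simplified, hypothetical model written for this article, not the Kubernetes API; it only shows that both decorators share one signature, and that the cached variant wraps the raw one and extends its destroy function.

```go
package main

import "fmt"

// Interface is a tiny hypothetical stand-in for storage.Interface.
type Interface interface {
	Get(key string) (string, bool)
	Name() string
}

// DestroyFunc releases whatever the storage holds (connections, goroutines).
type DestroyFunc func()

// Decorator is the shared signature of both variants.
type Decorator func(data map[string]string) (Interface, DestroyFunc)

type rawStore struct{ data map[string]string }

func (r *rawStore) Get(key string) (string, bool) {
	v, ok := r.data[key]
	return v, ok
}
func (r *rawStore) Name() string { return "raw" }

// newRaw plays the role of NewRawStorage(): build the backend directly.
func newRaw(data map[string]string) (Interface, DestroyFunc) {
	return &rawStore{data: data}, func() {} // nothing to close in this toy
}

// cacher wraps another Interface; calls it does not override are delegated.
type cacher struct {
	Interface
	stopped bool
}

func (c *cacher) Name() string { return "cacher(" + c.Interface.Name() + ")" }

func undecorated(data map[string]string) (Interface, DestroyFunc) {
	return newRaw(data)
}

func withCacher(data map[string]string) (Interface, DestroyFunc) {
	s, d := newRaw(data)
	c := &cacher{Interface: s}
	// Destroy stops the cache first, then releases the raw storage.
	return c, func() {
		c.stopped = true
		d()
	}
}

// chooseDecorator mirrors the EnableWatchCache branch in New().
func chooseDecorator(enableWatchCache bool) Decorator {
	if enableWatchCache {
		return withCacher
	}
	return undecorated
}

func main() {
	for _, enabled := range []bool{true, false} {
		s, destroy := chooseDecorator(enabled)(map[string]string{"/pods/default/nginx": "Running"})
		v, _ := s.Get("/pods/default/nginx")
		fmt.Println(s.Name(), v)
		destroy()
	}
}
```

Because callers only see the Decorator signature, NewStorage() does not need to know which variant it received.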
First, the implementation of registry.StorageWithCacher():
Path: pkg/registry/generic/registry/storage_factory.go
func StorageWithCacher(
	storageConfig *storagebackend.Config,
	capacity int,
	objectType runtime.Object,
	resourcePrefix string,
	scopeStrategy rest.NamespaceScopedStrategy,
	newListFunc func() runtime.Object,
	triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
	// storageConfig is the backend storage config: storage type, server list, TLS certificate info, cache size, etc.
	// NewRawStorage() is all that generic.UndecoratedStorage() does; StorageWithCacher() adds the cacher setup below.
	s, d := generic.NewRawStorage(storageConfig)
	// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
	// Currently it has two layers of same storage interface -- cacher and low level kv.
	cacherConfig := storage.CacherConfig{
		CacheCapacity:        capacity,
		Storage:              s,
		Versioner:            etcdstorage.APIObjectVersioner{},
		Type:                 objectType,
		ResourcePrefix:       resourcePrefix,
		NewListFunc:          newListFunc,
		TriggerPublisherFunc: triggerFunc,
		Codec:                storageConfig.Codec,
	}
	// Assign KeyFunc depending on whether the resource is namespaced.
	// KeyFunc returns the key of an object:
	// namespaced key format:     prefix + "/" + Namespace + "/" + name
	// non-namespaced key format: prefix + "/" + name
	if scopeStrategy.NamespaceScoped() {
		cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
			return storage.NamespaceKeyFunc(resourcePrefix, obj)
		}
	} else {
		cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
			return storage.NoNamespaceKeyFunc(resourcePrefix, obj)
		}
	}
	// Create the cacher from the config initialized above.
	// This is the important part; it is covered later.
	cacher := storage.NewCacherFromConfig(cacherConfig)
	destroyFunc := func() {
		cacher.Stop()
		d()
	}
	return cacher, destroyFunc
}
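The two key formats described in the comments above can be sketched in a few lines. keyFuncFor is a hypothetical helper written for this illustration; the real code works on runtime.Object values and returns an error as well, which this sketch leaves out.

```go
package main

import "fmt"

// keyFuncFor returns a key builder for a resource prefix. It models only
// the string layout, not the real storage.NamespaceKeyFunc signature.
func keyFuncFor(namespaced bool, prefix string) func(namespace, name string) string {
	if namespaced {
		return func(namespace, name string) string {
			return prefix + "/" + namespace + "/" + name
		}
	}
	// Cluster-scoped resources ignore the namespace argument.
	return func(_, name string) string {
		return prefix + "/" + name
	}
}

func main() {
	podKey := keyFuncFor(true, "/pods")
	nodeKey := keyFuncFor(false, "/nodes")

	fmt.Println(podKey("default", "nginx")) // /pods/default/nginx
	fmt.Println(nodeKey("", "node-1"))      // /nodes/node-1
}
```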
StorageWithCacher() first calls NewRawStorage() to create a storage backend, so we look at that function first:
Path: pkg/registry/generic/storage_decorator.go
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
	s, d, err := factory.Create(*config)
	if err != nil {
		glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
	}
	return s, d
}
There is not much to say here, so we continue to Create():
Path: pkg/storage/storagebackend/factory/factory.go
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
	// Check the storage type: etcd2 or etcd3
	switch c.Type {
	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2:
		return newETCD2Storage(c)
	case storagebackend.StorageTypeETCD3:
		// TODO: We have the following features to implement:
		// - Support secure connection by using key, cert, and CA files.
		// - Honor "https" scheme to support secure connection in gRPC.
		// - Support non-quorum read.
		return newETCD3Storage(c)
	default:
		return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
	}
}
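The dispatch in Create() is a plain factory switch in which an unset type falls back to etcd2. A minimal sketch follows; the constant names and their string values ("", "etcd2", "etcd3") are assumptions made for illustration, and the function returns a label instead of a real storage object.

```go
package main

import "fmt"

// Assumed values, for illustration only.
const (
	typeUnset = ""
	typeETCD2 = "etcd2"
	typeETCD3 = "etcd3"
)

// create picks a backend by type; an unset type is treated as etcd2,
// and anything unrecognized is an error rather than a silent default.
func create(storageType string) (string, error) {
	switch storageType {
	case typeUnset, typeETCD2:
		return "etcd2 backend", nil
	case typeETCD3:
		return "etcd3 backend", nil
	default:
		return "", fmt.Errorf("unknown storage type: %s", storageType)
	}
}

func main() {
	for _, t := range []string{"", "etcd2", "etcd3", "consul"} {
		backend, err := create(t)
		fmt.Printf("%q -> %q, err=%v\n", t, backend, err)
	}
}
```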
We pick etcd2 and look at its implementation:
Path: pkg/storage/storagebackend/factory/etcd2.go
func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
	// Create an http.Transport from the configured TLS certificate info
	tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
	if err != nil {
		return nil, nil, err
	}
	// Create the etcd2 client; the returned value is an httpClusterClient
	client, err := newETCD2Client(tr, c.ServerList)
	if err != nil {
		return nil, nil, err
	}
	// Initialize an etcdHelper, which implements storage.Interface, from the arguments
	s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize)
	// Return the etcdHelper and the function that closes the connections
	return s, tr.CloseIdleConnections, nil
}
The first two steps only create the client that connects to etcd; the last step is the important one:
Path: pkg/storage/etcd/etcd_helper.go
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface {
	return &etcdHelper{
		// Create an httpMembersAPI value, which carries many methods
		etcdMembersAPI: etcd.NewMembersAPI(client),
		// Create an httpKeysAPI value, which likewise carries many methods
		etcdKeysAPI: etcd.NewKeysAPI(client),
		// Used for encoding and decoding
		codec:     codec,
		versioner: APIObjectVersioner{},
		// Used for serialization/deserialization, conversion between versions, compatibility, etc.
		copier:     api.Scheme,
		pathPrefix: path.Join("/", prefix),
		quorum:     quorum,
		// Create the cache structure
		cache: utilcache.NewCache(cacheSize),
	}
}
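A small point worth seeing in isolation is `path.Join("/", prefix)`: it normalizes the configured prefix so that pathPrefix always starts with exactly one slash and has no trailing slash. The sketch below only exercises the standard library's path.Join; the sample prefixes are made up.

```go
package main

import (
	"fmt"
	"path"
)

// pathPrefix applies the same normalization used for etcdHelper.pathPrefix.
func pathPrefix(prefix string) string {
	return path.Join("/", prefix)
}

func main() {
	// Hypothetical prefixes an operator might configure.
	for _, p := range []string{"registry", "/registry", "/registry/", ""} {
		fmt.Printf("%q -> %q\n", p, pathPrefix(p))
	}
}
```

The first three inputs all normalize to "/registry", and an empty prefix becomes "/", so key construction elsewhere does not need to handle these variations.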
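This is a simple initialization. What matters is the set of generic RESTful methods attached to etcdHelper:
together they implement every method that storage.Interface requires.
Back in StorageWithCacher(), the next step is initializing CacherConfig, which needs no explanation, so we go straight to the function that creates the cacher:
Path: pkg/storage/cacher.go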
func NewCacherFromConfig(config CacherConfig) *Cacher {
	watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc)
	listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
	// Give this error when it is constructed rather than when you get the
	// first watch item, because it's much easier to track down that way.
	if obj, ok := config.Type.(runtime.Object); ok {
		if err := runtime.CheckCodec(config.Codec, obj); err != nil {
			panic("storage codec doesn't seem to match given type: " + err.Error())
		}
	}
	cacher := &Cacher{
		ready:       newReady(),
		storage:     config.Storage,
		objectType:  reflect.TypeOf(config.Type),
		watchCache:  watchCache,
		reflector:   cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
		versioner:   config.Versioner,
		triggerFunc: config.TriggerPublisherFunc,
		watcherIdx:  0,
		watchers: indexedWatchers{
			allWatchers:   make(map[int]*cacheWatcher),
			valueWatchers: make(map[string]watchersMap),
		},
		// TODO: Figure out the correct value for the buffer size.
		incoming: make(chan watchCacheEvent, 100),
		// We need to (potentially) stop both:
		// - wait.Until go-routine
		// - reflector.ListAndWatch
		// and there are no guarantees on the order that they will stop.
		// So we will be simply closing the channel, and synchronizing on the WaitGroup.
		stopCh: make(chan struct{}),
	}
	watchCache.SetOnEvent(cacher.processEvent)
	go cacher.dispatchEvents()
	stopCh := cacher.stopCh
	cacher.stopWg.Add(1)
	go func() {
		defer cacher.stopWg.Done()
		wait.Until(
			func() {
				if !cacher.isStopped() {
					cacher.startCaching(stopCh)
				}
			},
			time.Second, stopCh,
		)
	}()
	return cacher
}
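The shutdown strategy described in the comment inside NewCacherFromConfig() (close a channel, then wait on a WaitGroup) can be sketched with the standard library alone. The worker type and the until helper below are hypothetical; until is a simplified stand-in for wait.Until that reruns a function periodically until the stop channel is closed.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker models only the start/stop bookkeeping of the Cacher.
type worker struct {
	stopCh  chan struct{}
	stopWg  sync.WaitGroup
	started chan struct{} // closed after the first run
	once    sync.Once
	mu      sync.Mutex
	runs    int
}

// until reruns f every period until stopCh is closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

func newWorker() *worker {
	w := &worker{stopCh: make(chan struct{}), started: make(chan struct{})}
	w.stopWg.Add(1)
	go func() {
		defer w.stopWg.Done()
		until(w.runOnce, time.Millisecond, w.stopCh)
	}()
	return w
}

func (w *worker) runOnce() {
	w.mu.Lock()
	w.runs++
	w.mu.Unlock()
	w.once.Do(func() { close(w.started) })
}

// Stop closes the channel and then blocks until the goroutine has exited,
// so no work runs after Stop returns.
func (w *worker) Stop() {
	close(w.stopCh)
	w.stopWg.Wait()
}

func (w *worker) Runs() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.runs
}

func main() {
	w := newWorker()
	<-w.started
	w.Stop()
	fmt.Println("ran at least once:", w.Runs() >= 1)
}
```

Closing the channel signals every listener at once, and the WaitGroup gives Stop a definite point at which the background goroutine is known to be gone.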
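This function mainly starts the cacher, and the cache is only used for WATCH and LIST requests.
Now consider the Cacher struct:
it must likewise implement the methods that storage.Interface requires.
Because the Cacher only serves WATCH and LIST requests, every Cacher method other than the WATCH- and LIST-related ones simply calls the API of the storage created earlier.
For example, Cacher.Create and Cacher.Delete: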
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	return c.storage.Create(ctx, key, obj, out, ttl)
}
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
	return c.storage.Delete(ctx, key, out, preconditions)
}
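The split between delegated writes and cache-served reads can be modeled with a toy example. The backend and cacher types below are hypothetical, and resync() is only a stand-in for the reflector that keeps the real watch cache up to date; the point is that Create reaches the backend while List does not.

```go
package main

import (
	"fmt"
	"sort"
)

// backend is a toy stand-in for the raw etcd storage.
type backend struct {
	data  map[string]string
	lists int // number of List calls that reached the backend
}

func (b *backend) Create(key, value string) { b.data[key] = value }

func (b *backend) List() []string {
	b.lists++
	keys := make([]string, 0, len(b.data))
	for k := range b.data {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// cacher delegates writes and answers List from its own copy.
type cacher struct {
	storage *backend
	cache   []string
}

// Create is a pure pass-through, like Cacher.Create above.
func (c *cacher) Create(key, value string) { c.storage.Create(key, value) }

// resync refreshes the cache from the backend (assumed stand-in for the reflector).
func (c *cacher) resync() { c.cache = c.storage.List() }

// List is served from the cache and never touches the backend.
func (c *cacher) List() []string { return c.cache }

func main() {
	b := &backend{data: map[string]string{}}
	c := &cacher{storage: b}

	c.Create("/pods/default/nginx", "Running")
	c.resync()

	before := b.lists
	fmt.Println(c.List())
	fmt.Println("backend List calls during cached List:", b.lists-before)
}
```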
That completes registry.StorageWithCacher(). Now back to the other function mentioned earlier, generic.UndecoratedStorage():
Path: pkg/registry/generic/storage_decorator.go
func UndecoratedStorage(
	config *storagebackend.Config,
	capacity int,
	objectType runtime.Object,
	resourcePrefix string,
	scopeStrategy rest.NamespaceScopedStrategy,
	newListFunc func() runtime.Object,
	trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
	return NewRawStorage(config)
}
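generic.UndecoratedStorage() just calls NewRawStorage(), the same function that registry.StorageWithCacher() calls first; its implementation simply lacks the cache layer.
We have now touched on the cache; the next part covers the cache implementation specifically.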
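User configuration
--watch-cache: this apiServer flag defaults to true and turns on the watch cache.
--watch-cache-sizes: where a cache can be enabled there are also cache sizes, and this flag sets the cache size used by each kind of resource. Format: resource#size
--storage-backend: the persistent storage backend type; the options are etcd2 (default) and etcd3.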
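As an illustration of the resource#size format, here is a minimal sketch of a parser for such entries. parseWatchCacheSizes is a hypothetical helper written for this article, not the function apiServer uses, and the resource names and sizes in main are made-up sample values.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseWatchCacheSizes turns entries like "pods#1000" into a map from
// resource name to cache size, rejecting malformed entries.
func parseWatchCacheSizes(specs []string) (map[string]int, error) {
	sizes := make(map[string]int, len(specs))
	for _, spec := range specs {
		parts := strings.Split(spec, "#")
		if len(parts) != 2 {
			return nil, fmt.Errorf("invalid entry %q: want resource#size", spec)
		}
		size, err := strconv.Atoi(parts[1])
		if err != nil {
			return nil, fmt.Errorf("invalid size in %q: %v", spec, err)
		}
		sizes[parts[0]] = size
	}
	return sizes, nil
}

func main() {
	sizes, err := parseWatchCacheSizes([]string{"pods#1000", "nodes#500"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(sizes["pods"], sizes["nodes"])
}
```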