一篇文章带你了解kubernetes各组件间的通信机制

我们对kubernetes有了一定的认识,本文我们将继续深入的对kubernetes在系统层面上进行讨论,一起看看kubernetes的各个基本组件,以及各个组件是如何相互配合来撑起如此复杂的集群系统。下面跟随文章内容,一起来领略kubernetes令人惊叹的设计内幕吧。

一篇文章带你了解kubernetes各组件间的通信机制

Kubernetes的基本组件

Kubernetes将整个集群分为控制节点和工作节点,如下图所示。

一篇文章带你了解kubernetes各组件间的通信机制

Kubernetes中的Master是指集群控制节点,每个Kubernetes集群中需要一个Master节点来负责整个集群的管理和控制,Kubernetes中所有的控制指令都是交由Master来进行处理。引起在集群中处于非常重要的地位,因此在部署中需进行多节点单独部署。

Maste节点关键进程

一篇文章带你了解kubernetes各组件间的通信机制

Apiserver:提供kubernetes所有资源增删改查的唯一入口,也是集群控制的入口,提供http Rest接口,完成集群管理,资源配额,访问控制,认证授权,以及对etcd的操作。

Controller-manager:是集群内所有资源对象的自动化控制中心,负责pod和node的管理,节点控制器,服务控制器,副本控制器,服务账户和令牌控制器等

Scheduler:负责资源调度,监听Apiserver,查询是否有未调度的pod。

Etcd:在kubernetes系统中,主要有两个服务需要用到etcd来存储:

网络插件:如flannel等需要存储网络配置信息

Kubernetes本身,包括各种对象的状态和原信息配置。

除Master节点外,集群中其他集群被称为Node节点,较早的版本中也叫Minion。Node节点是集群中具体的工作负载节点,其可以使物理机也可以为虚拟机。

Node节点关键进程(包括但不限于以下进程)

一篇文章带你了解kubernetes各组件间的通信机制

kubelet:处理Master下发到本节点的任务,管理pod及pod的容器,每个kubelet在Apiserver上注册自身信息,定期向Master汇报节点的资源使用情况,并通过cAdvisor监控容器和节点信息。

kube-proxy:将到service的访问转发到后端的多个pod实例上,维护路由信息,对于每一个TCP类型的k8s service,kube-proxy会在本地建立一个sockerserver来负责均衡算法,使用rr负载均衡算法。

CNI网络组件:作为容器平台的网络标准化组件,为容器提供跨网段的通信支持,是kubernetes集群overlay网络的实现关键。

Docker:kubernetes支持多种容器工具,目前Docker作为主流的容器,为kubernetes集群提供容器创建及管理。

集群各组件间的交互

Kubernetes为所有资源增删改查的唯一入口,各组件均以list-watch的方式向Apiserve发送请求。为减少Apiserver的压力,各组件都采用缓存来缓存数据。功能模块在某些情况下不直接访问Apiserver,而是通过访问缓存来间接访问Apiserver。

Kubelet&Apiserver

每个node上的kubelet每个一个时间周期,就会调用Apiserver的REST接口来报告自身状态。Kubelet通过watch接口,监听pod信息。监听创建、删除、修改事件。

Controller-manager&Apiserver

controller-manager中包含多个controller,举例:Node Controller模块通过API server提供的Watch接口,实现监控Node信息,并做相应处理。

Scheduler&Apiserver

Scheduler通过API server的watch接口来监听,监听到新建pod副本后,检索所有符合该Pod要求的Node列表,开始执行Pod调度,调度成功后将pod绑定到具体节点。

以下是一张典型pod创建的流程图,其中可以看到Apiserver处于核心的位置,集群内的各个功能模块的所有原数据正删改查都是通过kube-apiserver操作etcd,当需要获取和操作这些数据时,通过Apiserver的REST接口来实现。

一篇文章带你了解kubernetes各组件间的通信机制
一篇文章带你了解kubernetes各组件间的通信机制

list-watch机制

kubernetes没有像其他分布式系统中额外引入MQ,是因为其设计理念采用了level trigger而非edge trigger。其仅仅通过http+protobuffer的方式,实现list-watcher机制来解决各组件间的消息通知。因此,在了解各组件通信前,必须先了解list-watch机制在kubernetes的应用。

List-watch是k8s统一的异步消息处理机制,list通过调用资源的list API罗列资源,基于HTTP短链接实现;watch则是调用资源的watch API监听资源变更事件,基于HTTP长链接实现。在kubernetes中,各组件通过监听Apiserver的资源变化,来更新资源状态。

这里对watch简要说明,流程如下图所示:

一篇文章带你了解kubernetes各组件间的通信机制

这一部分流程图看起来并不复杂,实际上里面的实现相当精妙。结合这幅图进行简要的解释:

1 首先需要强调一点,list或者watch的数据,均是来自于etcd的数据,因此在Apiserver中,一切的设计都是为了获取最新的etcd数据并返回给client。

2 当Apiserver监听到各组件发来的watch请求时,由于list和watch请求的格式相似,先进入ListResource函数进行分析,若解析为watch请求,便会创建一个watcher结构来响应请求。watcher的生命周期是每个http请求的。

  1. //每一个Watch请求对应一个watcher结构 
  2. func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage,... ... 
  3. …… 
  4. lister, isLister := storage.(rest.Lister) 
  5. watcher, isWatcher := storage.(rest.Watcher) ...(1) ... case "LIST": // List all resources of a kind. 
  6. …… 

3 创建了watcher,但谁来接收并缓存etcd的数据呢?Apiserver使用cacher来接收etcd的事件,cacher也是Storage类型,这里cacher可以理解为是监听etcd的一个实例,cacher针对于某个类型的数据,其cacher通过ListAndWatch()这个方法,向etcd发送watch请求。etcd会将某一类型的数据同步到watchCache这个结构,也就是说,ListAndWatch()将远端数据源源不断同步到cacher结构中来。Cacher的结构如下所示:

  1. type Cacher struct { 
  2.  incomingHWM storage.HighWaterMark 
  3.  incoming chan watchCacheEvent 
  4.  sync.RWMutex 
  5.  // Before accessing the cacher's cache, wait for the ready to be ok. 
  6.  // This is necessary to prevent users from accessing structures that are 
  7.  // uninitialized or are being repopulated right now. 
  8.  // ready needs to be set to false when the cacher is paused or stopped. 
  9.  // ready needs to be set to true when the cacher is ready to use after 
  10.  // initialization. 
  11.  ready *ready 
  12.  // Underlying storage.Interface. 
  13.  storage storage.Interface 
  14.  // Expected type of objects in the underlying cache. 
  15.  objectType reflect.Type 
  16.  // "sliding window" of recent changes of objects and the current state. 
  17.  watchCache *watchCache 
  18.  reflector *cache.Reflector 
  19.  // Versioner is used to handle resource versions. 
  20.  versioner storage.Versioner 
  21.  // newFunc is a function that creates new empty object storing a object of type Type. 
  22.  newFunc func() runtime.Object 
  23.  // indexedTrigger is used for optimizing amount of watchers that needs to process 
  24.  // an incoming event. 
  25.  indexedTrigger *indexedTriggerFunc 
  26.  // watchers is mapping from the value of trigger function that a 
  27.  // watcher is interested into the watchers 
  28.  watcherIdx int 
  29.  watchers indexedWatchers 
  30.  // Defines a time budget that can be spend on waiting for not-ready watchers 
  31.  // while dispatching event before shutting them down. 
  32.  dispatchTimeoutBudget *timeBudget 
  33.  // Handling graceful termination. 
  34.  stopLock sync.RWMutex 
  35.  stopped bool 
  36.  stopCh chan struct{} 
  37.  stopWg sync.WaitGroup 
  38.  clock clock.Clock 
  39.  // timer is used to avoid unnecessary allocations in underlying watchers. 
  40.  timer *time.Timer 
  41.  // dispatching determines whether there is currently dispatching of 
  42.  // any event in flight. 
  43.  dispatching bool 
  44.  // watchersBuffer is a list of watchers potentially interested in currently 
  45.  // dispatched event. 
  46.  watchersBuffer []*cacheWatcher 
  47.  // blockedWatchers is a list of watchers whose buffer is currently full
  48.  blockedWatchers []*cacheWatcher 
  49.  // watchersToStop is a list of watchers that were supposed to be stopped 
  50.  // during current dispatching, but stopping was deferred to the end of 
  51.  // dispatching that event to avoid race with closing channels in watchers. 
  52.  watchersToStop []*cacheWatcher 
  53.  // Maintain a timeout queue to send the bookmark event before the watcher times out
  54.  bookmarkWatchers *watcherBookmarkTimeBuckets 
  55.  // watchBookmark feature-gate 
  56.  watchBookmarkEnabled bool 
  57. }  

watchCache的结构如下所示:

  1. type watchCache struct { 
  2.  sync.RWMutex //同步锁 
  3.  cond *sync.Cond //条件变量 
  4.  capacity int//历史滑动窗口容量 
  5.  keyFunc func(runtime.Object) (string, error)//从storage中获取键值 
  6.  getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)//获取一个对象的field和label信息 
  7.  cache []watchCacheElement//循环队列缓存 
  8.  startIndex int//循环队列的起始下标 
  9.  endIndex int//循环队列的结束下标 
  10.  store cache.Store// 
  11.  resourceVersion uint64 
  12.  onReplace func() 
  13.  onEvent func(*watchCacheEvent)//在每次缓存中的数据发生Add/Update/Delete后都会调用该函数,来获取对象的之前版本的值 
  14.  clock clock.Clock 
  15.  versioner storage.Versioner 

cache里面存放的是所有操作事件,而store中存放的是当前最新的事件。

4 cacheWatcher从watchCache中拿到从某个resourceVersion以来的所有数据,即initEvents,然后将数据放到input这个channel里面去,通过filter然后输出到result这个channel里面,返回数据到某个client。

  1. type cacheWatcher struct { 
  2.  sync.Mutex//同步锁 
  3.  input chan *watchCacheEvent//输入管道,Apiserver都事件发生时都会通过广播的形式向input管道进行发送 
  4.  result chan watch.Event//输出管道,输出到update管道中去 
  5.  done chan struct{} 
  6.  filter filterWithAttrsFunc//过滤器 
  7.  stopped bool 
  8.  forget func(bool) 
  9.  versioner storage.Versioner 

从一个pod创建过程看k8s组件通信

我们再回到上面的Pod创建流程图。从图中我们可以看出以下信息:

1 首先各组件也会在初始化时向Apiserver发送watch请求,即在图中标0的指令。Apiserver在创建kubeApiserver并注册各API路由信息时,获取Watch请求的路由信息

2 从Kubectl向Apiserver发送创建pod请求起,每一步创建、更新操作,都会存储到etcd中。

相关推荐