cAdvisor源码阅读

August 1, 2021

基于 v0.39.0 版本。cadvisor是通过cgroup获取各个容器的指标的,支持docker、containerd、cri-o等多种容器运行时。

main 入口函数:

func main() {
	...
    // 初始化存储,指标默认存储在内存中,默认仅保留2分钟以内的数据
    // 也支持持久化,通过storage_driver参数可以指定持久化的存储
    // 目前支持的存储有bigquery、elasticsearch、influxdb、kafka、redis、statsd、stdout
	memoryStorage, err := NewMemoryStorage()
	if err != nil {
		klog.Fatalf("Failed to initialize storage driver: %s", err)
	}

  	// 定义了一些获取主机文件系统信息的方法
	sysFs := sysfs.NewRealSysFs()

  	// 创建采集指标的http client
	collectorHttpClient := createCollectorHttpClient(*collectorCert, *collectorKey)

  	// 初始化资源管理器
	resourceManager, err := manager.New(memoryStorage, sysFs, housekeepingConfig, includedMetrics, &collectorHttpClient, strings.Split(*rawCgroupPrefixWhiteList, ","), *perfEvents)
	if err != nil {
		klog.Fatalf("Failed to create a manager: %s", err)
	}

	...
	// 这个函数会默认把容器label和env加到metrics的label里去,如果label和env很多,可能会导致程序占用很多内存,这时可以设置store_container_labels参数为false,同时设置whitelisted_container_labels仅保留需要的label
	containerLabelFunc := metrics.DefaultContainerLabels
	if !*storeContainerLabels {
		whitelistedLabels := strings.Split(*whitelistedContainerLabels, ",")
		containerLabelFunc = metrics.BaseContainerLabels(whitelistedLabels)
	}

	// 注册Prometheus的handle
	cadvisorhttp.RegisterPrometheusHandler(mux, resourceManager, *prometheusEndpoint, containerLabelFunc, includedMetrics)

	// 启动资源管理器
	if err := resourceManager.Start(); err != nil {
		klog.Fatalf("Failed to start manager: %v", err)
	}

  	...
}

main函数的主要逻辑是主机、文件系统、handle等的初始化,然后用这些数据去创建一个资源管理器,最后去启动这个资源管理器。

资源管理器的初始化代码就不贴了,这里直接看下资源管理器的定义:

type manager struct {
  	// 存储所有容器
	containers               map[namespacedContainerName]*containerData
	containersLock           sync.RWMutex
  	// 指标数据存储
	memoryCache              *memory.InMemoryCache
  	// 文件系统信息
	fsInfo                   fs.FsInfo
  	// 主机系统信息
	sysFs                    sysfs.SysFs
	machineMu                sync.RWMutex // protects machineInfo
  	// 主机信息
	machineInfo              info.MachineInfo
	quitChannels             []chan error
	cadvisorContainer        string
    // 是否直接运行在宿主机上,运行在容器内为false
	inHostNamespace          bool
    // 事件处理器
	eventHandler             events.EventManager
  	// 启动时间
	startupTime              time.Time
  	// 指标采集的最大时间间隔
	maxHousekeepingInterval  time.Duration
	allowDynamicHousekeeping bool
    // 允许的指标
	includedMetrics          container.MetricSet
  	// 容器监听器,支持监听多种容器运行时
	containerWatchers        []watcher.ContainerWatcher
  	// 容器事件channel,每watch到一个容器事件就往eventsChannel里写入一条数据
	eventsChannel            chan watcher.ContainerEvent
	collectorHTTPClient      *http.Client
	nvidiaManager            stats.Manager
	perfManager              stats.Manager
	resctrlManager           stats.Manager
	// List of raw container cgroup path prefix whitelist.
	rawContainerCgroupPathPrefixWhiteList []string
}

Start的主要逻辑是初始化不同的容器运行时,并注册监听器,然后监听容器的创建并做相应的处理:

func (m *manager) Start() error {
  	// 初始化容器运行时监听器,InitializePlugins里的plugins就是不同的容器运行时,如docker、containerd、cri-o等
	m.containerWatchers = container.InitializePlugins(m, m.fsInfo, m.includedMetrics)
	...
}

看看InitializePlugins是如何初始化容器运行时的:

func InitializePlugins(factory info.MachineInfoFactory, fsInfo fs.FsInfo, includedMetrics MetricSet) []watcher.ContainerWatcher {
	pluginsLock.Lock()
	defer pluginsLock.Unlock()

	containerWatchers := []watcher.ContainerWatcher{}
	for name, plugin := range plugins {
      	// 注册各个容器运行时的监听器
		watcher, err := plugin.Register(factory, fsInfo, includedMetrics)
		if err != nil {
			klog.V(5).Infof("Registration of the %s container factory failed: %v", name, err)
		}
      	// 若不为空,则将监听器保存到containerWatchers数组中
		if watcher != nil {
			containerWatchers = append(containerWatchers, watcher)
		}
	}
	return containerWatchers
}

按照我们的理解,返回的containerWatchers应该包含不同容器运行时的监听器,然而,通过查看各个容器运行时的Register方法,发现它们都没有实现ContainerWatcher这个interface:

func (p *plugin) Register(factory info.MachineInfoFactory, fsInfo fs.FsInfo, includedMetrics container.MetricSet) (watcher.ContainerWatcher, error) {
	err := Register(factory, fsInfo, includedMetrics)
  	// 返回的ContainerWatcher是nil
	return nil, err
}

也就是说,InitializePlugins并没有注册容器运行时的监听器,那么,不同容器运行时创建的容器到底是如何被监听到的呢?

接着往下看Start方法的代码:

func (m *manager) Start() error {
	m.containerWatchers = container.InitializePlugins(m, m.fsInfo, m.includedMetrics)

  	// 这个Register里注册了一个raw类型的工厂方法,当有raw类型的容器被监听到,会使用注册的factory进行处理
	err := raw.Register(m, m.fsInfo, m.includedMetrics, m.rawContainerCgroupPathPrefixWhiteList)
	if err != nil {
		klog.Errorf("Registration of the raw container factory failed: %v", err)
	}

    // 这里创建的是一个raw类型的容器监听器
  	rawWatcher, err := raw.NewRawContainerWatcher()
	if err != nil {
		return err
	}
  	// 将raw watcher也保存到m.containerWatchers数组中
	m.containerWatchers = append(m.containerWatchers, rawWatcher)
	...

	quitWatcher := make(chan error)
    // 看这儿,监听容器就在这个方法里
	err = m.watchForNewContainers(quitWatcher)
	if err != nil {
		return err
    }
}

可以看到注册了一个raw类型的watcher,这个rawWatcher实现了watch.ContainerWatcher接口,它才是真正的容器监听器,也就是说不同容器运行时创建的容器都是通过这个监听器监听到的,那它是如何做到的呢?

直接看m.watchForNewContainers()方法的代码:

func (m *manager) watchForNewContainers(quit chan error) error {
	watched := make([]watcher.ContainerWatcher, 0)
  	// 遍历所有containerWatcher,实际上只有raw类型的watcher
	for _, watcher := range m.containerWatchers {
      	// 就是这个Start方法了,启动监听器
		err := watcher.Start(m.eventsChannel)
		if err != nil {
			for _, w := range watched {
				stopErr := w.Stop()
				if stopErr != nil {
					klog.Warningf("Failed to stop wacher %v with error: %v", w, stopErr)
				}
			}
			return err
		}
		watched = append(watched, watcher)
	}
 	...
    go func() {
		for {
			select {
              // 接收容器事件
			case event := <-m.eventsChannel:
				switch {
				case event.EventType == watcher.ContainerAdd:
					switch event.WatchSource {
					default:
                        // 若为创建容器的事件,则调用createContainer,也就是收集该容器的各项指标保存在内存中,并定时更新
						err = m.createContainer(event.Name, event.WatchSource)
					}
				case event.EventType == watcher.ContainerDelete:
                    	// 若为删除容器的事件,则清理该容器的指标数据
					err = m.destroyContainer(event.Name)
				}
				if err != nil {
					klog.Warningf("Failed to process watch event %+v: %v", event, err)
				}
			case <-quit:
				var errs partialFailure

				// 若为退出事件,则停止所有的containerWatchers
				for i, watcher := range m.containerWatchers {
					err := watcher.Stop()
					if err != nil {
						errs.append(fmt.Sprintf("watcher %d", i), "Stop", err)
					}
				}

				if len(errs) > 0 {
					quit <- errs
				} else {
					quit <- nil
					klog.Infof("Exiting thread watching subcontainers")
					return
				}
			}
		}
	}()
	return nil
}

可以看到,它起了一个协程接收容器事件,容器事件来源于watcher.Start(m.eventsChannel)方法里,即真正的监听行为在这个Start里,看它的实现:

func (w *rawContainerWatcher) Start(events chan watcher.ContainerEvent) error {
	// Watch this container (all its cgroups) and all subdirectories.
	watched := make([]string, 0)
  	// 首先遍历cgroup子系统,如/sys/fs/cgroup/cpu、/sys/fs/cgroup/memory等等
	for _, cgroupPath := range w.cgroupPaths {
      	// watchDirectory是一个递归的方法,它会监听cgroup子系统其所有子目录
		_, err := w.watchDirectory(events, cgroupPath, "/")
		if err != nil {
			for _, watchedCgroupPath := range watched {
				_, removeErr := w.watcher.RemoveWatch("/", watchedCgroupPath)
				if removeErr != nil {
					klog.Warningf("Failed to remove inotify watch for %q with error: %v", watchedCgroupPath, removeErr)
				}
			}
			return err
		}
		watched = append(watched, cgroupPath)
	}

	// 起个协程处理内核事件
	go func() {
		for {
			select {
              // 这里的w.watcher就是之前初始化的内核事件watcher
			case event := <-w.watcher.Event():
            	// 接收到内核事件后,交给processEvent处理
				err := w.processEvent(event, events)
				if err != nil {
					klog.Warningf("Error while processing event (%+v): %v", event, err)
				}
			case err := <-w.watcher.Error():
				klog.Warningf("Error while watching %q: %v", "/", err)
			case <-w.stopWatcher:
				err := w.watcher.Close()
				if err == nil {
					w.stopWatcher <- err
					return
				}
			}
		}
	}()

	return nil
}

可以发现,这个rawContainerWatcher其实是监听的cgroup子系统下的所有子目录,因为每个容器都会有对应的cgroup子目录,所以当监听到有一个cgroup子目录被创建时,就可以认为有一个容器被创建了。而且这样做的好处是屏蔽了具体的容器运行时,不管是哪种容器运行时创建的容器都可以被监听到,真是妙啊!

监听目录又是怎么监听的呢?归根到底,是监听的内核事件,上述代码中的w.watch就是内核事件监听器:

func NewRawContainerWatcher() (watcher.ContainerWatcher, error) {
	...
	watcher, err := common.NewInotifyWatcher()
  	...
}

func NewInotifyWatcher() (*InotifyWatcher, error) {
	w, err := inotify.NewWatcher()
	if err != nil {
		return nil, err
	}

	return &InotifyWatcher{
		watcher:           w,
		containersWatched: make(map[string]map[string]bool),
	}, nil
}

  func NewWatcher() (*Watcher, error) {
	fd, errno := syscall.InotifyInit1(syscall.IN_CLOEXEC)
	if fd == -1 {
		return nil, os.NewSyscallError("inotify_init", errno)
	}
	w := &Watcher{
		fd:      fd,
		watches: make(map[string]*watch),
		paths:   make(map[int]string),
		Event:   make(chan *Event),
		Error:   make(chan error),
		done:    make(chan bool, 1),
	}
	// 这里起了一个协程去监听内核事件
	go w.readEvents()
	return w, nil
  }

接着看看接收到内核事件后,processEvent是如何处理的:

func (w *rawContainerWatcher) processEvent(event *inotify.Event, events chan watcher.ContainerEvent) error {
	// 将内核事件转换为容器事件
	var eventType watcher.ContainerEventType
	switch {
	case (event.Mask & inotify.InCreate) > 0:
		eventType = watcher.ContainerAdd
	case (event.Mask & inotify.InDelete) > 0:
		eventType = watcher.ContainerDelete
	case (event.Mask & inotify.InMovedFrom) > 0:
		eventType = watcher.ContainerDelete
	case (event.Mask & inotify.InMovedTo) > 0:
		eventType = watcher.ContainerAdd
	default:
		// Ignore other events.
		return nil
	}

	// Derive the container name from the path name.
	var containerName string
	for _, mount := range w.cgroupSubsystems.Mounts {
		mountLocation := path.Clean(mount.Mountpoint) + "/"
		if strings.HasPrefix(event.Name, mountLocation) {
			containerName = event.Name[len(mountLocation)-1:]
			break
		}
	}
	if containerName == "" {
		return fmt.Errorf("unable to detect container from watch event on directory %q", event.Name)
	}

	// Maintain the watch for the new or deleted container.
	switch eventType {
	case watcher.ContainerAdd:
		// 若为新增容器事件,表名有新的子目录被创建,则监听该目录及其子目录
		alreadyWatched, err := w.watchDirectory(events, event.Name, containerName)
		if err != nil {
			return err
		}

		// Only report container creation once.
		if alreadyWatched {
			return nil
		}
	case watcher.ContainerDelete:
		// 若为删除容器事件,则移除对该目录的监听
		lastWatched, err := w.watcher.RemoveWatch(containerName, event.Name)
		if err != nil {
			return err
		}

		// Only report container deletion once.
		if !lastWatched {
			return nil
		}
	default:
		return fmt.Errorf("unknown event type %v", eventType)
	}

	// 将容器事件写入到eventChannel
	events <- watcher.ContainerEvent{
		EventType:   eventType,
		Name:        containerName,
		WatchSource: watcher.Raw,
	}

	return nil
}

可以看到,processEvent就是将内核事件转换为了容器事件,并对新建或删除容器的cgroup目录进行监听或移除监听,最后把容器事件写入channel,这样watchForNewContainers()方法里的协程接收到容器事件后就可以对该容器进行相应的处理。

下面简单看看接收到容器事件后,做了什么操作:

func (m *manager) createContainerLocked(containerName string, watchSource watcher.ContainerWatchSource) error {
	...
    // 获取容器运行时handler
	handler, accept, err := container.NewContainerHandler(containerName, watchSource, m.inHostNamespace)
	if err != nil {
		return err
    }

  	...
    // 每个容器生成一个containerData对象,也就是一个容器管理器
  	cont, err := newContainerData(containerName, m.memoryCache, handler, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping, clock.RealClock{})
	if err != nil {
		return err
    }
  	...
	// 运行这个容器管理器,会定期更新相关数据
	return cont.Start()
}

func (cd *containerData) Start() error {
  	// 这个housekeeping会定期去采集容器指标数据
	go cd.housekeeping()
	return nil
}

如何获取handler的,看NewContainerHandler的代码:

func NewContainerHandler(name string, watchType watcher.ContainerWatchSource, inHostNamespace bool) (ContainerHandler, bool, error) {
	factoriesLock.RLock()
	defer factoriesLock.RUnlock()

	// 这个factories就是之前plugin.Register()里注册的
	for _, factory := range factories[watchType] {
		canHandle, canAccept, err := factory.CanHandleAndAccept(name)
		if err != nil {
			klog.V(4).Infof("Error trying to work out if we can handle %s: %v", name, err)
		}
		if canHandle {
			if !canAccept {
				klog.V(3).Infof("Factory %q can handle container %q, but ignoring.", factory, name)
				return nil, false, nil
			}
			klog.V(3).Infof("Using factory %q for container %q", factory, name)
          	// 使用第一个可用的容器运行时handler进行处理
			handle, err := factory.NewContainerHandler(name, inHostNamespace)
			return handle, canAccept, err
		}
		klog.V(4).Infof("Factory %q was unable to handle container %q", factory, name)
	}

	return nil, false, fmt.Errorf("no known factory can handle creation of container")
}

不同的容器运行时handler有不同的实现,要看当前节点上运行了哪种容器运行时。

完。