// Run runs the CMServer. This should never exit.
func Run(s *options.CMServer) error {
// 1. 验证CM配置
if err := s.Validate(KnownControllers(), ControllersDisabledByDefault.List()); err != nil {
return err
}
....
kubeClient, leaderElectionClient, kubeconfig, err := createClients(s)
if err != nil {
return err
}
go startHTTP(s)
recorder := createRecorder(kubeClient)
// 2. 创建controllers启动逻辑。run是真正用来启动controllers的函数
run := func(stop <-chan struct{}) {
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: kubeconfig,
}
var clientBuilder controller.ControllerClientBuilder
....
ctx, err := CreateControllerContext(s, rootClientBuilder, clientBuilder, stop)
....
// NewControllerInitializers返回所有待启动的controllers,StartControllers则依次启动
if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers()); err != nil {
glog.Fatalf( "error starting controllers: %v" , err)
}
ctx.InformerFactory.Start(ctx.Stop)
close(ctx.InformersStarted)
select {}
}
...
// 3. 选主,由leader执行run函数
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf( "leaderelection lost" )
},
},
})
panic( "unreachable" )
}
|