GenericAPIServer的结构体 #
这一部分会在GenericAPIServer那块分析细节
GenericAPIServer的new方法 #
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
if c.Serializer == nil {
return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil")
}
if c.LoopbackClientConfig == nil {
return nil, fmt.Errorf("Genericapiserver.New() called with config.LoopbackClientConfig == nil")
}
if c.EquivalentResourceRegistry == nil {
return nil, fmt.Errorf("Genericapiserver.New() called with config.EquivalentResourceRegistry == nil")
}
handlerChainBuilder := func(handler http.Handler) http.Handler {
return c.BuildHandlerChainFunc(handler, c.Config)
}
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
s := &GenericAPIServer{
discoveryAddresses: c.DiscoveryAddresses,
LoopbackClientConfig: c.LoopbackClientConfig,
legacyAPIGroupPrefixes: c.LegacyAPIGroupPrefixes,
admissionControl: c.AdmissionControl,
Serializer: c.Serializer,
AuditBackend: c.AuditBackend,
Authorizer: c.Authorization.Authorizer,
delegationTarget: delegationTarget,
EquivalentResourceRegistry: c.EquivalentResourceRegistry,
HandlerChainWaitGroup: c.HandlerChainWaitGroup,
Handler: apiServerHandler,
listedPathProvider: apiServerHandler,
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
ShutdownTimeout: c.RequestTimeout,
ShutdownDelayDuration: c.ShutdownDelayDuration,
SecureServingInfo: c.SecureServing,
ExternalAddress: c.ExternalAddress,
openAPIConfig: c.OpenAPIConfig,
openAPIV3Config: c.OpenAPIV3Config,
skipOpenAPIInstallation: c.SkipOpenAPIInstallation,
postStartHooks: map[string]postStartHookEntry{},
preShutdownHooks: map[string]preShutdownHookEntry{},
disabledPostStartHooks: c.DisabledPostStartHooks,
healthzChecks: c.HealthzChecks,
livezChecks: c.LivezChecks,
readyzChecks: c.ReadyzChecks,
livezGracePeriod: c.LivezGracePeriod,
DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
maxRequestBodyBytes: c.MaxRequestBodyBytes,
livezClock: clock.RealClock{},
lifecycleSignals: c.lifecycleSignals,
ShutdownSendRetryAfter: c.ShutdownSendRetryAfter,
APIServerID: c.APIServerID,
StorageVersionManager: c.StorageVersionManager,
Version: c.Version,
muxAndDiscoveryCompleteSignals: map[string]<-chan struct{}{},
}
for {
if c.JSONPatchMaxCopyBytes <= 0 {
break
}
existing := atomic.LoadInt64(&jsonpatch.AccumulatedCopySizeLimit)
if existing > 0 && existing < c.JSONPatchMaxCopyBytes {
break
}
if atomic.CompareAndSwapInt64(&jsonpatch.AccumulatedCopySizeLimit, existing, c.JSONPatchMaxCopyBytes) {
break
}
}
// first add poststarthooks from delegated targets
// 从delegated targets中添加poststarthooks
for k, v := range delegationTarget.PostStartHooks() {
s.postStartHooks[k] = v
}
// 从delegated targets中添加preshutdownhooks
for k, v := range delegationTarget.PreShutdownHooks() {
s.preShutdownHooks[k] = v
}
// add poststarthooks that were preconfigured. Using the add method will give us an error if the same name has already been registered.
// 从提前配置好的地方增加poststarthooks。使用add方法,如果这个名称已经被注册了,就会报错。
for name, preconfiguredPostStartHook := range c.PostStartHooks {
if err := s.AddPostStartHook(name, preconfiguredPostStartHook.hook); err != nil {
return nil, err
}
}
// register mux signals from the delegated server
// 从delegated服务器中注册mux信号
for k, v := range delegationTarget.MuxAndDiscoveryCompleteSignals() {
if err := s.RegisterMuxAndDiscoveryCompleteSignal(k, v); err != nil {
return nil, err
}
}
genericApiServerHookName := "generic-apiserver-start-informers"
//这一步是注册一个叫“generic-apiserver-start-informers”的hook
if c.SharedInformerFactory != nil {
if !s.isPostStartHookRegistered(genericApiServerHookName) {
err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
//hook的内容就是开启SharedInformerFactory
c.SharedInformerFactory.Start(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
}
// TODO: Once we get rid of /healthz consider changing this to post-start-hook.
// 以后会把/healthz做成post-start-hook
err := s.AddReadyzChecks(healthz.NewInformerSyncHealthz(c.SharedInformerFactory))
if err != nil {
return nil, err
}
}
// 这个是增加的另一个hook,名字叫“priority-and-fairness-config-consumer”
const priorityAndFairnessConfigConsumerHookName = "priority-and-fairness-config-consumer"
if s.isPostStartHookRegistered(priorityAndFairnessConfigConsumerHookName) {
} else if c.FlowControl != nil {
// 添加这个PostStartHook
err := s.AddPostStartHook(priorityAndFairnessConfigConsumerHookName, func(context PostStartHookContext) error {
// 启动FlowControl两个go协程,有机会晚点再看看这个东西
go c.FlowControl.MaintainObservations(context.StopCh)
go c.FlowControl.Run(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
// TODO(yue9944882): plumb pre-shutdown-hook for request-management system?
} else {
klog.V(3).Infof("Not requested to run hook %s", priorityAndFairnessConfigConsumerHookName)
}
// Add PostStartHooks for maintaining the watermarks for the Priority-and-Fairness and the Max-in-Flight filters.
// 为了维持Priority-and-Fairness和Max-in-Flight的watermarks添加的PostStartHooks
if c.FlowControl != nil {
const priorityAndFairnessFilterHookName = "priority-and-fairness-filter"
if !s.isPostStartHookRegistered(priorityAndFairnessFilterHookName) {
err := s.AddPostStartHook(priorityAndFairnessFilterHookName, func(context PostStartHookContext) error {
genericfilters.StartPriorityAndFairnessWatermarkMaintenance(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
}
} else {
const maxInFlightFilterHookName = "max-in-flight-filter"
if !s.isPostStartHookRegistered(maxInFlightFilterHookName) {
err := s.AddPostStartHook(maxInFlightFilterHookName, func(context PostStartHookContext) error {
genericfilters.StartMaxInFlightWatermarkMaintenance(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
}
}
// 这里是检查添加delegate的健康检查
for _, delegateCheck := range delegationTarget.HealthzChecks() {
skip := false
for _, existingCheck := range c.HealthzChecks {
if existingCheck.Name() == delegateCheck.Name() {
skip = true
break
}
}
if skip {
continue
}
s.AddHealthChecks(delegateCheck)
}
s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget}
// 根据config安装API
installAPI(s, c.Config)
// use the UnprotectedHandler from the delegation target to ensure that we don't attempt to double authenticator, authorize,
// or some other part of the filter chain in delegation cases.
// 使用delegation target中的UnprotectedHandler来保证我们不会两次验证身份、权限或者一些其他方面的过滤器。
if delegationTarget.UnprotectedHandler() == nil && c.EnableIndex {
s.Handler.NonGoRestfulMux.NotFoundHandler(routes.IndexLister{
StatusCode: http.StatusNotFound,
PathProvider: s.listedPathProvider,
})
}
return s, nil
}