Controller

Manager创建client #

package main

import (
	"context"
	"fmt"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"log"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"time"
)

func K8sRestConfig() *rest.Config {
	config, err := clientcmd.BuildConfigFromFlags("", "./resources/config")
	if err != nil {
		log.Fatal(err)
	}
	return config
}

func main() {
	mgr, err := manager.New(K8sRestConfig(),
		manager.Options{
			Logger: logf.Log.WithName("test"),
		})
	if err != nil {
		log.Fatal(err)
	}
	go func() {
		time.Sleep(time.Second * 3)
		pod := &v1.Pod{}
		err = mgr.GetClient().Get(context.TODO(),
			types.NamespacedName{Namespace: "default", Name: "gateway"}, pod)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(pod)
	}()

	mgr.Start(context.Background())
}

Manager的Get #

get会走缓存,也可能不能缓存,这里想看看能不能饶过缓存读取资源。

具体的代码如下

当我们执行的时候,是这么走的

可以看到,会在这边进行判断,需不需要走缓存,如果是isUncached的话,就直接从Client中读取,否则从缓存中读取。

// Get retrieves an obj for a given object key from the Kubernetes Cluster.
func (d *delegatingReader) Get(ctx context.Context, key ObjectKey, obj Object) error {
	if isUncached, err := d.shouldBypassCache(obj); err != nil {
		return err
	} else if isUncached {
		return d.ClientReader.Get(ctx, key, obj)
	}
	return d.CacheReader.Get(ctx, key, obj)
}

然后我们点进去shouldBypassCache函数,看看到底根据什么来判断需不需要走缓存。可以看到,这个是根据GVK来判断是不是在缓存中的。

func (d *delegatingReader) shouldBypassCache(obj runtime.Object) (bool, error) {
	gvk, err := apiutil.GVKForObject(obj, d.scheme)
	if err != nil {
		return false, err
	}
	// TODO: this is producing unsafe guesses that don't actually work,
	// but it matches ~99% of the cases out there.
	if meta.IsListType(obj) {
		gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
	}
	if _, isUncached := d.uncachedGVKs[gvk]; isUncached {
		return true, nil
	}
	if !d.cacheUnstructured {
		_, isUnstructured := obj.(*unstructured.Unstructured)
		_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
		return isUnstructured || isUnstructuredList, nil
	}
	return false, nil
}

修改下面一段代码,就是manager生成的时候的代码,这样以后走的就不是缓存了。

mgr, err := manager.New(K8sRestConfig(),
		manager.Options{
			Logger: logf.Log.WithName("test"),
			NewClient: func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
				return cluster.DefaultNewClient(cache, config, options, &v1.Pod{})
			},
		})

判断方式可以这样,就是我们下载下来controller-runtime的代码,然后在之前的func (d *delegatingReader) Get(ctx context.Context, key ObjectKey, obj Object)函数中打印内容,这样就可以判断出来是不是走的Cache了。

Scheme #

Scheme其实就是两个map,一个是根据GVK获取对象,一个是根据对象获取GVK。这个在初始化的时候就完成构建了。

现在只把corev1注册进来

func main(){
  s := runtime.NewScheme()
  v1.AddToScheme(s)
  fmt.Println(s.AllKnownTypes)
}

把所有的注册进来

func main(){
  s := runtime.NewScheme()
  scheme.AddToScheme(s)
  fmt.Println(s.AllKnownTypes)
}

那么上面的Get的逻辑是什么呢?

// setOptionsDefaults set default values for Options fields.
func setOptionsDefaults(options Options) Options {
	// Use the Kubernetes client-go scheme if none is specified
	if options.Scheme == nil {
		options.Scheme = scheme.Scheme
	}

可以看到,如果没有设置Scheme,那么就会默认使用全部的Scheme,于是都有cache了。

Manager中的Informer #

使用Manager中的informer

需要注意的是,这里的GetInformer得到的Informer是在controller-runtime中的,与之前的是不同的,但是他们实现了同样的接口,所以可以转换成cache.SharedIndexInformer。然后来获取相关内容。

func main() {
	mgr, err := manager.New(K8sRestConfig(),
		manager.Options{
			Logger:    logf.Log.WithName("test"),
			Namespace: "default",
		})
	if err != nil {
		log.Fatal(err)
	}
	go func() {
		time.Sleep(time.Second * 3)
		podInformer, _ := mgr.GetCache().GetInformer(context.Background(), &v1.Pod{})
		fmt.Println(podInformer.(cache.SharedIndexInformer).GetIndexer().ListKeys())

	}()

	mgr.Start(context.Background())
}

Controller #

test.go

package main

import (
	"context"
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"log"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/source"
	"sigs.k8s.io/controller-runtime/pkg/tests/lib"
)

func K8sRestConfig() *rest.Config {
	config, err := clientcmd.BuildConfigFromFlags("", "./resources/config")
	if err != nil {
		log.Fatal(err)
	}
	return config
}

func main() {
	mgr, err := manager.New(K8sRestConfig(),
		manager.Options{
			Logger:    logf.Log.WithName("test"),
			Namespace: "default",
		})
	if err != nil {
		log.Fatal(err)
	}
  // 这里就是开始手工创建controller,参数分别为,第一个是名字,第二个是manager,第三个是Options,主要填入Reconciler
	ctl, err := controller.New("abc", mgr, controller.Options{
		Reconciler: &lib.Ctl{},
	})
  // 设置监控资源
	src := &source.Kind{
		Type: &v1.Pod{},
	}
  // 放入的Queen
	hdler := &handler.EnqueueRequestForObject{}

	ctl.Watch(src, hdler)

	mgr.Start(context.Background())
}

ctl.go

package lib

import (
	"context"
	"fmt"
	controllerruntime "sigs.k8s.io/controller-runtime"
)

type Ctl struct {
}
// 所谓的Reconciler就是实现了Reconcile这个借口,每次Reconcile的时候调用一下
func (*Ctl) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {

	fmt.Println(req.NamespacedName)
	return controllerruntime.Result{}, nil
}

手工触发reconcile #

这里的逻辑就是实现gin的一个网页,然后访问一次这个网页,就触发一次reconcile,其实就是把事件加入到队列中。然后给这个gin程序实现start接口,可以直接放入manager的add函数中。

package lib

import (
	"context"
	"github.com/gin-gonic/gin"
	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	cc "sigs.k8s.io/controller-runtime/pkg/internal/controller"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

type MyWeb struct {
	e   handler.EventHandler
	ctl controller.Controller
}

func NewMyWeb(e handler.EventHandler, ctl controller.Controller) *MyWeb {
	return &MyWeb{e: e, ctl: ctl}
}

func (m MyWeb) Start(ctx context.Context) error {
	//TODO implement me
	r := gin.New()
	r.GET("/add", func(c *gin.Context) {
		pod := &v1.Pod{}
		pod.Name = "mytestpod"
		pod.Namespace = "abc"
		m.e.Create(event.CreateEvent{Object: pod}, m.ctl.(*cc.Controller).Queue)
	})

	return r.Run(":8081")
}

var _ manager.Runnable = &MyWeb{}

其他资源变更触发Reconcile #

package lib

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

func AddCmWatch(ctl controller.Controller) error {
	src := &source.Kind{
		Type: &v1.ConfigMap{},
	}

	e := handler.Funcs{
		CreateFunc: func(event event.CreateEvent, limitingInterface workqueue.RateLimitingInterface) {
			limitingInterface.Add(reconcile.Request{
				types.NamespacedName{
					Name: "abc", Namespace: "bbb",
				},
			})
		},
	}

	return ctl.Watch(src, e)
}

工作队列 #

这个工作队列符合的特性

  • Fair:先到先处理
  • Stingy:同一事件在处理之前多次被添加,只处理一次
  • 支持多消费者和生产者,支持运行时重新入队
  • 关闭通知
package main

import (
	"fmt"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"time"
)

func newItem(name, ns string) reconcile.Request {
	return reconcile.Request{
		NamespacedName: types.NamespacedName{
			Name:      name,
			Namespace: ns,
		}}
}

func main() {
	que := workqueue.New()
	go func() {
		for {
			item, _ := que.Get()
			fmt.Println(item.(reconcile.Request).NamespacedName)
			time.Sleep(time.Millisecond * 20)
		}
	}()
	for {
		que.Add(newItem("abc", "default"))
		que.Add(newItem("abc", "default"))
		que.Add(newItem("abc", "default"))
		time.Sleep(time.Second * 1)
	}
}

运行结果为一行default/abc

整个的逻辑是,有一个dirty的map,键存的是item的内容,所以后面的add发现dirty里面有了,就不会塞入了。

而Get的作用是将第一个item放入处理队列。如果只有一个的话,就会取出来,然后判断长度为0,就进行等待。进入等待状态。手动处理数据,最后执行que.Done就会唤醒之前的协程,继续往下走。

package main

import (
	"fmt"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"time"
)

func newItem(name, ns string) reconcile.Request {
	return reconcile.Request{
		NamespacedName: types.NamespacedName{
			Name:      name,
			Namespace: ns,
		}}
}

func main() {
	que := workqueue.New()
	go func() {
		for {
			item, _ := que.Get()
			fmt.Println(item.(reconcile.Request).NamespacedName)
			time.Sleep(time.Millisecond * 20)
      que.Done(item)
		}
	}()
	for {
		que.Add(newItem("abc", "default"))
		que.Add(newItem("abc", "default"))
		que.Add(newItem("abc", "default"))
		time.Sleep(time.Second * 1)
	}
}

如果对队列不做限制,那么加入的过程可能很快,这样负载就太高了。所以需要用限速队列来延迟加入一下。

限速队列 通过一些限速算法(令牌桶 go官方的)让数据延迟一定的时间后再加入队列

延迟队列 实现上面的延迟xx时间后加入

package main

import (
	"fmt"
	"golang.org/x/time/rate"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"strconv"
	"time"
)

func newItem(name, ns string) reconcile.Request {
	return reconcile.Request{
		NamespacedName: types.NamespacedName{
			Name:      name,
			Namespace: ns,
		}}
}

func main() {
	que := workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{
		Limiter: rate.NewLimiter(1, 1),
	})
	go func() {
		for {
			item, _ := que.Get()
			fmt.Println(item.(reconcile.Request).NamespacedName)
			time.Sleep(time.Millisecond * 20)
		}
	}()

	for i := 0; i < 10; i++ {
		que.AddRateLimited(newItem("abc"+strconv.Itoa(i), "default"))
	}

}

这样以后,每秒钟加入一个到队列

golang限流器 #

	limter := rate.NewLimiter(1, 3)
	for i := 0; ; i++ {
		err := limter.Wait(context.Background())
		if err != nil {
			log.Fatalln(err)
		}
		fmt.Println(i)
	}

一开始直接打印0、1、2

然后每过一秒打印一个数字

或者用下面这种方式实现,也是一样的

	limter := rate.NewLimiter(1, 3)
	for i := 0; ; i++ {
		r := limter.Reserve()
		time.Sleep(r.Delay())
		fmt.Println(i)
	}

控制器的并发数设置 #

如果没有设置MaxConcurrentReconciles的话,那么是单线程的,reconcile需要等待。

业务里面,如果reconcile比较耗时间,就需要设置这个参数。一般3-5就可以了。

func main() {
	mgr, err := manager.New(K8sRestConfig(),
		manager.Options{
			Logger:    logf.Log.WithName("test"),
			Namespace: "default",
		})
	if err != nil {
		log.Fatal(err)
	}
	ctl, err := controller.New("abc", mgr, controller.Options{
		Reconciler:              &lib.Ctl{},
		MaxConcurrentReconciles: 2,
	})
	src := &source.Kind{
		Type: &v1.Pod{},
	}
	hdler := &handler.EnqueueRequestForObject{}

	ctl.Watch(src, hdler)
	mgr.Add(lib.NewMyWeb(hdler, ctl))
	lib.AddCmWatch(ctl)

	mgr.Start(context.Background())
}

Own资源监听 #

package main

import (
	"context"
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"log"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/source"
	"sigs.k8s.io/controller-runtime/pkg/tests/lib"
)

func K8sRestConfig() *rest.Config {
	config, err := clientcmd.BuildConfigFromFlags("", "./resources/config")
	if err != nil {
		log.Fatal(err)
	}
	return config
}

func main() {
	mgr, err := manager.New(K8sRestConfig(),
		manager.Options{
			Logger:    logf.Log.WithName("test"),
			Namespace: "default",
		})
	if err != nil {
		log.Fatal(err)
	}
	ctl, err := controller.New("abc", mgr, controller.Options{
		Reconciler:              &lib.Ctl{},
		MaxConcurrentReconciles: 2,
	})
	src := &source.Kind{
		Type: &v1.Pod{},
	}
	//hdler := &handler.EnqueueRequestForObject{}
	ownObj := &v1.Pod{}
	hdler := &handler.EnqueueRequestForOwner{
		OwnerType: ownObj,
	}
	ctl.Watch(src, hdler)
	mgr.Add(lib.NewMyWeb(hdler, ctl, ownObj, mgr.GetScheme()))
	lib.AddCmWatch(ctl)

	mgr.Start(context.Background())
}



// myweb.go
package lib

import (
	"context"
	"github.com/gin-gonic/gin"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	cc "sigs.k8s.io/controller-runtime/pkg/internal/controller"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

type MyWeb struct {
	e      handler.EventHandler
	ctl    controller.Controller
	ownObj runtime.Object
	scheme *runtime.Scheme
}

func NewMyWeb(e handler.EventHandler, ctl controller.Controller, ownObj runtime.Object, scheme *runtime.Scheme) *MyWeb {
	return &MyWeb{e: e, ctl: ctl, ownObj: ownObj, scheme: scheme}
}

func (m MyWeb) Start(ctx context.Context) error {
	//TODO implement me
	r := gin.New()
	r.GET("/add", func(c *gin.Context) {
		pod := &v1.Pod{}
		pod.Name = "mytestpod"
		pod.Namespace = "mytest"
		controllerutil.SetOwnerReference(m.ownObj.(metav1.Object), pod, m.scheme)
		m.e.Create(event.CreateEvent{Object: pod}, m.ctl.(*cc.Controller).Queue)
	})

	return r.Run(":8081")
}

var _ manager.Runnable = &MyWeb{}