Operator First

简单的反代功能 #

第三方库

# Go
# fasthttp
# nginx caddy openr

安装fasthttp

go get github.com/yeqown/fasthttp-reverse-proxy/v2
func ProxyHandler(ctx *fasthttp.RequestCtx){
  jtthink.ServeHTTP(ctx)
  ctx.Response.Header.Add("myname","shenyi")
}
var jtthink=proxy.NewReverseProxy("www.jtthink.com")

func main(){
  
  fasthttp.ListenAndServe(":80",ProxyHandler)
}

初步集成k8s runtime库、配置文件的设计 #

装库

go get sigs.k8s.io/controller-runtime@v0.9.6

创建一个app.yaml的配置文件

server:
  port: 80

ingress:
  rules:
    - http:
        paths:
          - path: /
            pathType: 
            backend:
              service:
                name: test
                port:
                  number: 80

接着创建一个pkg/sysinit/config.go的配置文件解析类来匹配这个yaml

type Server struct{
  Port int
}
type SysConfigStruct struct{
  Server Server
  Ingress v1.IngressSpec
}

var SysConfig = new(SysConfigStruct)

func InitConfig(){
  config,err := ioutil.ReadFile("./app.yaml")
  if err != nil{
    log.Fatal(err)
  }
  err = yaml.Unmarshal(config,SysConfig)
  if err != nil{
    log.Fatal(err)
  }
  
}

路由解析(1)结合mux初步完成路由解析 #

安装库

go get -u github.com/gorilla/mux

创建/sysinit/rules.go

type ProxyHandler struct{
  Proxy *proxy.ReverseProxy
}

func(this *ProxyHandler) ServeHTTP(http.ResponseWriter,*http.Request){}

var MyRouter *mux Router

func init(){
  MyRouter=mux.NewRouter()
}

func ParseRule(){
  for _,rule := range SysConfig.Ingress.Rules{
    for _, path := range rule.HTTP.paths{
      rProxy := proxy.NewReverseProxy(
        fmt.Sprintf("%s:%d",path.Backend.Service.Name,path.Backend.Service.Port.Number))
      MyRouter.NewRoute().Path(path.Path).
      Methods("GET","POST","PUT","DELETE","OPTIONS").
      Handler(&ProxyHandler{Proxy:rProxy})
    }
  }
}

func GetRoute(req fasthttp.Request)*proxy.ReverseProxy{
  match:=&mux.RouteMatch{}
  httpReq:=&http.Request{URL:&url.URL{Path:string(req.URI().Path())},Method:string(req.Header.Method)}
  if MyRouter.Match(httpReq,match){
    return match.Handler.(*ProxyHandler).Proxy
  }
  return nil
}
func ProxyHandler(ctx *fasthttp.RequestCtx){
  if getProxy:=sysinit.GetRoute(ctx.Request);getProxy!=nil{
    getProxy.ServeHTTP(ctx)
  }else{
    ctx.Response.SetStatusCode(404)
    ctx.Response.SetBodyString("404...")
  }
  
 
}
var jtthink=proxy.NewReverseProxy("www.jtthink.com")

func main(){
  
  fasthttp.ListenAndServe(":80",ProxyHandler)
}

路由解析(2)支持前缀匹配 #

type ProxyHandler struct{
  Proxy *proxy.ReverseProxy
}

func(this *ProxyHandler) ServeHTTP(http.ResponseWriter,*http.Request){}

var MyRouter *mux Router

func init(){
  MyRouter=mux.NewRouter()
}

func ParseRule(){
  for _,rule := range SysConfig.Ingress.Rules{
    for _, path := range rule.HTTP.paths{
      rProxy := proxy.NewReverseProxy(
        fmt.Sprintf("%s:%d",path.Backend.Service.Name,path.Backend.Service.Port.Number))
      if path.PathType != nil && *path.PathType == v1.PathTypeExact{
        MyRouter.NewRoute().Path(path.Path).
      	Methods("GET","POST","PUT","DELETE","OPTIONS").
      	Handler(&ProxyHandler{Proxy:rProxy})
      }else{
        MyRouter.NewRoute().PathPrefix(path.Path).
      	Methods("GET","POST","PUT","DELETE","OPTIONS").
      	Handler(&ProxyHandler{Proxy:rProxy})
      }
      
      
    }
  }
}

func GetRoute(req fasthttp.Request)*proxy.ReverseProxy{
  match:=&mux.RouteMatch{}
  httpReq:=&http.Request{URL:&url.URL{Path:string(req.URI().Path())},Method:string(req.Header.Method)}
  if MyRouter.Match(httpReq,match){
    return match.Handler.(*ProxyHandler).Proxy
  }
  return nil
}

路由解析(3)支持Host匹配 #

ingress可以多配置

server:
  port: 80

ingress:
  - apiVersion: networking.k8s.io/v1
    kind: Ingress
    metadata:
      name: ingress-myservice
      annotations:
        kubernetes.io/ingress.class: "nginx"
    spec:
      rules:
        - host: xxx.com
          http:
            paths:
              - path: /
                pathType: 
                backend:
                  service:
                    name: test
                    port:
                      number: 80

其实就是加了Host选项,注意这里的代码有更新。拿到了课件再做修改。

package sysinit

import (
	"github.com/gorilla/mux"
	"net/http"
)
//本课程来自程序员在囧途(www.jtthink.com)咨询群:98514334
//route构建器, build 方法必须要执行
var MyRouter *mux.Router

func init() {
	MyRouter=mux.NewRouter()
}
type RouteBuilder struct {
	route *mux.Route
}
func NewRouteBuilder() *RouteBuilder {
	return &RouteBuilder{route: MyRouter.NewRoute()}
}
//本课程来自程序员在囧途(www.jtthink.com)咨询群:98514334
func(this *RouteBuilder) SetPath(path string ,exact bool ) *RouteBuilder{
	if exact{
		 this.route.Path(path)
	}else{
		 this.route.PathPrefix(path)
	}
   return this
}
//第二个参数是故意的,方便调用时 传入 条件,省的外面写 if else
func(this *RouteBuilder) SetHost(host string, set bool ) *RouteBuilder{
	if set{
		this.route.Host(host)
	}
	 return this
}
func(this *RouteBuilder) Build(handler http.Handler)  {
	 this.route.
		Methods("GET","POST","PUT","DELETE","OPTIONS").
		Handler(handler)
}
//本课程来自程序员在囧途(www.jtthink.com)咨询群:98514334
package sysinit

import (
	"fmt"
	"github.com/gorilla/mux"
	"github.com/valyala/fasthttp"
	"github.com/yeqown/fasthttp-reverse-proxy/v2"
	v1 "k8s.io/api/networking/v1"
	"net/http"
	"net/url"
)
type ProxyHandler struct {
	Proxy *proxy.ReverseProxy  // proxy对象。 保存proxy
}
//空函数没啥用
func(this *ProxyHandler) ServeHTTP(http.ResponseWriter, *http.Request){}


//解析配置文件中的rules, 初始化 路由
func ParseRule()  {
	//现在要循环 遍历
	for _,ingress:=range SysConfig.Ingress{
		for _,rule:= range   ingress.Spec.Rules{
			for _,path:=range rule.HTTP.Paths {
				//构建 反代对象
				rProxy:=proxy.NewReverseProxy(
					fmt.Sprintf("%s:%d",path.Backend.Service.Name,path.Backend.Service.Port.Number))
				//本课程来自程序员在囧途(www.jtthink.com)咨询群:98514334
				routeBud:=NewRouteBuilder()

			    routeBud.
					SetPath(path.Path,path.PathType!=nil && *path.PathType==v1.PathTypeExact).
					SetHost(rule.Host,rule.Host!="").
					Build(&ProxyHandler{Proxy: rProxy})



			}
		}
	}

}

// 获取路由   (先匹配 请求path ,如果匹配到 ,会返回 对应的proxy 对象)
func GetRoute(req fasthttp.Request)*proxy.ReverseProxy{
	match:=&mux.RouteMatch{}
	httpReq:=&http.Request{
		URL:&url.URL{Path:  string(req.URI().Path())},
		Method:string(req.Header.Method()),
		Host: string(req.Header.Host()),
	}
	if MyRouter.Match(httpReq,match){
		return match.Handler.(*ProxyHandler).Proxy
	}
	return  nil
}
//本课程来自程序员在囧途(www.jtthink.com)咨询群:98514334

路由解析(4)支持路径重写 #

server:
  port: 80

ingress:
 - apiVersion: networking.k8s.io/v1
   kind: Ingress
   metadata:
    name: ingress-myservicea
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: /$1
      kubernetes.io/ingress.class: "nginx"
   spec:
    rules:
       - host: aabb.com
         http:
          paths:
            - path: /jtthink/{(.*)}
              backend:
                service:
                  name: www.jtthink.com
                  port:
                    number: 80
            - path: /baidu/{(.*)}
              backend:
                service:
                  name: www.baidu.com
                  port:
                    number: 80

过滤器commcon.go

package filters

import (
	"github.com/valyala/fasthttp"
	"reflect"
)
//所有过滤器 的接口
type ProxyFileter interface {
	SetValue(values ...string)
	Do(ctx *fasthttp.RequestCtx)
}
type ProxyFilters []ProxyFileter
func(this ProxyFilters) Do(ctx *fasthttp.RequestCtx){
	for _,filter:=range this {
		filter.Do(ctx)
	}
}
var FileterList=map[string]ProxyFileter{}

//注册过滤器
func registerFilter(key string ,filter ProxyFileter)  {
	FileterList[key]=filter
}
func init() {

}
//检查注解是否 和预设的 过滤器 匹配
func CheckAnnotations(annos map[string]string,exts ...string  ) []ProxyFileter{
	fileters:=[]ProxyFileter{}
	for anno_key,anno_value:=range annos{
		for filter_key,filterReflect:=range FileterList{
			if anno_key==filter_key{
				t:=reflect.TypeOf(filterReflect)
				if t.Kind()==reflect.Ptr{
					t=t.Elem()
				}
				filter:=reflect.New(t).Interface().(ProxyFileter)
				params:=[]string{anno_value}
				params=append(params,exts...)
				filter.SetValue(params...)
				fileters=append(fileters,filter)
			}
		}
	}
	return fileters
}

rewrite.go

package filters

import (
	"github.com/valyala/fasthttp"
	"log"
	"regexp"
	"strings"
)
const RewriteAnnotation="nginx.ingress.kubernetes.io/rewrite-target"

func init() {
	registerFilter(RewriteAnnotation,(*RewriteFilter)(nil) )
}
type RewriteFilter struct {
    pathValue string
    target string
}
//可变参数。第1个是 rewrie-target:的值 如 /$1  第2个是 原始的path 值 ,如/jtthink/{(.*)}
func(this *RewriteFilter) SetValue(values ...string){
	if len(values)!=2{
		return
	}
	this.target=values[0]
	this.pathValue=values[1]
	this.pathValue=strings.Replace(this.pathValue,"{","",-1)
	this.pathValue=strings.Replace(this.pathValue,"}","",-1)
}
func(this *RewriteFilter) Do(ctx *fasthttp.RequestCtx){
	getUrl:=ctx.Request.URI().String() //获取 请求URL  譬如  /jtthink/users
	reg,err:=regexp.Compile(this.pathValue)
	if err!=nil{
		log.Println(err)
		return
	}

	getUrl=reg.ReplaceAllString(getUrl,this.target)

	ctx.Request.SetRequestURI(getUrl)
	if err!=nil{
		log.Println(err)
		return
	}

增加请求头过滤器 #

package filters

import (
	"github.com/valyala/fasthttp"
	"strings"
)
const AddRequestHeaderAnnotation=AnnotationPrefix+"/add-request-header"

func init() {
	registerFilter(AddRequestHeaderAnnotation,(*AddRequestHeaderFilter)(nil) )
}
type AddRequestHeaderFilter struct {
    pathValue string
    target string  //注解  值
    path string
}
func(this *AddRequestHeaderFilter) SetPath(value  string){}
//可变参数。第1个是 注解值:的值 如 /$1
func(this *AddRequestHeaderFilter) SetValue(values ...string){
	this.target=values[0]
}
func(this *AddRequestHeaderFilter) Do(ctx *fasthttp.RequestCtx){
	 kvList:=strings.Split(this.target,";")
	 for _,kv:=range kvList{
	 	k_v:=strings.Split(kv,"=")
	 	if len(k_v)==2{
			ctx.Request.Header.Add(k_v[0],k_v[1])
		}
	 }

}

添加响应头过滤器 #

package filters

import (
	"github.com/valyala/fasthttp"
	"strings"
)
const AddReponseHeaderAnnotation=AnnotationPrefix+"/add-response-header"

func init() {
	//注册响应 过滤器
	registerReponseFilter(AddReponseHeaderAnnotation,(*AddResponseHeaderFilter)(nil) )
}
type AddResponseHeaderFilter struct {
    pathValue string
    target string  //注解  值
    path string
}
func(this *AddResponseHeaderFilter) SetPath(value  string){}
//可变参数。第1个是 注解值:的值 如 /$1
func(this *AddResponseHeaderFilter) SetValue(values ...string){
	this.target=values[0]
}
func(this *AddResponseHeaderFilter) Do(ctx *fasthttp.RequestCtx){
	 kvList:=strings.Split(this.target,";")
	 for _,kv:=range kvList{
	 	k_v:=strings.Split(kv,"=")
	 	if len(k_v)==2{
			ctx.Response.Header.Add(k_v[0],k_v[1])
		}
	 }

}

创建最基本的控制器 #

package k8sconfig

import (
	"context"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

//@Controller
type JtProxyController struct {
	client.Client
}

func NewJtProxyController() *JtProxyController {
	return &JtProxyController{}
}
func (r *JtProxyController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {

	return reconcile.Result{}, nil
}
func (r *JtProxyController) InjectClient(c client.Client) error {
	r.Client = c
	return nil
}

发布Ingress、自定义控制器接收 #

package k8sconfig

import (
	"context"
	"fmt"
	v1 "k8s.io/api/networking/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

//@Controller
type JtProxyController struct {
	client.Client
}

func NewJtProxyController() *JtProxyController {
	return &JtProxyController{}
}
func (r *JtProxyController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	obj:=&v1.Ingress{}
	err:=r.Get(ctx,req.NamespacedName,obj)
	if err!=nil{
		return reconcile.Result{}, err
	}
	if v,ok:=obj.Annotations["kubernetes.io/ingress.class"];ok && v=="jtthink"{
		fmt.Println(obj)
	}


	return reconcile.Result{}, nil
}
func (r *JtProxyController) InjectClient(c client.Client) error {
	r.Client = c
	return nil
}

准备工作、网关和控制器同时启动 #

package main

import (
	"fmt"
	"github.com/valyala/fasthttp"
	"jtproxy/pkg/filters"
	"jtproxy/pkg/k8sconfig"
	"jtproxy/pkg/sysinit"
	"log"
	"os"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)
func ProxyHandler(ctx *fasthttp.RequestCtx){
	//代表匹配到了 path
	if getProxy:=sysinit.GetRoute(ctx.Request);getProxy!=nil{
		filters.ProxyFilters(getProxy.RequestFilters).Do(ctx) //过滤
		getProxy.Proxy.ServeHTTP(ctx) //反代
		filters.ProxyFilters(getProxy.ResponseFilters).Do(ctx) //过滤
	}else{
		ctx.Response.SetStatusCode(404)
		ctx.Response.SetBodyString("404...")
	}


}
func main() {
	logf.SetLogger(zap.New())
	var mylog=logf.Log.WithName("jtproxy")
	mgr, err := manager.New(k8sconfig.K8sRestConfig(), manager.Options{})
	if err != nil {
		mylog.Error(err, "unable to set up manager")
		os.Exit(1)
	}
	//++
	err=k8sconfig.SchemeBuilder.AddToScheme(mgr.GetScheme())
	if err != nil {
		mylog.Error(err, "unable add schema")
		os.Exit(1)
	}

	err=builder.ControllerManagedBy(mgr).
		//For(&v1.Ingress{}).Complete(k8sconfig.NewJtProxyController())
		For(&k8sconfig.Route{}).Complete(k8sconfig.NewJtProxyController())
	if err != nil {
		mylog.Error(err, "unable to create manager")
		os.Exit(1)
	}
	sysinit.InitConfig()  //初始化  业务系统配置
	errCh:=make(chan error)
	go func() {
		//启动控制器管理器
		if err=mgr.Start(signals.SetupSignalHandler());err!=nil{
			errCh<-err
		}
	}()
	go func() {
		 // 启动网关
		 if err=fasthttp.ListenAndServe(fmt.Sprintf(":%d", sysinit.SysConfig.Server.Port),ProxyHandler);err!=nil{
			 errCh<-err
		 }
	}()
	getError:=<-errCh
	log.Println(getError.Error())

}

发布自己的ingress、控制器监听和持久化配置 #

main.go

package main

import (
	"fmt"
	"github.com/valyala/fasthttp"
	"jtproxy/pkg/filters"
	"jtproxy/pkg/k8sconfig"
	"jtproxy/pkg/sysinit"
	v1 "k8s.io/api/networking/v1"
	"log"
	"os"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)
func ProxyHandler(ctx *fasthttp.RequestCtx){
	//代表匹配到了 path
	if getProxy:=sysinit.GetRoute(ctx.Request);getProxy!=nil{
		filters.ProxyFilters(getProxy.RequestFilters).Do(ctx) //过滤
		getProxy.Proxy.ServeHTTP(ctx) //反代
		filters.ProxyFilters(getProxy.ResponseFilters).Do(ctx) //过滤
	}else{
		ctx.Response.SetStatusCode(404)
		ctx.Response.SetBodyString("404...")
	}


}
func main() {
	logf.SetLogger(zap.New())
	var mylog=logf.Log.WithName("jtproxy")
	mgr, err := manager.New(k8sconfig.K8sRestConfig(), manager.Options{})
	if err != nil {
		mylog.Error(err, "unable to set up manager")
		os.Exit(1)
	}
	//++
	err=k8sconfig.SchemeBuilder.AddToScheme(mgr.GetScheme())
	if err != nil {
		mylog.Error(err, "unable add schema")
		os.Exit(1)
	}

	err=builder.ControllerManagedBy(mgr).
		For(&v1.Ingress{}).Complete(k8sconfig.NewJtProxyController())
	//For(&k8sconfig.Route{}).Complete(k8sconfig.NewJtProxyController())
	if err != nil {
		mylog.Error(err, "unable to create manager")
		os.Exit(1)
	}
	sysinit.InitConfig()  //初始化  业务系统配置
	errCh:=make(chan error)
	go func() {
		//启动控制器管理器
		if err=mgr.Start(signals.SetupSignalHandler());err!=nil{
			errCh<-err
		}
	}()
	go func() {
		 // 启动网关
		 if err=fasthttp.ListenAndServe(fmt.Sprintf(":%d", sysinit.SysConfig.Server.Port),ProxyHandler);err!=nil{
			 errCh<-err
		 }
	}()
	getError:=<-errCh
	log.Println(getError.Error())
}

config.go

package sysinit

import (
	"io/ioutil"
	"k8s.io/api/networking/v1"
	"log"
	"os"
	"sigs.k8s.io/yaml"
)

type Server struct {
	Port int   //代表是代理启动端口
}
type SysConfigStruct struct {
	Server Server
	Ingress []v1.Ingress
}
var SysConfig =new(SysConfigStruct)
func InitConfig()  {
	config,err:=ioutil.ReadFile("./app.yaml")
	if err!=nil{
		log.Fatal(err)
	}
	err=yaml.Unmarshal(config,SysConfig)
	if err!=nil{
		log.Fatal(err)
	}
   ParseRule()

}
//更新 ingress对象 配置 ,更新内存配置 和 配置持久化
func ApplyConfig(ingress *v1.Ingress) error   {
  isEdit:=false
  for index,config:=range SysConfig.Ingress{
  	   if config.Name==ingress.Name && config.Namespace==ingress.Namespace{
		   SysConfig.Ingress[index]=*ingress
		   isEdit=true
		   break
	   }
  }
  if !isEdit{
	  SysConfig.Ingress=append(SysConfig.Ingress,*ingress)
  }
  b,_:=yaml.Marshal(SysConfig)
  appYamlFile,err:=os.OpenFile("./app.yaml",os.O_CREATE|os.O_TRUNC|os.O_WRONLY,0644)
	if err != nil {
		return err
	}
	defer appYamlFile.Close()
   _,err=appYamlFile.Write(b)
	return err
}

controller.go

func (r *JtProxyController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	ingress:=&v1.Ingress{}
	err:=r.Get(ctx,req.NamespacedName,ingress)
	if err!=nil{
		return reconcile.Result{}, err
	}
	if v,ok:=ingress.Annotations["kubernetes.io/ingress.class"];ok && v=="jtthink"{
		 err=sysinit.ApplyConfig(ingress)
		 if err!=nil{
			 return reconcile.Result{}, err
		 }
	}
	return reconcile.Result{}, nil
}

发布Ingress出发配置重载 #

config.go

package sysinit

import (
	"io/ioutil"

	"github.com/gorilla/mux"
	v1 "k8s.io/api/networking/v1"

	"os"

	"sigs.k8s.io/yaml"
)

type Server struct {
	Port int //代表是代理启动端口
}
type SysConfigStruct struct {
	Server  Server
	Ingress []v1.Ingress
}

var SysConfig = new(SysConfigStruct)

func InitConfig() error {
	config, err := ioutil.ReadFile("./app.yaml")
	if err != nil {
		return err
	}
	err = yaml.Unmarshal(config, SysConfig)
	if err != nil {
		return err
	}
	ParseRule()
	return nil

}

// 更新 ingress对象 配置 ,更新内存配置 和 配置持久化
func ApplyConfig(ingress *v1.Ingress) error {
	isEdit := false
	for index, config := range SysConfig.Ingress {
		if config.Name == ingress.Name && config.Namespace == ingress.Namespace {
			SysConfig.Ingress[index] = *ingress
			isEdit = true
			break
		}
	}
	if !isEdit {
		SysConfig.Ingress = append(SysConfig.Ingress, *ingress)
	}
	b, _ := yaml.Marshal(SysConfig)
	appYamlFile, err := os.OpenFile("./app.yaml", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer appYamlFile.Close()
	_, err = appYamlFile.Write(b)
	if err != nil {
		return err
	}

	return ReloadConfig()
}

// 重载配置
func ReloadConfig() error {
	MyRouter = mux.NewRouter()

	return InitConfig()
}
package main

import (
	"fmt"
	"github.com/valyala/fasthttp"
	"jtproxy/pkg/filters"
	"jtproxy/pkg/k8sconfig"
	"jtproxy/pkg/sysinit"
	v1 "k8s.io/api/networking/v1"
	"log"
	"os"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)
func ProxyHandler(ctx *fasthttp.RequestCtx){
	//代表匹配到了 path
	if getProxy:=sysinit.GetRoute(ctx.Request);getProxy!=nil{
		filters.ProxyFilters(getProxy.RequestFilters).Do(ctx) //过滤
		getProxy.Proxy.ServeHTTP(ctx) //反代
		filters.ProxyFilters(getProxy.ResponseFilters).Do(ctx) //过滤
	}else{
		ctx.Response.SetStatusCode(404)
		ctx.Response.SetBodyString("404...")
	}


}

func main() {
	logf.SetLogger(zap.New())
	var mylog=logf.Log.WithName("jtproxy")
	mgr, err := manager.New(k8sconfig.K8sRestConfig(), manager.Options{})
	if err != nil {
		mylog.Error(err, "unable to set up manager")
		os.Exit(1)
	}
	//++
	err=k8sconfig.SchemeBuilder.AddToScheme(mgr.GetScheme())
	if err != nil {
		mylog.Error(err, "unable add schema")
		os.Exit(1)
	}

	err=builder.ControllerManagedBy(mgr).
		For(&v1.Ingress{}).
		Complete(k8sconfig.NewJtProxyController())
	//For(&k8sconfig.Route{}).Complete(k8sconfig.NewJtProxyController())
	if err != nil {
		mylog.Error(err, "unable to create manager")
		os.Exit(1)
	}
	if err=sysinit.InitConfig();err!=nil{//初始化  业务系统配置
		mylog.Error(err, "unable to load sysconfig")
		os.Exit(1)
	}
	errCh:=make(chan error)
	go func() {
		//启动控制器管理器
		if err=mgr.Start(signals.SetupSignalHandler());err!=nil{
			errCh<-err
		}
	}()
	go func() {
		 // 启动网关
		 if err=fasthttp.ListenAndServe(fmt.Sprintf(":%d", sysinit.SysConfig.Server.Port),ProxyHandler);err!=nil{
			 errCh<-err
		 }

	}()
	getError:=<-errCh
	log.Println(getError.Error())
}

删除ingress触发网关配置重载 #

	proxyCtl:=k8sconfig.NewJtProxyController()
	err=builder.ControllerManagedBy(mgr).
			For(&v1.Ingress{}).
		    Watches(&source.Kind{
									Type: &v1.Ingress{},
								},
								handler.Funcs{DeleteFunc: proxyCtl.OnDelete}).
		    Complete(proxyCtl)
package k8sconfig

import (
	"context"
	"jtproxy/pkg/sysinit"
	v1 "k8s.io/api/networking/v1"
	"k8s.io/client-go/util/workqueue"
	"log"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

//@Controller
type JtProxyController struct {
	client.Client
}

func NewJtProxyController() *JtProxyController {
	return &JtProxyController{}
}
func (r *JtProxyController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	ingress:=&v1.Ingress{}

	err:=r.Get(ctx,req.NamespacedName,ingress)
	if err!=nil{
		return reconcile.Result{}, err
	}
	if r.IsJtProxy(ingress.Annotations){
		 err=sysinit.ApplyConfig(ingress)
		 if err!=nil{
			 return reconcile.Result{}, err
		 }
	}
	return reconcile.Result{}, nil
}
func (r *JtProxyController) InjectClient(c client.Client) error {
	r.Client = c
	return nil
}
//判断是否 是否 我们所需要处理的ingress
func (r *JtProxyController) IsJtProxy(annotations map[string]string) bool {
	 if   v,ok:=annotations["kubernetes.io/ingress.class"];ok && v=="jtthink"{
		 return true
	}
	return false
}
func (r *JtProxyController) OnDelete(event event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface){
	if r.IsJtProxy(event.Object.GetAnnotations()){
		if err:=sysinit.DeleteConfig(event.Object.GetName(),event.Object.GetNamespace());err!=nil{
			log.Println(err)
		}
	}
}

手工简单部署打包镜像 #

FROM golang:1.15-alpine3.12 as builder
RUN mkdir /src
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
ADD . /src
WORKDIR /src
RUN  GOPROXY=https://goproxy.cn go build -o jtproxy test.go  && chmod +x jtproxy


FROM alpine:3.12
ENV ZONEINFO=/app/zoneinfo.zip
RUN mkdir /app
WORKDIR /app

COPY --from=builder /usr/local/go/lib/time/zoneinfo.zip /app

COPY --from=builder /src/jtproxy /app

ENTRYPOINT  ["./jtproxy"]
EXPOSE 80

开发时测试部署、体内访问apiserver #

init.go

package k8sconfig

import (
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"log"
	"os"
)


//POD里  体内
func K8sRestConfigInPod() *rest.Config{

	config,err:=rest.InClusterConfig()
	if err!=nil{
		log.Fatal(err)
	}
	return config
}
// 获取 config对象
func K8sRestConfig() *rest.Config{
	if os.Getenv("release")=="1"{ //自定义环境
		log.Println("run in cluster")
		return K8sRestConfigInPod()
	}
	log.Println("run outside cluster")
	config, err := clientcmd.BuildConfigFromFlags("","./resources/config" )
	if err!=nil{
	   log.Fatal(err)
	}
	config.Insecure=true
	return config
}

rbac.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: jtproxy-sa
  namespace: jtthink-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: jtproxy-clusterrole
rules:
  - apiGroups:
      - ""
    resources:
      - namespaces
    verbs:
      - get
  - apiGroups:
      - ""
    resources:
      - configmaps
      - pods
      - secrets
      - endpoints
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - services
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - networking.k8s.io
    resources:
      - ingresses
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - networking.k8s.io
    resources:
      - ingresses/status
    verbs:
      - update
  - apiGroups:
      - networking.k8s.io
    resources:
      - ingressclasses
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - configmaps
    verbs:
      - create
  - apiGroups:
      - ""
    resources:
      - events
    verbs:
      - create
      - patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: jtproxy-ClusterRoleBinding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: jtproxy-clusterrole
subjects:
  - kind: ServiceAccount
    name: jtproxy-sa
    namespace: jtthink-system

deploy.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: jtproxy-controller
  namespace: jtthink-system
spec:
  selector:
    matchLabels:
      app: jtproxy-controller
  replicas: 1
  template:
    metadata:
      labels:
        app: jtproxy-controller
    spec:
      nodeName: jtthink2
      serviceAccountName: jtproxy-sa
      containers:
        - name: jtproxy
          image: golang:1.15-alpine3.12
          imagePullPolicy: IfNotPresent
          env:
            - name: "GOPROXY"
              value: "https://goproxy.cn"
            - name: "release"
              value: "1"
          workingDir: "/app"
          command: ["go","run","test.go"]
          volumeMounts:
            - name: app
              mountPath: /app
            - name: gopath
              mountPath: /go
          ports:
            - containerPort: 80
      volumes:
        - name: app
          hostPath:
             path: /home/shenyi/jtingress
        - name: gopath
          hostPath:
            path: /home/shenyi/gopath
---
apiVersion: v1
kind: Service
metadata:
  name: jtproxy-svc
  namespace: jtthink-system
spec:
  type: NodePort
  ports:
    - port: 80
      targetPort: 80
      nodePort: 31180
  selector:
    app: jtproxy-controller
---

为ingress显示ip address #

controller.go

package k8sconfig

import (
	"context"
	"jtproxy/pkg/sysinit"
	corev1 "k8s.io/api/core/v1"
	v1 "k8s.io/api/networking/v1"
	"k8s.io/client-go/util/workqueue"
	"log"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

//@Controller
type JtProxyController struct {
	client.Client
}

func NewJtProxyController() *JtProxyController {
	return &JtProxyController{}
}
func (r *JtProxyController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	ingress:=&v1.Ingress{}

	err:=r.Get(ctx,req.NamespacedName,ingress)
	if err!=nil{
		return reconcile.Result{}, err
	}
	if r.IsJtProxy(ingress.Annotations){

		err=sysinit.ApplyConfig(ingress)
		if err!=nil{
			return reconcile.Result{}, err
		}
		//本课时新增 ,修改IP
		if ingress.Status.LoadBalancer.Ingress==nil{
			ingress.Status.LoadBalancer.Ingress=[]corev1.LoadBalancerIngress{
				{IP: ServiceIP},
			}
			err = r.Status().Update(ctx, ingress)
			if err!=nil{
				return reconcile.Result{}, err
			}
		}

	}
	return reconcile.Result{}, nil
}
func (r *JtProxyController) InjectClient(c client.Client) error {
	r.Client = c
	return nil
}
//判断是否 是否 我们所需要处理的ingress
func (r *JtProxyController) IsJtProxy(annotations map[string]string) bool {
	 if   v,ok:=annotations["kubernetes.io/ingress.class"];ok && v=="jtthink"{
		 return true
	}
	return false
}
func (r *JtProxyController) OnDelete(event event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface){
	if r.IsJtProxy(event.Object.GetAnnotations()){
		if err:=sysinit.DeleteConfig(event.Object.GetName(),event.Object.GetNamespace());err!=nil{
			log.Println(err)
		}
	}
}

initConfig.go

package k8sconfig

import (
	"context"
	"io/ioutil"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"log"
	"os"
)
//全局变量
var ServiceIP string
const JtProxySvcName="jtproxy-svc" //默认的固定的 Svc名称
const NSFile="/var/run/secrets/kubernetes.io/serviceaccount/namespace"
//用来获取 svc的ip
func init() {
	if os.Getenv("release")!="1"{ //本机情况
		ServiceIP="127.0.0.1"  //写着玩的。
		return
	}
	config:=K8sRestConfig()
	//pod里会有这样一个文件
	//  namespace 在这里 /var/run/secrets/kubernetes.io/serviceaccount/namespace
	ns,_:=ioutil.ReadFile(NSFile) //取到了 命名空间
	client,err:=kubernetes.NewForConfig(config) //clientset
	if err!=nil{
		log.Fatal(err)
	}
	svc,err:=client.CoreV1().
		Services(string(ns)).
		Get(context.Background(),JtProxySvcName,v1.GetOptions{})
	if err!=nil{
		log.Fatal(err)
	}
	ServiceIP=svc.Spec.ClusterIP // 获取到控制器svc对应的clusterip

}
//POD里  体内
func K8sRestConfigInPod() *rest.Config{

	config,err:=rest.InClusterConfig()
	if err!=nil{
		log.Fatal(err)
	}
	return config
}
// 获取 config对象
func K8sRestConfig() *rest.Config{
	if os.Getenv("release")=="1"{ //自定义环境
		log.Println("run in cluster")
		return K8sRestConfigInPod()
	}
	log.Println("run outside cluster")
	config, err := clientcmd.BuildConfigFromFlags("","./resources/config" )
	if err!=nil{
	   log.Fatal(err)
	}
	config.Insecure=true
	return config
}