traefik-provider

providers.png provider是traefik配置的动态更新,提供了很多种方式,如我们调试用的文件,k8s上的ingress,gateway,crd方式,consul等, 原理其实都差不多,本文我们举例k8s ingress providers的实现。 通过分析代码后,主要分为以下几个模块

  • watcher:主逻辑,关联了所有的provider,以及通过如下3个函数实现配置的监听,通知,以及变更
  • listener:配置变更的具体逻辑,调用各个模块创建的函数
  • Provide:负责监听各种类型配置变更方法,本文将从ingress分析

watcher启动主逻辑

以下为watch启动的主逻辑,通过AddListener将config改变后需要执行的方法添加到数组里,省略了部分AddListener方法

//cmd/traefik/traefik.go
watcher := server.NewConfigurationWatcher(
    routinesPool,
    providerAggregator, //配置文件中的providers,判断开启了哪几种类型的provider
    getDefaultsEntrypoints(staticConfiguration),
    "internal",
)
// Server Transports
watcher.AddListener(func(conf dynamic.Configuration) {
    roundTripperManager.Update(conf.HTTP.ServersTransports)
    dialerManager.Update(conf.TCP.ServersTransports)
})

// Switch router
watcher.AddListener(switchRouter(routerFactory, serverEntryPointsTCP, serverEntryPointsUDP))

//pkg/server/server.go
//watch启动逻辑
s.watcher.Start()
//pkg/server/configurationwatcher.go
func (c *ConfigurationWatcher) Start() {
    c.routinesPool.GoCtx(c.receiveConfigurations)
    c.routinesPool.GoCtx(c.applyConfigurations)
    c.startProviderAggregator()
}

其中,主要为以下3个函数处理watcher的完整逻辑

  • c.receiveConfigurations: 接收provider发现的配置变更,通知c.applyConfigurations
  • c.applyConfigurations:调用watcher.AddListener里配置的方法进行配置变更
  • c.startProviderAggregator:根据配置文件中provider监听对应类型的config是否变更,变更将配置发送给c.receiveConfigurations

c.startProviderAggregator

我们以k8s,ingress举例,将k8s ingress/services/endpoint类型添加增删改事件,收到事件变更通知后去拉一份最新配置下来,对比上一份配置的hash
有变更的情况下发给函数的参数configurationChan,也就是c.allProvidersConfigs

//pkg/server/configurationwatcher.go
func (c *ConfigurationWatcher) startProviderAggregator() {
    safe.Go(func() {
        err := c.providerAggregator.Provide(c.allProvidersConfigs, c.routinesPool)
    })
}
//pkg/provider/kubernetes/ingress/kubernetes.go
func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
    k8sClient, err := p.newK8sClient(ctxLog)
    pool.GoCtx(func(ctxPool context.Context) {
        operation := func() error {
            //事件通知,内部逻辑为监听了ingress,services,endpoint增删改,有变更事件会发送到eventsChan
            eventsChan, err := k8sClient.WatchAll(p.Namespaces, ctxPool.Done())
            throttleDuration := time.Duration(p.ThrottleDuration)
            throttledChan := throttleEvents(ctxLog, throttleDuration, pool, eventsChan)
            if throttledChan != nil {
                eventsChan = throttledChan
            }
    
            for {
                select {
                case <-ctxPool.Done():
                    return nil
                //判断到有事件变更,将k8s对象转换成traefik conf
                case event := <-eventsChan:
                    conf := p.loadConfigurationFromIngresses(ctxLog, k8sClient)
                    confHash, err := hashstructure.Hash(conf, nil)
                    switch {
                    //如果配置的hash跟之前配置一致则跳过
                    case p.lastConfiguration.Get() == confHash:
                        logger.Debug().Msgf("Skipping Kubernetes event kind %T", event)
                    default:
                        p.lastConfiguration.Set(confHash)
                        //将消息发送到configurationChan,也就是传进来的chan上
                        configurationChan <- dynamic.Message{
                            ProviderName:  "kubernetes",
                            Configuration: conf,
                        }
                    }
    
                    time.Sleep(throttleDuration)
                }
            }
        }
    })
    
    return nil
}

c.receiveConfigurations

provider将消息发送给c.allProvidersConfigs后,进行过滤校验等处理后发到c.newConfigs,同时由c.applyConfigurations最终处理

//pkg/server/configurationwatcher.go
//省略了多余的代码
func (c *ConfigurationWatcher) receiveConfigurations(ctx context.Context) {
	newConfigurations := make(dynamic.Configurations)
	var output chan dynamic.Configurations
	for {
		select {
		case output <- newConfigurations.DeepCopy():
			output = nil
		default:
			select {
			case configMsg, ok := <-c.allProvidersConfigs:
				newConfigurations[configMsg.ProviderName] = configMsg.Configuration.DeepCopy()
				output = c.newConfigs
			case output <- newConfigurations.DeepCopy():
				output = nil
			}
		}
	}
}

c.applyConfigurations

c.newConfigs收到配置后,最终调用最开始在主函数里watcher.addlistener里添加的函数,由各个模块设置的配置处理函数自行处理

//pkg/server/configurationwatcher.go
func (c *ConfigurationWatcher) applyConfigurations(ctx context.Context) {
	var lastConfigurations dynamic.Configurations
	for {
		select {
		case <-ctx.Done():
			return
		case newConfigs, ok := <-c.newConfigs:
			conf := mergeConfiguration(newConfigs.DeepCopy(), c.defaultEntryPoints)
			conf = applyModel(conf)
			for _, listener := range c.configurationListeners {
				listener(conf)
			}
			lastConfigurations = newConfigs
		}
	}
}