[istio源码分析][galley] galley之runtime

1. 前言

转载请说明原文出处, 尊重他人劳动成果!

源码位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)

1. [istio源码分析][galley] galley之上游(source)
2. [istio源码分析][galley] galley之runtime
3. [istio源码分析][galley] galley之下游(mcp)

2. runtime

本文将着重分析galley中一个承上启下的component.

[root@master pkg]# pwd
/root/go/src/istio.io/istio/galley/pkg
[root@master pkg]# tree -L 1
.
├── authplugin
├── authplugins
├── config
├── crd
├── meshconfig
├── metadata
├── runtime
├── server
├── source
├── testing
└── util

11 directories, 0 files
[root@master pkg]# 

source会产生事件的源头, runtime负责接收source中的事件并交给下游处理. 本文的重点将放到runtime中.

3. source

// Source to be implemented by a source configuration provider.
type Source interface {
    // 开始方法 对k8s而言 就是开始监控一些crd资源交由handler处理
    Start(handler resource.EventHandler) error
    // 停止监控
    Stop()
}

4. processing

4.1 Dispatcher 和 handler

type Handler interface {
    Handle(e resource.Event)
}

type Dispatcher struct {
    handlers map[resource.Collection][]Handler
}
// Dispatcher是一个Handler的实现类
// 并且针对每一种collection 都有其对应的一系列handler
func (d *Dispatcher) Handle(e resource.Event) {
    handlers, found := d.handlers[e.Entry.ID.Collection]
    if !found {
        scope.Warnf("Unhandled resource event: %v", e)
        return
    }

    for _, h := range handlers {
        h.Handle(e)
    }
}

可以看到Dispatcher是以collection为类, 每个collection都有其对应的Handler数组. (collection就是一些crd的名字, 比如istio/networking/v1alpha3/virtualservices)

Dispatcher实现了Handler接口, 针对每一个event会找到其collection所有的handler一个个进行处理.

4.2 Listener

// Listener gets notified when resource of a given collection has changed.
type Listener interface {
    CollectionChanged(c resource.Collection)
}

当某一个collection发生变化时会触发该方法

5. state

// 记录着内存状态的galley
type State struct {
    listener processing.Listener
    config *Config
    // version counter is a nonce that generates unique ids for each updated view of State.
    versionCounter int64
    // entries for per-message-type State.
    entriesLock sync.Mutex
    entries     map[resource.Collection]*resourceTypeState

    // Virtual version numbers for Gateways & VirtualServices for Ingress projected ones
    ingressGWVersion   int64
    ingressVSVersion   int64
    lastIngressVersion int64
    // 等待被发布的事件个数
    pendingEvents int64
    // 上一次发布的时间
    lastSnapshotTime time.Time
}
type resourceTypeState struct {
    // 当前状态的version
    version  int64
    entries  map[resource.FullName]*mcp.Resource
    versions map[resource.FullName]resource.Version
}

5.1 Handler方法

func (s *State) Handle(event resource.Event) {
    pks, found := s.getResourceTypeState(event.Entry.ID.Collection)
    if !found {
        return
    }
    switch event.Kind {
    case resource.Added, resource.Updated:
        // Check to see if the version has changed.
        if curVersion := pks.versions[event.Entry.ID.FullName]; curVersion == event.Entry.ID.Version {
            log.Scope.Debugf("Received event for the current, known version: %v", event)
            return
        }
        // 将事件的entry转成mcp.Resource类型
        entry, ok := s.toResource(event.Entry)
        if !ok {
            return
        }
        // 保存当前对象内存中的值以及版本
        pks.entries[event.Entry.ID.FullName] = entry
        pks.versions[event.Entry.ID.FullName] = event.Entry.ID.Version
        monitoring.RecordStateTypeCount(event.Entry.ID.Collection.String(), len(pks.entries))
        monitorEntry(event.Entry.ID, true)

    case resource.Deleted:
        // 删除当前对象内存中的值以及版本
        delete(pks.entries, event.Entry.ID.FullName)
        delete(pks.versions, event.Entry.ID.FullName)
        monitoring.RecordStateTypeCount(event.Entry.ID.Collection.String(), len(pks.entries))
        monitorEntry(event.Entry.ID, false)

    default:
        log.Scope.Errorf("Unknown event kind: %v", event.Kind)
        return
    }
    // 更新version
    s.versionCounter++
    pks.version = s.versionCounter

    log.Scope.Debugf("In-memory State has changed:\n%v\n", s)
    s.pendingEvents++
    // 通知listener对该collection以已经发生变化
    s.listener.CollectionChanged(event.Entry.ID.Collection)
}

func (s *State) getResourceTypeState(name resource.Collection) (*resourceTypeState, bool) {
    s.entriesLock.Lock()
    defer s.entriesLock.Unlock()
    // 根据collection找到当前内存中存在的对象 
    // 比如collection是virtualservice 那就是得到内存中所有virtualservice的对象
    pks, found := s.entries[name]
    return pks, found
}

Handler的主要工作是将当前事件的类型转化成mcp.Resource类型并将其保存到内存中. 那保留在内存中干什么呢? 在s.listener.CollectionChanged(event.Entry.ID.Collection)中会进行处理, 在下面processor中会明白.

6. processor

func NewProcessor(src Source, distributor Distributor, cfg *Config) *Processor {
    stateStrategy := publish.NewStrategyWithDefaults()
    return newProcessor(src, cfg, stateStrategy, distributor, nil)
}
func newProcessor(
    src Source,
    cfg *Config,
    stateStrategy *publish.Strategy,
    distributor Distributor,
    postProcessHook postProcessHookFn) *Processor {
    now := time.Now()
    p := &Processor{
        stateStrategy:   stateStrategy,
        distributor:     distributor,
        source:          src,
        eventCh:         make(chan resource.Event, 1024),
        postProcessHook: postProcessHook,
        worker:          util.NewWorker("runtime processor", scope),
        lastEventTime:   now,
        fullSyncCond:    sync.NewCond(&sync.Mutex{}),
    }
    stateListener := processing.ListenerFromFn(func(c resource.Collection) {
        if p.distribute {
            stateStrategy.OnChange()
        }
    })
    p.state = newState(cfg, stateListener)
    // 这个暂时可以先不用看 以后分析serviceentry的时候需要
    p.serviceEntryHandler = serviceentry.NewHandler(cfg.DomainSuffix, processing.ListenerFromFn(func(_ resource.Collection) {
        scope.Debug("Processor.process: publish serviceEntry")
        s := p.serviceEntryHandler.BuildSnapshot()
        p.distributor.SetSnapshot(groups.SyntheticServiceEntry, s)
    }))
    p.handler = buildDispatcher(p.state, p.serviceEntryHandler)
    p.seedMesh()
    return p
}

1. 初始化p.state, 并且传入了listener.
2. 初始化p.handler, 传入p.state, p.serviceEntryHandler.

func buildDispatcher(state *State, serviceEntryHandler processing.Handler) *processing.Dispatcher {
    b := processing.NewDispatcherBuilder()
    // 所有注册的crds
    stateSchema := resource.NewSchemaBuilder().RegisterSchema(state.config.Schema).Build()
    for _, spec := range stateSchema.All() {
        b.Add(spec.Collection, state)
    }
    if state.config.SynthesizeServiceEntries {
        for _, spec := range serviceentry.Schema.All() {
            b.Add(spec.Collection, serviceEntryHandler)
        }
    }
    return b.Build()
}

可以看到每个collection都有一个基本的handler, 就是传进来的p.state.

6.1 Start方法

func (p *Processor) Start() error {
    // 启动方法
    setupFn := func() error {
        err := p.source.Start(func(e resource.Event) {
            // 将事件e传给管道p.eventCh
            p.eventCh <- e
        })
        if err != nil {
            return fmt.Errorf("runtime unable to Start source: %v", err)
        }
        return nil
    }
    // 运行方法
    runFn := func(ctx context.Context) {
        scope.Info("Starting processor...")
        defer func() {
            scope.Debugf("Process.process: Exiting worker thread")
            p.source.Stop()
            close(p.eventCh)
            p.stateStrategy.Close()
        }()

        scope.Debug("Starting process loop")

        for {
            select {
            case <-ctx.Done():
                // Graceful termination.
                scope.Debug("Processor.process: done")
                return
            case e := <-p.eventCh:
                // 从管道p.eventCh中取出要处理的事件
                p.processEvent(e)
            case <-p.stateStrategy.Publish:
                scope.Debug("Processor.process: publish")
                // 将当前state对象内存中保存的对象建立一个快照
                s := p.state.buildSnapshot()
                // 该快照将交由distributor处理
                p.distributor.SetSnapshot(groups.Default, s)
            }

            if p.postProcessHook != nil {
                p.postProcessHook()
            }
        }
    }
    // 通过工具类来运行这两个方法
    return p.worker.Start(setupFn, runFn)
}

再看processEvent方法

func (p *Processor) processEvent(e resource.Event) {
    if scope.DebugEnabled() {
        scope.Debugf("Incoming source event: %v", e)
    }
    p.recordEvent()

    if e.Kind == resource.FullSync {
        scope.Infof("Synchronization is complete, starting distribution.")

        p.fullSyncCond.L.Lock()
        // 把distribute设置为true
        p.distribute = true
        p.fullSyncCond.Broadcast()
        p.fullSyncCond.L.Unlock()
        // 这个将会触发runFn中的<-p.stateStrategy.Publish
        p.stateStrategy.OnChange()
        return
    }
    // 将该event交由dispatcher处理
    // 现在可以理解为就是p.state来处理, 原因p.handler就是一个dispatcher
    // dispatcher里面每一个collection都注册了一个p.state这样的handler
    p.handler.Handle(e)
}

1. 如果是FullSync, 也就是第一次做同步, 有两个动作:

1.1 将p.distribute设置为true. 现在回头来看一下newProcessor方法.

// processor.go
...
stateListener := processing.ListenerFromFn(func(c resource.Collection) {
        // When the state indicates a change occurred, update the publishing strategy
        if p.distribute {
            stateStrategy.OnChange()
        }
    })
...
// state.go中的Handler方法
func (s *State) Handle(event resource.Event) {
...
    // 通知listener对该collection以已经发生变化
    s.listener.CollectionChanged(event.Entry.ID.Collection)
...
}

所以当p.distribute=true时将调用stateStrategy.OnChange()这个时候就会触发到Processor的Start()方法中的<-p.stateStrategy.Publish:进而调用p.state.buildSnapshot()生成当前内存快照交由p.distributor处理. 这部分在分析mcp的时候会涉及到.

图片.png

1.2 通过p.stateStrategy.OnChange()触发<-p.stateStrategy.Publish.

  1. 调用p.handler.Handle(e)方法, 目前可以理解为调用p.state.Handle(e), 因为p.handlerp.dispatcher并且为每个collection注册了p.statehandler方法.

6.2 buildSnapshot

按照state.entries中的内容创建一个内存快照.

// 返回snapshot.Snapshot
func (s *State) buildSnapshot() snapshot.Snapshot {
    s.entriesLock.Lock()
    defer s.entriesLock.Unlock()

    now := time.Now()
    monitoring.RecordProcessorSnapshotPublished(s.pendingEvents, now.Sub(s.lastSnapshotTime))
    s.lastSnapshotTime = now
    // 创建快照
    b := snapshot.NewInMemoryBuilder()

    for collection, state := range s.entries {
        entries := make([]*mcp.Resource, 0, len(state.entries))
        for _, entry := range state.entries {
            entries = append(entries, entry)
        }
        version := fmt.Sprintf("%d", state.version)
        b.Set(collection.String(), version, entries)
    }

    // Build entities that are derived from existing ones.
    s.buildProjections(b)
    // 将pendingEvents清空
    sn := b.Build()
    s.pendingEvents = 0
    return sn
}

func (s *State) buildProjections(b *snapshot.InMemoryBuilder) {
    s.buildIngressProjectionResources(b)
}

func (s *State) buildIngressProjectionResources(b *snapshot.InMemoryBuilder) {
    ingressByHost := make(map[string]resource.Entry)
    // Build ingress projections
    state := s.entries[metadata.K8sExtensionsV1beta1Ingresses.Collection]
    if state == nil || len(state.entries) == 0 {
        return
    }
    ...
}

7. 总结

上流: sourcek8s或者fs中读取信息并整理成event.
处理: 将source中的事件event放入p.eventch, 并且processEventp.eventCh中读取, 将信息保存在内存中, 然后生成快照.
下流: 将生成的快照交由p.distributor处理.

图片.png

8. 参考

1. istio 1.3.6源码
2. https://cloud.tencent.com/developer/article/1409159

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,204评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,091评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,548评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,657评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,689评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,554评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,302评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,216评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,661评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,851评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,977评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,697评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,306评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,898评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,019评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,138评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,927评论 2 355

推荐阅读更多精彩内容