Help us understand the problem. What is going on with this article?

go-ethereumを読む(5) 補足 FeedとSubscription

More than 1 year has passed since last update.

gethのソースコードを読んでいるとgoroutineとchannelが頻出します。
特にFeedという使った1対多の通知のための独自structはselectをリフレクションしているため、非常に追いにくくなっています
v1.8.15ベースに解説します

Feed

SubscribeでFeedを購読するようにして、Sendで購読しているchannelに一斉に配信します

// event/feed.go

package event

import (
    "errors"
    "reflect"
    "sync"
)

var errBadChannel = errors.New("event: Subscribe argument does not have sendable channel type")

// フィードは、イベントのキャリアがチャネルである場合、1対多のサブスクリプションを実装する
// フィードに送信された値は、すべてのサブスクライブされたチャネルに同時に配信される
//
// フィードは、1つのタイプでのみ使用できます。 タイプは最初の送信またはサブスクリプション操作で決まる
// もし後続の呼び出しで型が異なる型をでメソッドを呼び出した場合、panicが発生する
//
// The zero value is ready to use.
type Feed struct {
    once      sync.Once        // ensures that init only runs once
    sendLock  chan struct{}    // sendLock has a one-element buffer and is empty when held.It protects sendCases.
    removeSub chan interface{} // interrupts Send
    sendCases caseList         // the active set of select cases used by Send

    // The inbox holds newly subscribed channels until they are added to sendCases.
    mu     sync.Mutex
    inbox  caseList
    etype  reflect.Type
    closed bool
}

// sendCasesの最初の実際のサブスクリプション・チャネルのインデックス
// sendCases[0]は、removeSubチャネルのSelectRecvの為のもの
const firstSubSendCase = 1
// 省略
func (f *Feed) init() {
    f.removeSub = make(chan interface{})
    f.sendLock = make(chan struct{}, 1)
    f.sendLock <- struct{}{}
    f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
}

// Subscribeはチャンネルをフィードに追加する。将来送信されるものはは、サブスクリプションがキャンセルされるまでチャンネルで配信される
// 追加されるすべてのチャンネルのエレメントタイプは同じでなければならない
// チャネルには、他のsubscriberをブロックするのを防ぐために十分なバッファスペースが必要
// 遅いのsubscriberは削除されない
func (f *Feed) Subscribe(channel interface{}) Subscription {
    f.once.Do(f.init)

    chanval := reflect.ValueOf(channel)
    chantyp := chanval.Type()
    if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
        panic(errBadChannel)
    }
    sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}

    f.mu.Lock()
    defer f.mu.Unlock()
    if !f.typecheck(chantyp.Elem()) {
        panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
    }
    // select caseをリフレクションにしてinboxに持たせる
    cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
    f.inbox = append(f.inbox, cas)
    return sub
}

// 省略

func (f *Feed) remove(sub *feedSub) {
    // まだf.sendCasesに追加されていないチャンネルをカバーするため、inboxから最初に削除する。
    ch := sub.channel.Interface()
    f.mu.Lock()
    index := f.inbox.find(ch)
    if index != -1 {
        f.inbox = f.inbox.delete(index)
        f.mu.Unlock()
        return
    }
    f.mu.Unlock()

    select {
    case f.removeSub <- ch:
        // Send will remove the channel from f.sendCases.
    case <-f.sendLock:
        // No Send is in progress, delete the channel now that we have the send lock.
        f.sendCases = f.sendCases.delete(f.sendCases.find(ch))
        f.sendLock <- struct{}{}
    }
}

// Sendは、すべての加入チャンネルに同時に配信する。 値が送信されたサブスクライバの数を返す。
func (f *Feed) Send(value interface{}) (nsent int) {
    rvalue := reflect.ValueOf(value)

    f.once.Do(f.init)
    <-f.sendLock

    // Add new cases from the inbox after taking the send lock.
    f.mu.Lock()
    f.sendCases = append(f.sendCases, f.inbox...)
    f.inbox = nil

    if !f.typecheck(rvalue.Type()) {
        f.sendLock <- struct{}{}
        panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
    }
    f.mu.Unlock()

    // channelに送った値を設定する
    for i := firstSubSendCase; i < len(f.sendCases); i++ {
        f.sendCases[i].Send = rvalue
    }

    // removeSubを除くすべてのチャンネルが選択されるまで送信する。 'cases'はsendCasesのprefixを記録する。 
    // 送信が成功すると、対応するcaseが'cases'最後に移動し、1つの要素だけ縮小する
    cases := f.sendCases
    for {
        // Fast path: selectのセットに追加する前にブロックしないで送信しようとする
        // 通常、subscriberが十分に速く、空きバッファスペースを持っている場合に成功する
        for i := firstSubSendCase; i < len(cases); i++ {
            if cases[i].Chan.TrySend(rvalue) {
                nsent++
                cases = cases.deactivate(i)
                i--
            }
        }
        if len(cases) == firstSubSendCase {
            break
        }
        // Select on all the receivers, waiting for them to unblock.
        chosen, recv, _ := reflect.Select(cases)
        if chosen == 0 /* <-f.removeSub */ {
            index := f.sendCases.find(recv.Interface())
            f.sendCases = f.sendCases.delete(index)
            if index >= 0 && index < len(cases) {
                // Shrink 'cases' too because the removed case was still active.
                cases = f.sendCases[:len(cases)-1]
            }
        } else {
            cases = cases.deactivate(chosen)
            nsent++
        }
    }

    // Forget about the sent value and hand off the send lock.
    for i := firstSubSendCase; i < len(f.sendCases); i++ {
        f.sendCases[i].Send = reflect.Value{}
    }
    f.sendLock <- struct{}{}
    return nsent
}

type feedSub struct {
    feed    *Feed
    channel reflect.Value
    errOnce sync.Once
    err     chan error
}

func (sub *feedSub) Unsubscribe() {
    sub.errOnce.Do(func() {
        sub.feed.remove(sub)
        close(sub.err)
    })
}

// 省略

type caseList []reflect.SelectCase

// find returns the index of a case containing the given channel.
func (cs caseList) find(channel interface{}) int {
    for i, cas := range cs {
        if cas.Chan.Interface() == channel {
            return i
        }
    }
    return -1
}

// delete removes the given case from cs.
func (cs caseList) delete(index int) caseList {
    return append(cs[:index], cs[index+1:]...)
}

// deactivate moves the case at index into the non-accessible portion of the cs slice.
func (cs caseList) deactivate(index int) caseList {
    last := len(cs) - 1
    cs[index], cs[last] = cs[last], cs[index]
    return cs[:last]
}

Subscription

Err() <-chan errorUnsubscribe()を実装してればSubscriptionになるので、Feedは自前のSubscriptionを作成しており、NewSubscriptionはあんまり使っていない...

SubscriptionScope

FeedはSubscriptionScopeと一緒に使われてる箇所が多いです
SubscriptionScopeは複数の購読を一度に購読解除する機能を提供します

// event/subscription.go
package event

import (
    "context"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/common/mclock"
)


// Subscriptionは、イベントの流れを表す。 イベントの保持者は、通常、channelになる
// サブスクリプションが確立されている間は失敗する可能性がある。エラーは、エラーchannelを通じて報告される
// サブスクリプションに問題がある場合(イベントを配信するネットワーク接続が閉じられているなど)、値を受け取りる。1度だけ値が送信される
// サブスクリプションが正常に終了すると(つまり、イベントのソースが閉じられたとき)、エラーチャネルは閉じられる。 また、Unsubscribeが呼び出されると閉じる。
// Unsubscribeメソッドは、イベントの送信をキャンセルする。 サブスクリプションに関連するリソースが確実にリリースされるように、すべてのケースでサブスクリプション解除を呼び出す必要がある。何度呼んでも問題はない
type Subscription interface {
    Err() <-chan error // returns the error channel
    Unsubscribe()      // cancels sending of events, closing the error channel
}

// 省略

// SubscriptionScopeは複数の購読を一度に購読解除する機能を提供する
//
// The zero value is ready to use.
type SubscriptionScope struct {
    mu     sync.Mutex
    subs   map[*scopeSub]struct{}
    closed bool
}

type scopeSub struct {
    sc *SubscriptionScope
    s  Subscription
}

// Trackはサブスクリプションの追跡を開始します。 スコープが閉じている場合、Trackはnilを返す
// 返されるサブスクリプションはラッパーで、ラッパーが購読を解除すると、スコープから削除する
func (sc *SubscriptionScope) Track(s Subscription) Subscription {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    if sc.closed {
        return nil
    }
    if sc.subs == nil {
        sc.subs = make(map[*scopeSub]struct{})
    }
    ss := &scopeSub{sc, s}
    sc.subs[ss] = struct{}{}
    return ss
}

// Closeは追跡していたサブスクリプションのUnsubscribeを呼び、トラッキングにあたらに追加されるのを防ぐ。
// Close後にTrackを呼んでもnilを返す
func (sc *SubscriptionScope) Close() {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    if sc.closed {
        return
    }
    sc.closed = true
    for s := range sc.subs {
        s.s.Unsubscribe()
    }
    sc.subs = nil
}

// Count returns the number of tracked subscriptions.
// It is meant to be used for debugging.
func (sc *SubscriptionScope) Count() int {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    return len(sc.subs)
}

func (s *scopeSub) Unsubscribe() {
    s.s.Unsubscribe()
    s.sc.mu.Lock()
    defer s.sc.mu.Unlock()
    delete(s.sc.subs, s)
}

func (s *scopeSub) Err() <-chan error {
    return s.s.Err()
}
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした