[翻译]GO并发模型二:Pipeline和Cancellation

image.png

简书不维护了,欢迎关注我的知乎:波罗学的个人主页

紧接上文:[翻译]GO并发模型一:Pipeline和Cancellation

明确地取消

当主函数在没有从输出channel中接收完所有值便退出时,它必须告诉上游停止数据发送。可以通过向done channel发送停止信号实现。此处有两个可能阻塞的goroutine,所以需发两个值。

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutine that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out)

    // Tell the remaining senders we're leaving
    done <- struct{}{}
    done <- struct{}{}
}

merge中的发送goroutine用select语句取代了原来的发送操作,它将负责将数据发出和接收done channel的消息。Done将接收的值是空结构体,因为该值没有任何意义:它仅仅是用来表明应该停止向输出channel发送数据了。该goroutine将会不停循环地从输入channel中接收数据,以确保上游不被阻塞。(待会我们将会讨论怎么提早从循环退出)。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
                    case out <- n:
                    case <-done:
            }
            }
        wg.Done()
    }
    // ... the rest is unchanged ...
}

这种方式的问题是:每个下游都需知道上游将发送的数据量,以便向其发送消息实现提早退出。但毫无疑问,时刻监控已发送数量是非常荒诞,也是非常容易出错的。

我们需要一种在上游goroutine数量未知或无限大的情况下使其停止的方式。在GO中,我们可以通过关闭channel来实现,因为在已关闭的channel上接收数据会被立刻处理并返回一个零值。

这意味着main函数中可仅仅通过关闭done channel来使发送方解除阻塞。该关闭操作会产生一个有效的广播信号并传递给发送方。我们可以扩展pipeline中的函数,使其可以多接受一个done参数,然后通过defer语句对执行关闭以便于在main退出时发送给各阶段完成信号来实现退出。

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.
}

一旦done channel关闭,各个阶段就可以成功返回退出。当done被关闭,merge就会知道上游会停止发送数据,merge函数就会停止从输入channel接收数据并返回。输出channel通过defer语句确保所有的wg.Done在函数时能被调用。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...
}

相似地,只要done channel一关闭,sq函数也会立刻返回。通过defer语句,sql函数确保它们输出channel一定能被顺利关闭。

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

pipeline几个指导原则:

  • 各个阶段在所有的发送操作完成便会关闭输出channels;
  • 各个阶段会不停的接收数据,直到这些channel都被关闭或者发送方不再阻塞;

Pipelines中可以通过为数据发送提供足够的buffer大小或在接收方确定放弃继续接收数据时发送完成信号来解除发送方的阻塞。

对目录中的文件执行摘要

让我们来看一个更实际的例子.

MD5是一种消息摘要算法,在checksum校验文件方面非常有用。通过命令行工具md5sum,我们打印了一系列文件的摘要值。

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们的例子是一个类似于md5sum的程序,它接受单一目录作为参数,并打印该目录下每个文件的摘要值。文件是按文件名升序排列打印。

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

程序的main主函数调用了一个名为MD5All的函数,它返回的是一个map,key为路径名,value为摘要值。最后,对结果进行了排序和打印。

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

MD5All函数我们将重点讨论。在serial.go文件中,并没有使用并发技术,仅仅是依次读取每个文件内容并对其调用md5.Sum。

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

并行执行摘要

在parellel.go文件中,我们把MD5ALL拆成了两阶段。第一阶段,在sumFiles函数中,它遍历目录并在各个goroutine中执行文件摘要,最后将结果发送给channel。

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles函数返回了两个channels:一个用于传输结果,另一个用于返回filepath.Walk的错误。walk函数为每个文件启动了一个新的goroutine来处理它们,同时也检查是否done 。如果done被关闭,walk函数将立刻返回。

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c.  Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.  Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}

MD5All函数从c(channel)中接收摘要值。但发现错误,它会提早返回,并通过defer语句关闭done。

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

对并行增加限制

parallel.go中的MD5All通过为每个文件系统一个新的goroutine实现。如果一个目录中有太多的大文件,这可能会导致分配的内存超过机器的可用内存。

我们可以通过限制并行读取文件的数量来限制内容分配。在bounded.go文件中,我们创建了固定数量的goroutines来读取文件。现在我们的pipeline涉及了三个阶段:遍历目录树、读取文件并执行摘要和收集摘要结果。

第一阶段,walkFiles,负责发送目录树中文件路径:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

第二阶段,我们为digester函数启动了固定数量的goroutine,它将从paths中接收文件名处理并发送摘要结果给channel c:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

不同于之前的例子,digester不会关闭输出channel,因为太多的goroutine共用了一个channel。取而代之,在所有digester执行完毕,MD5All会着手关闭channel。

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

我们可以为每个digester创建独立的channel,然后返回。但是这样我们就需要增加额外的goroutines来对结果进行合并。

最后阶段,我们从channel c中接收所有的结果并检查errc是否返回了错误。该检查无法过早执行,因为过早检查,可能会导致walkFile阻塞。

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

总结

这篇文章给我们展示了如何在GO中构建流式数据pipeline。pipeline中处理失败是需要一定的技巧的,因为每个尝试给下游发送数据的阶段都可能被阻塞,因为下游可能不在接收上游的输入数据。我们展示了如何通过关闭channel来给所有的goroutine发送 "done" 信号和定义了正确构建pipeline的指导原则。

作者:Sameer Ajmani

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345