Go 并发编程:通道常见应用范式

通道经典应用

一、闭包实现通道访问限制

在Go的并发编程中,创建通道和开辟协程是非常方便且容易的,正因如此,有可能会导致开发者滥用。如果在团队开发中没有良好的协商和规范,更可能会导致并发数据不安全。
例如:

func Demo() {
    ch := make(chan int, 0)
    go dosomething(ch, 10)
    go dosomething(ch, 20)
    dosomething(ch, 30)
}

func dosomething(ch chan int, num int) {
    for i := 1; i < num; i++ {
        ch <- i
    }
    close(ch1)
}

以上是个非常明显的不规范使用通道的例子,你能得到的只有死锁!

fatal error: all goroutines are asleep - deadlock!

那么能不能限制通道的使用呢,即特定通道只让特定协程使用?
有几种解决方案:

  • 团队协商,在代码规范上下功夫,显然这是不安全的。
  • 使用同步功能
  • 闭包实现访问受限的通道

以下我们使用第三种方案:
闭包实现访问受限的通道,只允许特定协程使用

func Demo() {
    // 生产者:producter 内部开辟一条协程往里面发送数据,并返回一个只读通道
    producter := func() <-chan int {
        results := make(chan int, 5) // 该通道只作用于特定闭包内的作用域
        go func() {
            defer close(results)
            for i := 0; i <= 5; i++ {
                results <- i
            }
        }()
        return results
    }

    // 消费者:运行这个闭包时需要传入一个只读通道
    consumer := func(results <-chan int) { 
        for result := range results {
            fmt.Printf("Received: %d\n", result)
        }
        fmt.Println("Done receiving!")
    }
        
    consumer(producter())
}

二、for-select范式:关于多通道操作的整合

我们知道select语句是go专门为多通道操作提供的原语,单个select语句可以一次性的从多个通道选取一个来读写,只要哪个通道先不处于阻塞状态便选取哪个通道读写。而结合for循环语句构成的for-select结构可以循环不断的从多通道读写数据,直到特定条件退出。

for-select循环模式如下所示:

for { // 无限循环或遍历
    select {
    // 对通道进行操作
    }
}

常见的几种for-select循环的用法:
a. 在通道上发送迭代变量

for _, s := range []string{"a", "b", "c"} {
    select {
    case <-done:
        return
    case stringStream <- s:   // slice数据循环迭代写入channel
    }
}

b. 无限循环等待停止

// 第一种方式
for {
    select {
    case <-done: 
        return   // 停止返回
    default:
    }
    // 执行非抢占任务
}

// 第二种方式
for {
    select {
    case <-done:
        return 
    default:    
      // 将要执行的任务放入default分支中
      // 执行非抢占任务
    }
}

通过善用通道,我们可以在许多并发过程中尽量避免使用同步锁,select原语可以集中处理多个通道,大大提高了开发和运行效率。

三、or-channel :递归多个通道的或读取,只要有一个通道返回即完成


/*
递归多个通道的或读取,只要有一个通道返回即完成
*/
func Demo() {
    var or func(channels ...<-chan interface{}) <-chan interface{}
    // 建立了名为or的递归函数,接收数量可变的通道并返回单个通道。
    or = func(channels ...<-chan interface{}) <-chan interface{} 
    // 两个递归终止条件     
    switch len(channels) {
        case 0:  // 如果传入的切片是空的,我们简单的返回一个nil通道
            return nil
        case 1:  // 如果切片只含有一个元素,我们就返回给元素
            return channels[0]
        }
      
        orChannel := make(chan interface{})
        // 建立一个goroutine,以便可以不受阻塞地等待我们通道上的消息
        go func() { 
            defer close(orDone)

            switch len(channels) {
            case 2: // 由于我们这里是递归的,每次递归调用将至少有两个通道。作为保持goroutine数量受到限制的优化方法,们在这里为仅使用两个通道的时设置了一个特殊情况。
                select {
                case <-channels[0]:
                case <-channels[1]:
                }
            default: // 递归地在第三个索引之后,从切片中的所有通道中创建一个or通道,然后从中选择。递归操作会逐层累计直到取到第一个通道元素。我们在其中传递了orChannel通道,这样当该树状结构顶层的goroutines退出时,结构底层的goroutines也会退出。
                select {
                case <-channels[0]:
                case <-channels[1]:
                case <-channels[2]:
                case <-or(append(channels[3:], orChannel)...): 
                }
            }
        }()
        return orDone
    }



    // 下面这个例子将经过一段时间后关闭通道,然后使用or函数将这些通道合并到一个关闭的通道中:
    sig := func(after time.Duration) <-chan interface{} { // 创建了一个通道,当后续时间中指定的时间结束时将关闭该通道
        c := make(chan interface{})
        go func() {
            defer close(c)
            time.Sleep(after)
        }()
        return c
    }

    start := time.Now() // 设置追踪自or函数的通道开始阻塞的起始时间
    <-or(sig(2*time.Hour), sig(5*time.Minute), sig(1*time.Second), sig(1*time.Hour), sig(1*time.Minute))
    fmt.Printf("done after %v", time.Since(start)) // 打印阻塞发生的时间
}

这是一种奇妙的做法,你可以将任意数量的通道组合到单个通道中,只要任何作为组件的通道关闭或被写入,整个通道就会关闭。

四、chRange 封装安全的通道遍历读取

有时你会与来自系统不同部分的通道交互。与管道不同的是,当你使用的代码通过done通道取消操作时,你无法对通道的行为方式做出判断。也就是说,你不知道正在执行读取操作的goroutine现在是什么状态。出于这个原因,正如我们在“防止Goroutine泄漏”中所阐述的那样,需要用select语句来封装我们的读取操作和done通道。可以简单的写成这样:

for val := range myChan {
    // 对 val 进行处理
}

展开后可以写成这样:

loop:
    for {
        select {
        case <-done:
            break loop
        case maybeVal, ok := <-myChan:
            if ok == false {
                return // or maybe break from for
            }
            // Do something with val
        }
    }

这样做可以快速退出嵌套循环。继续使用goroutines编写更清晰的并发代码,而不是过早优化的主题,我们可以用一个goroutine来解决这个问题。 我们封装了细节,以便其他人调用更方便:

/*
封装一个通用的安全的通道读取器,以便于可安全地for range遍历任意通道
*/
var chRange = func(done, ch <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})
    // 使用协程闭包封装安全的读取通道 
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:
                return
            case v, ok := <-ch:
                if ok == false {
                    return
                }
                select {
                case valStream <- v:
                case <-done:
                }
            }
        }
    }()

    return valStream
}

调用示例:这样对任意通道我们都可以简单安全的读取

func Demo() {
    done := make(chan interface{})
    defer close(done)

    ch := make(chan interface{})
    go func() {
        defer close(ch)
        for i := 0; i < 10; i++ {
            ch <- i
        }
    }()

    for val := range chRange(done, ch) {
        fmt.Printf("read %v \n", val)
    }
}

五、tee-channel 分割通道数据流

tee-channel类似Linux的tee命令,分割来自通道的值,以便将它们发送到两个独立区域。想象一下:你可能想要在一个通道上接收一系列操作指令,将它们发送给执行者,同时记录操作日志。

var tee = func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {

    out1 := make(chan interface{})
    out2 := make(chan interface{})

    go func() {
        defer close(out1)
        defer close(out2)
        for val := range chRange(done, in) {
            select {
            case <-done:
            default:
                out1 <- val
                out2 <- val
            }

        }
    }()
    return out1, out2
}

注意写入out1和out2是紧密耦合的。 直到out1和out2都被写入,迭代才能继续。 通常这不是问题,因为无论如何,处理来自每个通道的读取流程的吞吐量应该是tee之外的关注点,但值得注意。 这是一个快速调用示例:

func Demo()  {
    done := make(chan interface{})
    defer close(done)

    ch := make(chan interface{})
    go func() {
        for i := 0; i < 10; i++ {
            ch <- i
        }
        defer close(ch)
    }()

    out1, out2 := tee(done, ch)

    for val1 := range out1 {
        fmt.Printf("out1: %v, out2: %v\n", val1, <-out2)
    }

利用这种模式,很容易使用通道作为系统数据的连接点。

六、bridge-channel

在某些情况下,你可能会发现自己想要使用一系列通道,即你可能需要从一个通道中获取多个通道的值:

<-chan <-chan interface{}

这与将某个通道的数据切片合并到一个通道中稍有不同,这种调用方式意味着一系列通道有序的写入操作。从通道读取一系列通道的值 ,类似多通道过独木桥。

// 通道桥接
var bridge = func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {

    valStream := make(chan interface{}) // 1
    go func() {
        defer close(valStream)
        for { // 2
            var stream <-chan interface{}
            select {
            case maybeStream, ok := <-chanStream:
                if ok == false {
                    return
                }
                stream = maybeStream
            case <-done:
                return
            }
            for val := range chDone(done, stream) { // 3
                select {
                case valStream <- val:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

使用示例:

func Demo() {
    genVals := func() <-chan <-chan interface{} {

        chanStream := make(chan (<-chan interface{}))

        go func() {
            defer close(chanStream)
            for i := 0; i < 10; i++ {
                stream := make(chan interface{}, 1)
                stream <- i
                close(stream)
                chanStream <- stream
            }
        }()
        return chanStream
    }

    for v := range bridge(nil, genVals()) {
        fmt.Printf("%v ", v)
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容