golang并发三板斧系列之三:context用于退出

这是本系列文章的第三篇,第一篇在此golang并发三板斧系列之一:channel用于通信和同步,第二篇在此golang并发三板斧系列之二:goroutine池用于并发

前文描述了手工作坊的时代和工业时代,现在我们进入信息时代。

万恶的资本主义

前文描述的工业时代其实是资本主义,来到世间的每个毛孔都在滴血。不信你看前文的代码,gen之类的函数创建了一堆任务之后就扔给下游的works处理了,也不管他们要处理多久,是不是加班到深夜,是不是996ICU。

func BenchmarkCPUPool(b *testing.B) {
    channum := 100
    gonum := runtime.NumCPU()
    for i := 0; i < b.N; i++ {
        f := func(w *Work) {
            if v, ok := w.input.(float64); ok {
                cpubound(v)
            }
        }
        list := benchmarkList()
        c := genPoolChanBuffer(list)

        p := InitPool(channum, gonum, f)
        p.RunWorker()
        p.FeedWorker(c)
        p.Wait()
    }
}

人民当家作主

感谢毛主席,解放之后我们进入了社会主义,我们有了工会这个关爱人民的组织。人民的工会爱人民,人民的工会力量大,工会可以在合适的时候给大家放假,让大家休息,对应到程序中就是按下了神圣的ctrl+c

如下是最粗暴的模型,工会一喊放假,大家都放下手上的工作开心的玩耍了。但是有的工作做了一半就放弃了,这确实是很没有职业操守的:

var WorkPool chan work

type work struct {
    sth string
}

func (w work) Working() {
    log.Printf("start %s", w.sth)

    time.Sleep(2 * time.Second)

    log.Printf("end %s", w.sth)
}

func RunWorkerSimple() {
    workers := 2
    for i := 0; i < workers; i++ {
        go HandleWorkerSimple()
    }
}

func HandleWorkerSimple() {
    for {
        select {
        case work := <-WorkPool:
            work.Working()
        }
    }

    return
}

func TestWorkerSimple(t *testing.T) {
    WorkPool = make(chan work)
    go RunWorkerSimple()

    go func() {
        list := benchmarkList()
        for _, l := range list {
            WorkPool <- work{fmt.Sprint(l)}
        }
    }()

    select {}
}

很明显从结果可以看出,有工作编号为2和3的没有完成就被扔掉了,其余没有启动的工作都放弃了。

C02S259EFVH3:go_concurrency baixiao$ go test -run TestWorkerSimple
2019/04/08 22:56:11 start 1
2019/04/08 22:56:11 start 0
2019/04/08 22:56:13 end 0
2019/04/08 22:56:13 start 2
2019/04/08 22:56:13 end 1
2019/04/08 22:56:13 start 3
^Csignal: interrupt
FAIL    _/Users/baixiao/Go/src/github.com/baixiaoustc/go_concurrency    2.790s

要对得起这份工作

人民的工人为人民,所以即便工会保障了工人的权益,该做好的工作还是要认真做完啊。对应到程序中,收到ctrl+c中断后,每个worker应该完成手上正在做的工作,并且由BOSS(主goroutine)把剩余队列中的工作保存起来(比如写数据库或者文件),留待明天上班继续做。

代码写起来就复杂多了,要监控系统的中断信号,要等待所有worker处理完手上的事情,最后再把剩余的事情保存起来。需要用两个chan来通信,stopChan 用于通知workers下班啦,stoppedChan 用于所有worker处理完之后告知BOSS,再由BOSS保存剩余的工作。

func (w work) Saving() {
    log.Printf("save %s", w.sth)
}

func RunWorkerHold(stop, stopped chan struct{}) {
    var wg sync.WaitGroup
    workers := 2
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go HandleWorkerHold(stop, &wg)
    }
    wg.Wait()
    stopped <- struct{}{}
}

func HandleWorkerHold(stop chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        select {
        case work := <-WorkPool:
            work.Working()
        case <-stop:
            log.Println("worker: caller has told us to stop")
            return
        }
    }

    return
}

func TestWorkerHold(t *testing.T) {
    WorkPool = make(chan work)
    stopChan := make(chan struct{})
    stoppedChan := make(chan struct{})
    go RunWorkerHold(stopChan, stoppedChan)

    go func() {
        list := benchmarkList()
        for _, l := range list {
            WorkPool <- work{fmt.Sprint(l)}
        }
    }()

    // listen for C-c
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c
    log.Println("main: received C-c - shutting down")

    // tell the goroutine to stop
    log.Println("main: telling workers to stop")
    close(stopChan)
    // and wait for them to reply back
    <-stoppedChan
    log.Println("main: workers has told us they've finished")

    for work := range WorkPool {
        work.Saving()
    }

    return
}

这里没有把打印贴完,最终的结果是所有工作都save好了的:

C02S259EFVH3:go_concurrency baixiao$ go test -run TestWorkerHold
2019/04/08 23:32:02 start 1
2019/04/08 23:32:02 start 0
2019/04/08 23:32:04 end 1
2019/04/08 23:32:04 start 2
2019/04/08 23:32:04 end 0
2019/04/08 23:32:04 start 3
^C2019/04/08 23:32:05 main: received C-c - shutting down
2019/04/08 23:32:05 main: telling workers to stop
2019/04/08 23:32:06 end 2
2019/04/08 23:32:06 worker: caller has told us to stop
2019/04/08 23:32:06 end 3
2019/04/08 23:32:06 start 4
2019/04/08 23:32:08 end 4
2019/04/08 23:32:08 start 5
2019/04/08 23:32:10 end 5
2019/04/08 23:32:10 worker: caller has told us to stop
2019/04/08 23:32:10 main: workers has told us they've finished
2019/04/08 23:32:10 save 6
2019/04/08 23:32:10 save 7
2019/04/08 23:32:10 save 8
2019/04/08 23:32:10 save 9
2019/04/08 23:32:10 save 10
2019/04/08 23:32:10 save 11
2019/04/08 23:32:10 save 12
2019/04/08 23:32:10 save 13
2019/04/08 23:32:10 save 14
2019/04/08 23:32:10 save 15
。。。

但是值得注意的是,不是close(stopChan)一执行,马上所有的worker都能结束工作了。如上其中一个worker在接着完成work4和work5之后才走了退出流程,是因为对于select来讲,如果多个chan都准备好了的话,是随机选择其中一个,所以会有概率一直接着work的。

更进一步

上面的模式还是有缺陷,如果worker下面还有徒弟怎么办(又新开了goroutine)?最后BOSS在做剩余工作的保存时也不想耽误太久怎么办?保存工作写数据库也想受控制(比如database/sql包提供了相应支持)怎么办?

终于我们的主角登场了,golang提供了context模式用于解决goroutine的高效且安全退出问题,教程在网上很多了,不用细讲,只贴一下主要函数:

// 带cancel返回值的Context,一旦cancel被调用,即取消该创建的context

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// 带有效期cancel返回值的Context,即必须到达指定时间点调用的cancel方法才会被执行

func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)

// 带超时时间cancel返回值的Context,类似Deadline,前者是时间点,后者为时间间隔
// 相当于WithDeadline(parent, time.Now().Add(timeout)).

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

最后进化到我们的代码,注意SavingDB()方法只是一个伪代码示意:

import "database/sql"

//fake, just a example
func (w work) SavingDB(ctx context.Context) {
    log.Printf("save %s", w.sth)

    stmt := "select name from db.table"
    db := sql.DB{}
    conn, _ := db.Conn(ctx)
    rows, err := conn.QueryContext(ctx, stmt)
    if err != nil {
        if err == context.DeadlineExceeded {
            // context canceled
        }
        return
    }

    var name string
    for rows.Next() {
        if err := rows.Scan(&name); err != nil {
            if err == context.DeadlineExceeded {
                log.Println("scan canceled")
            }
        }
    }
}

func RunWorkerContext(ctx context.Context, stopped chan struct{}) {
    var wg sync.WaitGroup
    workers := 2
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go HandleWorkerContext(ctx, &wg)
    }
    wg.Wait()
    stopped <- struct{}{}
}

func HandleWorkerContext(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        select {
        case work := <-WorkPool:
            work.Working()
        case <-ctx.Done():
            log.Println("worker: caller has told us to stop")
            return
        }
    }

    return
}

func TestWorkerContext(t *testing.T) {
    WorkPool = make(chan work)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    stoppedChan := make(chan struct{})
    go RunWorkerContext(ctx, stoppedChan)

    go func() {
        list := benchmarkList()
        for _, l := range list {
            WorkPool <- work{fmt.Sprint(l)}
        }
    }()

    // listen for C-c
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c
    log.Println("main: received C-c - shutting down")

    // tell the goroutine to stop
    log.Println("main: telling workers to stop")
    cancel()
    // and wait for them to reply back
    <-stoppedChan
    log.Println("main: workers has told us they've finished")

    ctxTimeout, cancelTimeout := context.WithTimeout(ctx, 100*time.Microsecond)
    defer cancelTimeout()
    for {
        select {
        case work := <-WorkPool:
            work.SavingDB(ctxTimeout)
        case <-ctxTimeout.Done():
            log.Println("main: cann't wait any more")
            return
        }
    }

    return
}

通过cancel()方法通知该context.Context其下的所有goroutine进入退出流程,并可以启动带timeout的ctx开始保存工作流程,所有流程都是受控的。


这像不像是现代企业的扁平化管理,BOSS直接控制所有员工,提升所有的工作效率?


所有代码都在https://github.com/baixiaoustc/go_concurrency/blob/master/third_post_test.go中能找到。

原文载于golang并发三板斧系列之三:context用于退出

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,165评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,503评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,295评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,589评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,439评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,342评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,749评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,397评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,700评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,740评论 2 313
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,523评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,364评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,755评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,024评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,297评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,721评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,918评论 2 336

推荐阅读更多精彩内容