怎么限制go语言协程的并发数

前言

在使用协程并发处理某些任务时, 其并发数量往往因为各种因素的限制不能无限的增大. 例如网络请求、数据库查询等等。从运行效率角度考虑,在相关服务可以负载的前提下(限制最大并发数),尽可能高的并发。本文就这个问题探寻一下解决方案和实现。共两种思路,一是使用带缓冲的通道实现,二是使用锁实现。

一、使用带缓冲的通道限制并发数

1.1方案详情

先上代码如下, 逻辑很简单.

package golimit

type GoLimit struct {
    ch chan int
}

func NewGoLimit(max int) *GoLimit {
    return &GoLimit{ch: make(chan int, max)}
}

func (g *GoLimit) Add() {
    g.ch <- 1
}

func (g *GoLimit) Done() {
    <-g.ch
}

按允许最大并发数创建一个带缓冲的通道, 创建协程之前调用Add()往通道里写一个数据, 协程完成是调用Done()方法读取一个数据. 若无法往通道里写数据时, 表示通道已经写满, 也就是目前的协程并发数为允许的最大数量. Add()方法将被阻塞, 也就无法创建新的协程. 直到有协程运行完成, 调用Done()方法读取了通道了一个数据.

以下是使用示例

package main

import (
    "golimit"
    "log"
    "time"
)

func main() {
    log.Println("开始测试...")
    g := golimit.NewGoLimit(2) //max_num(最大允许并发数)设置为2

    for i := 0; i < 10; i++ {
        //尝试增加一个协程, 若已达到最大并发数,将阻塞
        g.Add()

        go func(g *golimit.GoLimit, i int) {
            defer g.Done() //一个并发协程已经完成

            time.Sleep(time.Second * 2)
            log.Println(i, "done")
        }(g, i)
    }


    log.Println("循环结束")

    time.Sleep(time.Second * 3)//等待执行完成
    log.Println("测试结束")

}

1.2评估总结

  • 优点:此方案的实现逻辑简单明了,易理解、易维护。若能满足需求,在一般的场景下,此方案为首选。
  • 隐忧:使用通道的缓冲区的大小来表示最大可并发数,在允许并发数较大,如几千几万甚至更大的情况下,通道的性能和内存的负载是否会有问题,我不太清楚,若哪位朋友知道请告知一下。
  • 不足:运行中难以调整最大可并发数。而在某些场景下是有这种需求的,如A服务依赖的B服务有扩容或缩减,但A服务不能停止,需要调整请求B服务接口的最大可并发数。

二、使用锁实现协程并发数量限制

2.1方案详情

同样先上代码(注:此代码我已经在github上开源 https://github.com/zh-five/golimit

// 协程并发数限制库

package golimit

import (
    "sync"
)

type GoLimit struct {
    max       uint             //并发最大数量
    count     uint             //当前已有并发数
    isAddLock bool             //是否已锁定增加
    zeroChan  chan interface{} //为0时广播
    addLock   sync.Mutex       //(增加并发数的)锁
    dataLock  sync.Mutex       //(修改数据的)锁
}

func NewGoLimit(max uint) *GoLimit {
    return &GoLimit{max: max, count: 0, isAddLock: false, zeroChan: nil}
}

//并发计数加1.若 计数>=max_num, 则阻塞,直到 计数<max_num
func (g *GoLimit) Add() {
    g.addLock.Lock()
    g.dataLock.Lock()

    g.count += 1

    if g.count < g.max { //未超并发时解锁,后续可以继续增加
        g.addLock.Unlock()
    } else { //已到最大并发数, 不解锁并标记. 等数量减少后解锁
        g.isAddLock = true
    }

    g.dataLock.Unlock()
}

//并发计数减1
//若计数<max_num, 可以使原阻塞的Add()快速解除阻塞
func (g *GoLimit) Done() {
    g.dataLock.Lock()

    g.count -= 1

    //解锁
    if g.isAddLock == true && g.count < g.max {
        g.isAddLock = false
        g.addLock.Unlock()
    }

    //0广播
    if g.count == 0 && g.zeroChan != nil {
        close(g.zeroChan)
        g.zeroChan = nil
    }

    g.dataLock.Unlock()
}

//更新最大并发计数为, 若是调大, 可以使原阻塞的Add()快速解除阻塞
func (g *GoLimit) SetMax(n uint) {
    g.dataLock.Lock()

    g.max = n

    //解锁
    if g.isAddLock == true && g.count < g.max {
        g.isAddLock = false
        g.addLock.Unlock()
    }

    //加锁
    if g.isAddLock == false && g.count >= g.max {
        g.isAddLock = true
        g.addLock.Lock()
    }

    g.dataLock.Unlock()
}

//若当前并发计数为0, 则快速返回; 否则阻塞等待,直到并发计数为0
func (g *GoLimit) WaitZero() {
    g.dataLock.Lock()

    //无需等待
    if g.count == 0 {
        g.dataLock.Unlock()
        return
    }

    //无广播通道, 创建一个
    if g.zeroChan == nil {
        g.zeroChan = make(chan interface{})
    }

    //复制通道后解锁, 避免从nil读数据
    c := g.zeroChan
    g.dataLock.Unlock()

    <-c
}

//获取并发计数
func (g *GoLimit) Count() uint {
    return g.count
}

//获取最大并发计数
func (g *GoLimit) Max() uint {
    return g.max
}

总共使用了两把锁,一把是数据锁(dataLock),用来锁定数据,保证数据修改安全,加锁解锁是在修改数据前后进行的;另一把是增加能否增加协程的锁(addLock),增加协程时必须先加锁,加锁成功后修改并发数,若并发数小于最大可并发数,则解锁,否则不解锁,促使后续增加协程的加锁操作阻塞,从而限制协程的并发数。使用示例如下:

package main

import (
    "github.com/zh-five/golimit"
    "log"
    "time"
)

func main() {
    log.Println("开始测试...")
    g := golimit.NewGoLimit(2) //max_num(最大允许并发数)设置为2

    for i := 0; i < 10; i++ {
        //并发计数加1.若 计数>=max_num, 则阻塞,直到 计数<max_num
        g.Add()

        //运行过程中可以随时修改最大可并发数据
        //g.SetMax(3)

        go func(g *golimit.GoLimit, i int) {
            defer g.Done() //并发计数减1

            time.Sleep(time.Second * 2)
            log.Println(i, "done")
        }(g, i)
    }


    log.Println("循环结束")

    g.WaitZero() //阻塞, 直到所有并发都完成
    log.Println("测试结束")

}

方案2的GoLimit除了增加了SetMax()方法用于修改最大可并发数。出于好玩和偷懒增加了一个WaitZero()方法(其实外部使用sync.WaitGroup也可以快速实现此功能),用于阻塞等待所有并发协程都执行完成。大约可以用于如下场景:有一大批url需要有限制的并发采集数据,主程序里只需要简单的调用一下WaitZero()方法,就可以阻塞等等所有采集的协程完成。

2.2评估总结

  • 优点: 从实现逻辑上说,可以确定性能和消耗不会随着最大可并发数增加而线性增加。另外还有很多可扩展的想象。
  • 缺点:实现逻辑比较复杂

其它

其实我很想对比测试一下两种方案的性能,特别是最大可并发比较大时。但我一直没有找到一种好的测试方法,若哪个朋友有方法或思路,欢迎交流。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,451评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,172评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,782评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,709评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,733评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,578评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,320评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,241评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,686评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,878评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,992评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,715评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,336评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,912评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,040评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,173评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,947评论 2 355