Select
-
select可见监听Channel上的数据流动; -
select结构与switch的结构类似,但select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作!select { case <-ch1: //ch1 成功读到数据,则执行该case case ch2 <- 1: //成功向 ch2 中写入数据,则执行该case default: //默认执行项 }- 在一个
select语句中,Go会按顺序从头至尾评估每一个发送和接收的语句; - 如果其中的任意一条语句可以继续执行,即没有被阻塞,那么就从这些语句中任意选择一条来使用;
- 如果没有一条语句可以执行,即所有的通道都被阻塞,那么有两种可能:
- 如果提供了
default语句,那么就会执行,同时程序的执行会从select语句后的语句中恢复; - 如果没有
default语句,那么select语句将被阻塞,直到至少有一个通信可以进行下去。
- 如果提供了
- 一旦每次轮询都进入了
default语句,CPU就会进入忙轮询状态,所以通常不会使用default语句,而是选择阻塞,出让CPU
- 在一个
- 基本使用
-
select语句本身不带循环机制,还是需要借助for -
break语句可以跳出select语句,所以不能试图在select语句中使用break跳出for循环;
func main() { ch := make(chan int) // 存放数据的通道 quit := make(chan bool) // 判断是否退出的通道 go func() { for { select { case num := <-ch: // 监听 ch 通道中的数据流动,有则读取出来,并进入分支 fmt.Print(num, " ") case <-quit: // 监听 quit 通道中的数据流动,有则进入分支 runtime.Goexit() // 终止GO程,还可以使用 return 结束 for循环 } } }() x,y := 1,1 for i:=0; i<20; i++ { //计算斐波那契数列 ch <- x // 向 ch 通道中写数据 x,y = y, x+y } quit <- true // 向 quit 通道中写数据 }-
case语句在判断通道上是否有数据时,不仅仅是在判断,如果有数据,它会直接从通道中取出数据,所以不能在分支内获取数据,否则数据会丢失。
case <-ch: // 本地数据被丢弃 num := <-ch // 此时获取的是下一次的数据 -
- 超时机制
- 为了避免整个程序进入阻塞情况,可以利用
select来设置超时; -
time.After()设置超时
go func() { for { select { case num := <-ch: fmt.Print(num) case <-time.After(5*time.Second): fmt.Print("timeout") } }()- 如果
ch通道中有数据,则进入num := <-ch分支,那么time.After()的时间会被重置; - 如果
num := <-ch也是阻塞的,那么select则会陷入阻塞,循环暂时终止;如果阻塞时间达到定时时间5s,则进入<-time.After(5*time.Second)分支;
func main() { ch := make(chan int) quit := make(chan bool) go func() { for { select { case num := <-ch: fmt.Print(num) case <-time.After(5*time.Second): fmt.Print("timeout") quit <- true // 超时时间到,向 quit 通道中写入数据,结束程序 } } }() //ch <- 89 // 不向 ch 通道中写入数据,模拟超时 <- quit // 主Go程从 quit 通道中读取到数据时,主Go程结束,那么子Go程也会随之结束 }- 由此可见,虽然
ch通道只有读端,没有写端,但select语句并不会让当前Go程发生死锁,如果读端在select语句之外,则会发生死锁。
- 为了避免整个程序进入阻塞情况,可以利用
锁
Go语言把锁集成到了Channel中,Channel具备了锁机制。
死锁
死锁并不是一种锁,而是使用锁导致的一种现象;
- 单Go程自己死锁:
Channel应至少有2个以上的Go程中进行通信,否则死锁!func main() { ch := make(chan int) ch <- 89 num := <-ch } // 写端阻塞,读端不会执行,造成死锁异常 - Go程间的
Channel访问顺序导致死锁func main() { ch := make(chan int) num := <-ch go func() { ch <- 89 } } // 子Go程还没来得及执行,主Go程已经阻塞了,造成死锁异常 - 多Go程,多
Channel交叉死锁func main() { ch1 := make(chan int) ch2 := make(chan int) go func() { for { select { case num := <-ch1: //读取通道 ch1 中的数据,写入通道 ch2 ch2 <- num } } }() for { select { case num := <-ch2: ch1 <- num } } }
互斥锁
- 每个资源都对应于一个称为
互斥锁的标记,这个标记用来保证在任意时刻,只能有一个协程/线程访问该资源; - 互斥锁是传统并发编程对共享资源进行访问控制的主要手段,它由标准库
sync中的Mutex结构体类型表示; -
sync.Mutex类型只有两个公开的指针方法,Lock(锁定当前资源)和Unlock(解锁) - 在使用互斥锁时,对资源操作完成后,一定要解锁,否则会出现流程执行异常、死锁等问题,通常在锁定后,立即使用
defer语句解锁;var mutex sync.Mutex //创建一把互斥锁 mutex.Lock() //上锁 mutex.Unlock() //解锁
读写锁
- 互斥锁的本质是,当一个
Goroutine访问时,其他Goroutine都不能访问;这样在资源同步、避免竞争的同时,也降低了程序的并发能力,程序由原来的并行执行变成了串行执行; - 当对一个不会变化资源只做
读操作时,是不存在资源竞争的,多少Goroutine同时读取都没有问题;所以,竞争的问题在于写数据,只有在写与写之间才存在数据同步问题,读与读之间不存在互斥操作的必要; - 在
读与读之间使用互斥锁显得很浪费资源,这也就衍生出另一种锁:读写锁,一把具有读属性和写属性的锁; - 读写锁可以让多个读操作并发,即同时读取,但对于写操作却是完全互斥的,也就是说当一个
Goroutine进行写操作时,其他Goroutine既不能读,也不能写; - Go中的读写锁由结构体类型
sync.RWMutex表示,包含两组方法:- 一组是对写操作的锁定和解锁,简称
写锁定和写解锁
func (*RWMutex)Lock() func (*RWMutex)Unlock()- 另一组是对读操作的锁定和解锁,简称
读锁定和读解锁
func (*RWMutex)RLock() func (*RWMutex)RUnlock() - 一组是对写操作的锁定和解锁,简称
-
Channel本身就已经集成了锁,所以尽量不要把互斥锁、读写锁与Channel混用,否则可能造成隐形死锁! - 在多Go程通信时,为了更好地实现数据同步,不会使用
Channel,还是会选择使用效率更高的读写锁,这样可以精确控制锁定的范围。
条件变量
- 在
生产者 -> Channel -> 消费者模型中,由于Channel自带锁机制,生产者获取到CPU时间轮片时,就会去向Channel中写数据,在这个过程中,Channel会先加锁,然后生产者才能开始写数据,但如果缓冲区中的数据已经满了,本次加锁也就毫无意义,对于消费者来说依然如此,这就是条件变量存在的意义! - 原理:
生产者 -> 判断条件变量 -> 加锁 -> 向缓冲区中写入数据 -> 唤醒阻塞在条件变量上的消费者 消费者 -> 判断条件变量 -> 加锁 -> 从缓冲区中读取数据 -> 唤醒阻塞在条件变量上的生产者 - 条件变量并不保证同一时刻仅有一个协程/线程访问某个共享资源,而是在共享数据的状态发生变化时,通知阻塞在某个条件上的协程/线程;条件变量不是锁,在并发中不能达到同步的目的,所以经常与锁结合使用!
- Go标准库中的
sync.Cond类型表示条件变量;type Cond struct { noCopy noCopy L Locker notify notifyList checker copyChecker }-
L表示与条件变量搭配使用的锁; - 对应3个常用方法:
Wait、Signal、Broadcast
-
-
func (c *Cond) Wait()具备三个作用- 让当前Go程阻塞,等待条件变量满足;
- 释放已掌握的互斥锁(读写锁),相当于
cond.L.Unlock(),所以调用Wait()的Goroutine一定已经加了锁; - 第
1、2两步是一个原子操作,不可再分!当前Go程已经释放了锁,且处于阻塞状态,等待被唤醒; - 当被唤醒时,
Wait()函数返回,解除阻塞状态,并重新获取互斥锁(读写锁),相当于cond.L.Lock(),然后从当前位置继续向下执行。
-
func (c *Cond) Signal():单发通知,唤醒一个在条件变量上的等待的Goroutine; -
func (c *Cond) Broadcast():广播通知,唤醒所有在条件变量上等待的Goroutine。//全局条件变量 var cond sync.Cond func producer(w chan<- int, idx int) { for { //上锁 cond.L.Lock() for len(w) == 5 { // 判断通道是否已经满了,不能用 if cond.Wait() } num := rand.Intn(800) w <- num //向通道中写入数据 fmt.Printf("生产者%d 生产 %d\n", idx, num) //解锁 cond.L.Unlock() //唤醒消费者 cond.Signal() //睡眠一会,让出时间轮片,给其他Go程执行 time.Sleep(time.Millisecond*200) } } func consumer(r <-chan int, idx int) { for { cond.L.Lock() for len(r) == 0 { // 判断通道中是否有元素可以读取,不能用 if cond.Wait() } num := <-r fmt.Printf("消费者%d,消费 %d\n", idx, num) cond.L.Unlock() cond.Signal() time.Sleep(time.Millisecond*200) } } func main() { ch := make(chan int, 5) rand.Seed(time.Now().UnixNano()) //使用条件变量,并指定所使用的锁 cond.L = new(sync.Mutex) for i := 0; i < 5; i++ { go producer(ch, i+1) } for i := 0; i < 3; i++ { go consumer(ch, i+1) } for { ; } }- 声明条件变量:
var cond sync.Cond - 为条件变量指定使用的锁:
cond.L = new(sync.Mutex) - 上锁:
cond.L.Lock(),解锁:cond.L.Unlock() - 在判断是否
cond.Wait()时,使用for,而不是if,因为阻塞的Go程被重新唤醒时,会从被唤醒的代码处继续向下执行,而不会再去判断是否满足条件!for循环则会在每次Go程被唤醒时,重新去判断是否满足条件,如果不满足跳出条件,则继续阻塞并释放锁。
- 声明条件变量:
