Select
-
select
可见监听Channel
上的数据流动; -
select
结构与switch
的结构类似,但select
有比较多的限制,其中最大的一条限制就是每个case
语句里必须是一个IO操作!select { case <-ch1: //ch1 成功读到数据,则执行该case case ch2 <- 1: //成功向 ch2 中写入数据,则执行该case default: //默认执行项 }
- 在一个
select
语句中,Go会按顺序从头至尾评估每一个发送和接收的语句; - 如果其中的任意一条语句可以继续执行,即没有被阻塞,那么就从这些语句中任意选择一条来使用;
- 如果没有一条语句可以执行,即所有的通道都被阻塞,那么有两种可能:
- 如果提供了
default
语句,那么就会执行,同时程序的执行会从select
语句后的语句中恢复; - 如果没有
default
语句,那么select
语句将被阻塞,直到至少有一个通信可以进行下去。
- 如果提供了
- 一旦每次轮询都进入了
default
语句,CPU就会进入忙轮询
状态,所以通常不会使用default
语句,而是选择阻塞,出让CPU
- 在一个
- 基本使用
-
select
语句本身不带循环机制,还是需要借助for
-
break
语句可以跳出select
语句,所以不能试图在select
语句中使用break
跳出for
循环;
func main() { ch := make(chan int) // 存放数据的通道 quit := make(chan bool) // 判断是否退出的通道 go func() { for { select { case num := <-ch: // 监听 ch 通道中的数据流动,有则读取出来,并进入分支 fmt.Print(num, " ") case <-quit: // 监听 quit 通道中的数据流动,有则进入分支 runtime.Goexit() // 终止GO程,还可以使用 return 结束 for循环 } } }() x,y := 1,1 for i:=0; i<20; i++ { //计算斐波那契数列 ch <- x // 向 ch 通道中写数据 x,y = y, x+y } quit <- true // 向 quit 通道中写数据 }
-
case
语句在判断通道上是否有数据时,不仅仅是在判断,如果有数据,它会直接从通道中取出数据,所以不能在分支内获取数据,否则数据会丢失。
case <-ch: // 本地数据被丢弃 num := <-ch // 此时获取的是下一次的数据
-
- 超时机制
- 为了避免整个程序进入阻塞情况,可以利用
select
来设置超时; -
time.After()
设置超时
go func() { for { select { case num := <-ch: fmt.Print(num) case <-time.After(5*time.Second): fmt.Print("timeout") } }()
- 如果
ch
通道中有数据,则进入num := <-ch
分支,那么time.After()
的时间会被重置; - 如果
num := <-ch
也是阻塞的,那么select
则会陷入阻塞,循环暂时终止;如果阻塞时间达到定时时间5s
,则进入<-time.After(5*time.Second)
分支;
func main() { ch := make(chan int) quit := make(chan bool) go func() { for { select { case num := <-ch: fmt.Print(num) case <-time.After(5*time.Second): fmt.Print("timeout") quit <- true // 超时时间到,向 quit 通道中写入数据,结束程序 } } }() //ch <- 89 // 不向 ch 通道中写入数据,模拟超时 <- quit // 主Go程从 quit 通道中读取到数据时,主Go程结束,那么子Go程也会随之结束 }
- 由此可见,虽然
ch
通道只有读端,没有写端,但select
语句并不会让当前Go程发生死锁,如果读端在select
语句之外,则会发生死锁。
- 为了避免整个程序进入阻塞情况,可以利用
锁
Go语言把锁集成到了Channel
中,Channel
具备了锁机制。
死锁
死锁并不是一种锁,而是使用锁导致的一种现象;
- 单Go程自己死锁:
Channel
应至少有2
个以上的Go程中进行通信,否则死锁!func main() { ch := make(chan int) ch <- 89 num := <-ch } // 写端阻塞,读端不会执行,造成死锁异常
- Go程间的
Channel
访问顺序导致死锁func main() { ch := make(chan int) num := <-ch go func() { ch <- 89 } } // 子Go程还没来得及执行,主Go程已经阻塞了,造成死锁异常
- 多Go程,多
Channel
交叉死锁func main() { ch1 := make(chan int) ch2 := make(chan int) go func() { for { select { case num := <-ch1: //读取通道 ch1 中的数据,写入通道 ch2 ch2 <- num } } }() for { select { case num := <-ch2: ch1 <- num } } }
互斥锁
- 每个资源都对应于一个称为
互斥锁
的标记,这个标记用来保证在任意时刻,只能有一个协程/线程访问该资源; - 互斥锁是传统并发编程对共享资源进行访问控制的主要手段,它由标准库
sync
中的Mutex
结构体类型表示; -
sync.Mutex
类型只有两个公开的指针方法,Lock(锁定当前资源)
和Unlock(解锁)
- 在使用互斥锁时,对资源操作完成后,一定要解锁,否则会出现流程执行异常、死锁等问题,通常在锁定后,立即使用
defer
语句解锁;var mutex sync.Mutex //创建一把互斥锁 mutex.Lock() //上锁 mutex.Unlock() //解锁
读写锁
- 互斥锁的本质是,当一个
Goroutine
访问时,其他Goroutine
都不能访问;这样在资源同步、避免竞争的同时,也降低了程序的并发能力,程序由原来的并行执行变成了串行执行; - 当对一个不会变化资源只做
读
操作时,是不存在资源竞争的,多少Goroutine
同时读取都没有问题;所以,竞争的问题在于写数据
,只有在写
与写
之间才存在数据同步问题,读
与读
之间不存在互斥操作的必要; - 在
读
与读
之间使用互斥锁显得很浪费资源,这也就衍生出另一种锁:读写锁
,一把具有读属性和写属性的锁; - 读写锁可以让多个读操作并发,即同时读取,但对于写操作却是完全互斥的,也就是说当一个
Goroutine
进行写操作时,其他Goroutine
既不能读,也不能写; - Go中的读写锁由结构体类型
sync.RWMutex
表示,包含两组方法:- 一组是对写操作的锁定和解锁,简称
写锁定
和写解锁
func (*RWMutex)Lock() func (*RWMutex)Unlock()
- 另一组是对读操作的锁定和解锁,简称
读锁定
和读解锁
func (*RWMutex)RLock() func (*RWMutex)RUnlock()
- 一组是对写操作的锁定和解锁,简称
-
Channel
本身就已经集成了锁,所以尽量不要把互斥锁、读写锁与Channel
混用,否则可能造成隐形死锁! - 在多Go程通信时,为了更好地实现数据同步,不会使用
Channel
,还是会选择使用效率更高的读写锁,这样可以精确控制锁定的范围。
条件变量
- 在
生产者 -> Channel -> 消费者
模型中,由于Channel
自带锁机制,生产者获取到CPU时间轮片时,就会去向Channel
中写数据,在这个过程中,Channel
会先加锁,然后生产者才能开始写数据,但如果缓冲区中的数据已经满了,本次加锁也就毫无意义,对于消费者来说依然如此,这就是条件变量存在的意义! - 原理:
生产者 -> 判断条件变量 -> 加锁 -> 向缓冲区中写入数据 -> 唤醒阻塞在条件变量上的消费者 消费者 -> 判断条件变量 -> 加锁 -> 从缓冲区中读取数据 -> 唤醒阻塞在条件变量上的生产者
- 条件变量并不保证同一时刻仅有一个协程/线程访问某个共享资源,而是在共享数据的状态发生变化时,通知阻塞在某个条件上的协程/线程;条件变量不是锁,在并发中不能达到同步的目的,所以经常与锁结合使用!
- Go标准库中的
sync.Cond
类型表示条件变量;type Cond struct { noCopy noCopy L Locker notify notifyList checker copyChecker }
-
L
表示与条件变量搭配使用的锁; - 对应3个常用方法:
Wait、Signal、Broadcast
-
-
func (c *Cond) Wait()
具备三个作用- 让当前Go程阻塞,等待条件变量满足;
- 释放已掌握的互斥锁(读写锁),相当于
cond.L.Unlock()
,所以调用Wait()
的Goroutine
一定已经加了锁; - 第
1、2
两步是一个原子操作,不可再分!当前Go程已经释放了锁,且处于阻塞状态,等待被唤醒; - 当被唤醒时,
Wait()
函数返回,解除阻塞状态,并重新获取互斥锁(读写锁),相当于cond.L.Lock()
,然后从当前位置继续向下执行。
-
func (c *Cond) Signal()
:单发通知,唤醒一个在条件变量上的等待的Goroutine
; -
func (c *Cond) Broadcast()
:广播通知,唤醒所有在条件变量上等待的Goroutine
。//全局条件变量 var cond sync.Cond func producer(w chan<- int, idx int) { for { //上锁 cond.L.Lock() for len(w) == 5 { // 判断通道是否已经满了,不能用 if cond.Wait() } num := rand.Intn(800) w <- num //向通道中写入数据 fmt.Printf("生产者%d 生产 %d\n", idx, num) //解锁 cond.L.Unlock() //唤醒消费者 cond.Signal() //睡眠一会,让出时间轮片,给其他Go程执行 time.Sleep(time.Millisecond*200) } } func consumer(r <-chan int, idx int) { for { cond.L.Lock() for len(r) == 0 { // 判断通道中是否有元素可以读取,不能用 if cond.Wait() } num := <-r fmt.Printf("消费者%d,消费 %d\n", idx, num) cond.L.Unlock() cond.Signal() time.Sleep(time.Millisecond*200) } } func main() { ch := make(chan int, 5) rand.Seed(time.Now().UnixNano()) //使用条件变量,并指定所使用的锁 cond.L = new(sync.Mutex) for i := 0; i < 5; i++ { go producer(ch, i+1) } for i := 0; i < 3; i++ { go consumer(ch, i+1) } for { ; } }
- 声明条件变量:
var cond sync.Cond
- 为条件变量指定使用的锁:
cond.L = new(sync.Mutex)
- 上锁:
cond.L.Lock()
,解锁:cond.L.Unlock()
- 在判断是否
cond.Wait()
时,使用for
,而不是if
,因为阻塞的Go程被重新唤醒时,会从被唤醒的代码处继续向下执行,而不会再去判断是否满足条件!for
循环则会在每次Go程被唤醒时,重新去判断是否满足条件,如果不满足跳出条件,则继续阻塞并释放锁。
- 声明条件变量: