-
现在有一个需求,两个子协程分别执行两个一次性长耗时操作,其中一个协程因为错误退出的时候,另外一个协程也需要退出,当我阅读相关文章的时候都告诉我,用如下代码实现:
package main import ( "context" "errors" "sync" "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) wg := sync.WaitGroup{} errChan := make(chan error) wg.Add(2) // 子协程1 go func(ctx context.Context) { defer wg.Done() for { select { case <-ctx.Done(): return default: // 模拟一个阻塞30秒的长耗时任务 time.Sleep(30 * time.Second) } } }(ctx) // 子协程2 go func() { defer wg.Done() // 模拟执行3秒以后出现了错误退出协程 time.Sleep(3 * time.Second) errChan <- errors.New("something is wrong") }() // cancel本身应该在子协程出现错误退出的时候调用 // 因为子协程1和子协程2都可能会出现错误而退出 // 为了避免忘记调用cancel的情况,专门另起一个协程来控制cancel操作 go func() { if err := <-errChan; err != nil { cancel() } }() wg.Wait() close(errChan) }
但是仔细分析后,发现这样的代码并不能满足我们的需求。
先我们先明确一下我们需求:
- 子协程1和子协程2都是只需要执行一次的长耗时任务
- 子协程2因为发生了错误退出,此时子协程1也需要退出
我们再来分析上面的代码,是否能满足我们的需求:
- 当子协程2发生错误退出了,将错误放入errChan中,errChan拿出值发现err != nil,调用cancel
- 此时子协程1正在被阻塞中,等待30秒阻塞完成以后,进入下一次循环,发现当前当前协程应该cancel了,于是当前子协程1退出协程。
显然执行的结果并不能满足我们的预期需求:
假如子协程1中的任务执行了一次以后,进入下一次循环,发现ctx还没有接收到cancel的信号,就会第二次执行任务,现在与我们的需求是违背的。
此时的解决方案可以有两种:
-
在子协程1中加入一个bool类型的变量来判断任务是否已经执行过,代码如下:
// 子协程1 go func(ctx context.Context) { defer wg.Done() var isExec bool for { select { case <-ctx.Done(): return default: if !isExec { // 模拟一个阻塞30秒的长耗时任务 time.Sleep(30 * time.Second) } } } }(ctx)
这样做其实也没有意义,这个任务本身就应该只执行一次,执行结束后,难道一直循环着等其他地方cancel以后才退出当前协程吗?
-
任务执行完成以后return直接退出,代码如下:
// 子协程1 go func(ctx context.Context) { defer wg.Done() for { select { case <-ctx.Done(): return default: // 模拟一个阻塞30秒的长耗时任务 time.Sleep(30 * time.Second) return } } }(ctx)
这样做以后就会导致ctx的cancel没有任何意义,不管怎样,子协程1中的任务都是会执行完成以后才会退出的
仔细分析下来,这样的写法其实并不能满足我们的需求。
那么到底应该如何书写才能满足我们的需求呢。
需要分为三种情况来看:
-
任务本身是可以通过context.Context控制的,比如http请求
package main import ( "context" "errors" "fmt" "io" "net/http" "sync" "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) // 当有两个协程往同一个通道中写入数据的时候,但是又只有一处读的情况下,至少需要一个缓冲区 // 否则会造成死锁 errChan := make(chan error, 1) wg := sync.WaitGroup{} wg.Add(2) // 子协程1 go func(ctx context.Context) { defer wg.Done() request, err := http.NewRequestWithContext(ctx, "GET", "http://127.0.0.1:8081", nil) if err != nil { errChan <- err return } resp, err := http.DefaultClient.Do(request) if err != nil { errChan <- err return } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { errChan <- err return } fmt.Println(string(body)) }(ctx) // 子协程2 go func() { defer wg.Done() time.Sleep(3 * time.Second) errChan <- errors.New("something is wrong") }() // cancel本身应该在子协程出现错误退出的时候调用 // 因为子协程1和子协程2都可能会出现错误而退出 // 为了避免忘记调用cancel的情况,专门另起一个协程来控制cancel操作 go func() { if err := <-errChan; err != nil { fmt.Println(err) cancel() } }() wg.Wait() }
上面的代码中,子协程1中访问的是一个耗时较长的http接口(我在此接口中sleep了30秒来模拟因为网络原因或者其他原因导致接口访问时间较长的情况),假如子协程2运行了3秒以后出现了错误,调用了cancel,那么子协程1也会因为context的控制产生错误直接退出,不需要等待30秒请求结束以后才会退出。
-
如果任务本身不能通过ctx控制,但是任务本身是可以拆分为多次完成的任务。比如,子协程1中的任务是读取一个100M文件。
package main import ( "context" "errors" "fmt" "sync" "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) // 当有两个协程往同一个通道中写入数据的时候,但是又只有一处读的情况下,至少需要一个缓冲区 // 否则会造成死锁 errChan := make(chan error, 1) wg := sync.WaitGroup{} wg.Add(2) // 子协程1 go func(ctx context.Context) { for i := 0; i < 100; i++ { select { case <-ctx.Done(): return default: time.Sleep(1 * time.Second) fmt.Println("读取1M的数据") } } }(ctx) // 子协程2 go func() { defer wg.Done() time.Sleep(3 * time.Second) errChan <- errors.New("something is wrong") }() // cancel本身应该在子协程出现错误退出的时候调用 // 因为子协程1和子协程2都可能会出现错误而退出 // 为了避免忘记调用cancel的情况,专门另起一个协程来控制cancel操作 go func() { if err := <-errChan; err != nil { fmt.Println(err) cancel() } }() wg.Wait() }
上面的代码中,读取100M的文件,分为100次读取,每次读取1M数据,假如子协程2运行了3秒出现错误退出以后,子协程1在读取了最近的1M数据以后进入下一次循环也会发现被cancel了,就会退出协程, 不继续执行任务
如果任务本身是一次性任务,并且不能拆分为多次任务,又不能被context.Context控制的任务,只能等待任务执行结束,不需要传入context.Context来进行取消控制
除了自己控制context.Context来控制协程取消操作以外,还可以利用ErrGroup的方式来更简单控制协程的取消
package main
import (
"context"
"fmt"
"io"
"net/http"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
eg, ctx := errgroup.WithContext(context.Background())
eg.Go(func() error {
request, err := http.NewRequestWithContext(ctx, "GET", "http://192.168.101.131:8081", nil)
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(request)
if err != nil {
fmt.Println(err)
return err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
fmt.Println(string(body))
return nil
})
eg.Go(func() error {
for i := 0; i < 10; i++ {
fmt.Printf("wait %d second\n", i)
time.Sleep(time.Second)
}
return fmt.Errorf("something is wrong")
})
if err := eg.Wait(); err != nil {
fmt.Println(err)
return
}
fmt.Println("task is success")
}
上面的代码,可以用非常简单的方式来处理子协程 2出现错误的情况下,子协程1也同时需要退出的需求。不需要自己控制sync.Group和errChan导致代码复杂化。