某徒弟每日工作就是把数据库里上十万条数据取出来进行一些操作(更新字段、检查链接状态等),把 Go 当 PHP 写,一个 for 循环,一两个小时过去了才能出结果(可能他就是想这么摸鱼吧)。他说并发编程容易写错,需求又急:),幸好我之前写过一点,整一个 demo 给他参考一下。
WaitGroup 和 数据库分页
package main
import (
"fmt"
"sync"
"time"
)
func main() {
start := time.Now()
var wg sync.WaitGroup
// 总条数,一般从数据库 COUNT 出来
count := 8823
// 每页处理的条数
pageSize := 1000
// 总页数 向上取整
page := (count + pageSize - 1) / pageSize
// 每页开一个 goroutine
for i := 0; i < page; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
// 计算当前页的偏移量
offset := i * pageSize
// demo 拼接 Sql 然后 按ID顺序 查出数据遍历处理 记录日志方便知道清洗的位置
fmt.Println("sql 里的 limit ", offset, ",", pageSize)
}(i)
}
wg.Wait()
end := time.Since(start)
fmt.Println("总共花了", end)
}
数据量更大,不能直接载入机器内存
package main
import (
"fmt"
"sync"
"time"
)
func main() {
start := time.Now()
// 大量数据一次开多个 goroutine 全部取出数据放到机器内存里,数据库和内存都可能会崩
// 采用同步加异步的方式处理
// 外部循环同步,处理完再开始下一轮 内部循环并发执行
// 总条数
count := 121231
// 外部循环每次处理的条数,考虑机器内存可以适当调整
pageOutSize := 10000
// 外部循环次数
pageOut := (count + pageOutSize - 1) / pageOutSize
for i := 0; i < pageOut; i++ {
// 内循环
var wg sync.WaitGroup
// 内循环每页处理的条数
pageInnerSize := 1000
// 内部循环需要处理的总条数
innerCount := pageOutSize
if i == pageOut-1 {
// 最后一页了 只需要处理剩下的条数即可
innerCount = count - pageOutSize*(pageOut-1)
}
// 内循环的总页数,每页开启一个 goroutine
pageInner := (innerCount + pageInnerSize - 1) / pageInnerSize
for j := 0; j < pageInner; j++ {
wg.Add(1)
go func(j int) {
defer wg.Done()
// 计算偏移量,需要考虑外部循环的轮次
offset := i*pageOutSize + j*pageInnerSize
// demo 拼接 Sql 然后 按ID顺序 查出数据遍历处理 记录日志方便知道清洗的位置
fmt.Println("sql 里的 limit ", offset, ",", pageInnerSize)
}(j)
}
wg.Wait()
}
end := time.Since(start)
fmt.Println("总共花了", end)
}
如果遇到错误需要终止执行,可以考虑将 WaitGroup 换成 errgroup,看具体需求。不足之处欢迎留言指正:)