「Go」并发实现
如何用go实现一个简单的并行任务
Golang并发
1package main
2
3import (
4 "fmt"
5 "sync"
6)
7
// wg lets main block until printer has drained the channel.
var wg sync.WaitGroup

// printer prints every value received on ch until ch is closed, then marks
// itself finished on wg. The caller is expected to have called wg.Add(1).
func printer(ch chan int) {
	defer wg.Done()
	for {
		v, ok := <-ch
		if !ok {
			return // ch closed and drained
		}
		fmt.Printf("Received %d ", v)
	}
}
16
17// main is the entry point for the program.
18func main() {
19 c := make(chan int)
20 go printer(c)
21 wg.Add(1)
22
23 // Send 10 integers on the channel.
24 for i := 1; i <= 10; i++ {
25 c <- i
26 }
27
28 close(c)
29 wg.Wait()
30}
正常情况下,goroutine 何时结束是不受主程序控制的。
存在这样一种情况:main 函数已经返回,但其中启动的 goroutine 还没有运行完。main 函数一旦返回,整个程序随即退出,其中尚未执行完的 goroutine 也会被直接终止。
1import (
2 "fmt"
3 "time"
4)
5
// main starts a goroutine that would print 0..9, one number per second, but
// does not wait for it: as soon as main returns the program exits and the
// goroutine is terminated mid-run.
func main() {
	tick := func() {
		for n := 0; n < 10; n++ {
			fmt.Print(n, " ")
			time.Sleep(time.Second)
		}
	}
	go tick()

	fmt.Println("end")
}

// Typical output (the goroutine usually has not printed anything yet):
// end
匿名函数中的代码还没来得及执行(或只执行了一部分),程序就随着 main 的返回而退出了。可以使用 sync.WaitGroup 让 main 等待 goroutine 执行结束,
修改代码如下:
1package main
2
3import (
4 "fmt"
5 "sync"
6 "time"
7)
8
// wg lets main block until the counting goroutine has finished.
var wg sync.WaitGroup

// main starts a goroutine that prints 0..9 (one per second) and uses wg to
// wait for it, so "end" is printed only after all ten numbers.
func main() {
	wg.Add(1) // exactly one goroutine to wait for
	go func() {
		defer wg.Done()
		for n := 0; n < 10; n++ {
			fmt.Print(n, " ")
			time.Sleep(time.Second)
		}
	}()

	wg.Wait()
	fmt.Println("end")
}

// Program output:
// 0 1 2 3 4 5 6 7 8 9 end
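当只调用了一次 wg.Add(1),却有两个执行速度不一样、且各自都会调用 wg.Done() 的 goroutine 时,wg.Wait() 会在较快的 goroutine 调用 Done() 后立即返回,较慢的 goroutine 还没执行完程序就退出了: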
1func main() {
2 wg.Add(1)
3 go func() {
4 for index := 0; index < 10; index++ {
5 fmt.Print(index, " ")
6 time.Sleep(time.Second*1)
7 }
8 wg.Done()
9 }()
10
11 go func() {
12 for index := 0; index < 10; index++ {
13 fmt.Print(index, "+ ")
14 time.Sleep(time.Second*10)
15 }
16 wg.Done()
17 }()
18
19 wg.Wait()
20 fmt.Println("end")
21}
22// 运行结果为
23// 0+ 0 1 2 3 4 5 6 7 8 9 1+ end
sync.WaitGroup 有三个方法:
Add()
用来设置或增加需要等待完成的 goroutine 数量:Add(2) 或者调用两次 Add(1),都表示要等待两个 goroutine 完成。应当在启动对应的 goroutine 之前调用 Add()。
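Done()
在 goroutine 完成工作时调用(通常写成 defer wg.Done()),将等待计数器的值减一。Done() 的调用次数必须与 Add() 增加的计数一致:调用次数少于计数时,Wait() 会永久阻塞,若此时所有 goroutine 都处于阻塞状态,运行时会报 fatal error: all goroutines are asleep - deadlock!;调用次数多于计数时,计数器会变为负数,程序 panic:sync: negative WaitGroup counter。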
Wait()
在等待计数器减为0之前,Wait() 会一直阻塞当前的goroutine。
简单的协程池
开辟固定数量的 goroutine 组成协程池来处理任务,通过 chan 传递任务(Task)。
1package main
2
3import (
4 "fmt"
5 "sync"
6 "sync/atomic"
7 "time"
8)
9
var (
	count int64            // tasks processed so far; only modified via sync/atomic
	wg    sync.WaitGroup   // tracks the worker goroutines of the pool
	ch    = make(chan int) // unbuffered: every send blocks until a worker receives it
)

const (
	poolSize  = 5                      // number of worker goroutines in the pool
	taskDelay = 100 * time.Millisecond // simulated work per task
)

// main feeds N tasks through a fixed pool of poolSize workers and reports
// the total and average time per task.
func main() {
	// total task N
	const N = 1000

	startWorkers(poolSize, taskDelay)

	fmt.Println(">>> Start Work")
	startAt := time.Now()

	// Task producer: hand the tasks to the pool one at a time.
	for i := 0; i < N; i++ {
		ch <- i
	}

	// Closing ch ends every worker's range loop; without it the workers block
	// forever and wg.Wait deadlocks (all goroutines are asleep).
	close(ch)
	wg.Wait()

	fmt.Println(">>> Finished")
	totalTime := time.Since(startAt)
	fmt.Printf("Total time is %v, total task: %v, avg time: %v\n",
		totalTime, atomic.LoadInt64(&count), totalTime/time.Duration(N))
}

// startWorkers launches workers goroutines that each consume values from ch
// until ch is closed, simulating delay of work per value. Each worker is
// registered on wg before it starts and calls wg.Done when it exits.
func startWorkers(workers int, delay time.Duration) {
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for val := range ch {
				n := atomic.AddInt64(&count, 1)
				time.Sleep(delay)
				fmt.Printf("%v var: %v\n", n, val)
			}
		}()
	}
}
>>> Finished
Total time is 20.359523649s, total task: 1000, avg time: 20.359523ms
如果要处理的数据量远远小于协程池中 goroutine 的数量,无疑是一种资源浪费;但协程池中的 goroutine 会被反复复用,减少了 goroutine 创建和销毁的开销。
动态协程池
每来一个任务就创建一个 goroutine,同时执行任务的 goroutine 的最大数量由一个带缓冲的 chan 控制(chan 的容量就是并发上限)。
1package main
2
3import (
4 "fmt"
5 "sync"
6 "sync/atomic"
7 "time"
8)
9
var (
	count    int64                    // tasks processed so far; only modified via sync/atomic
	wg       = sync.WaitGroup{}       // tracks outstanding task goroutines
	concurCh = make(chan struct{}, 5) // counting semaphore; capacity = max concurrent tasks
)

// main runs N tasks, one goroutine per task, with at most cap(concurCh)
// running at a time, and reports the total and average time per task.
func main() {
	// total task N
	const N = 1000

	fmt.Println(">>> Start Work")
	startAt := time.Now()

	runTasks(N, 100*time.Millisecond)

	fmt.Println(">>> Finished")
	totalTime := time.Since(startAt)
	fmt.Printf("Total time is %v, total task: %v, avg time: %v\n",
		totalTime, atomic.LoadInt64(&count), totalTime/time.Duration(N))
}

// runTasks starts one goroutine per task 0..n-1, each simulating delay of
// work, and waits for all of them. A slot in concurCh is acquired BEFORE the
// goroutine is started, so at most cap(concurCh) task goroutines exist at a
// time; each goroutine releases its slot when it finishes.
func runTasks(n int, delay time.Duration) {
	for i := 0; i < n; i++ {
		concurCh <- struct{}{} // blocks while cap(concurCh) tasks are in flight
		wg.Add(1)
		go func(val int) {
			defer func() {
				<-concurCh
				wg.Done()
			}()
			count2 := atomic.AddInt64(&count, 1)
			time.Sleep(delay)
			fmt.Printf("%2d var: %v\n", count2, val)
		}(i)
	}
	wg.Wait()
}
>>> Finished
Total time is 20.392798952s, total task: 1000, avg time: 20.392798ms
这种写法需要不断地创建和销毁 goroutine,可能会带来额外的开销,但实际测试下来,这两种写法的性能差别不是很大。
Goroutine 的资源开销虽然非常小,但也要避免过度滥用。特别是在并发请求下游服务时,如果同时开启的 goroutine 过多,很容易把下游服务压垮。
// TestConcurrency runs 100 simulated one-second jobs with at most 10 in
// flight at once and logs the total wall time (about 100/10 = 10s). It fails
// if more than 10 jobs were ever observed running concurrently. Because it
// sleeps for ~10s it is skipped under `go test -short`.
func TestConcurrency(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping ~10s concurrency demo in -short mode")
	}

	const (
		total = 100 // number of jobs
		limit = 10  // maximum jobs in flight
	)

	ids := make([]int, 0, total)
	for i := 0; i < total; i++ {
		ids = append(ids, i)
	}

	// work simulates a one-second downstream call for job i; j is the id.
	work := func(i, j int) {
		time.Sleep(1 * time.Second)
		t.Log(i)
	}

	startAt := time.Now()
	peak := runBounded(ids, limit, work)
	t.Log(time.Since(startAt).Round(time.Second).String())

	if peak > limit {
		t.Errorf("peak concurrency = %d, want <= %d", peak, limit)
	}
}

// runBounded calls work(i, ids[i]) for every index of ids, each call in its
// own goroutine, never letting more than limit calls run at once (a limit
// below 1 is treated as 1). A slot is acquired before each goroutine is
// started, so at most limit of these goroutines exist at a time. It waits for
// all calls to finish and returns the peak number of calls observed running
// concurrently.
func runBounded(ids []int, limit int, work func(i, j int)) int {
	if limit < 1 {
		limit = 1 // an unbuffered semaphore would block the first acquire forever
	}
	sem := make(chan struct{}, limit)

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for i, v := range ids {
		sem <- struct{}{} // blocks while limit calls are in flight
		wg.Add(1)
		go func(i, j int) {
			// Deferred so the slot is released even if work panics.
			defer func() {
				mu.Lock()
				running--
				mu.Unlock()
				<-sem // release only after running is decremented
				wg.Done()
			}()

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			work(i, j)
		}(i, v)
	}
	wg.Wait()
	return peak
}