How to Implement a Simple Parallel Task in Go

Golang Concurrency

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

// printer receives values from the channel until it is closed.
func printer(ch chan int) {
	for i := range ch {
		fmt.Printf("Received %d ", i)
	}
	wg.Done() // signal that the printer goroutine has finished
}

// main is the entry point for the program.
func main() {
	c := make(chan int)
	wg.Add(1) // register the goroutine before starting it
	go printer(c)

	// Send 10 integers on the channel.
	for i := 1; i <= 10; i++ {
		c <- i
	}

	close(c)
	wg.Wait()
}

Normally, there is no way to control when a goroutine finishes.

One common situation is that the main program has already exited while goroutines it started have not yet finished running. Once the main program exits, all the goroutines it spawned are terminated with it.

package main

import (
	"fmt"
	"time"
)

func main() {
	go func() {
		for index := 0; index < 10; index++ {
			fmt.Print(index, " ")
			time.Sleep(time.Second * 1)
		}
	}()

	fmt.Println("end")
}
// Output:
// end

The code in the anonymous function never gets to run. Using sync.WaitGroup, the code can be modified as follows:

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func main() {
	wg.Add(1) // wait for one goroutine to finish
	go func() {
		for index := 0; index < 10; index++ {
			fmt.Print(index, " ")
			time.Sleep(time.Second * 1)
		}
		wg.Done()
	}()

	wg.Wait()
	fmt.Println("end")
}
// Output:
// 0 1 2 3 4 5 6 7 8 9 end

With only wg.Add(1) but two goroutines running at different speeds, Wait() returns as soon as the faster goroutine calls Done(), and the slower one is cut off when main exits:

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func main() {
	wg.Add(1) // only one goroutine is registered, but two will call Done()
	go func() {
		for index := 0; index < 10; index++ {
			fmt.Print(index, " ")
			time.Sleep(time.Second * 1)
		}
		wg.Done()
	}()

	go func() {
		for index := 0; index < 10; index++ {
			fmt.Print(index, "+ ")
			time.Sleep(time.Second * 10)
		}
		wg.Done()
	}()

	wg.Wait()
	fmt.Println("end")
}
// Output:
// 0+ 0 1 2 3 4 5 6 7 8 9 1+ end

sync.WaitGroup has three methods:

Add() sets or increments the number of goroutines to wait for. Add(2), or calling Add(1) twice, means waiting for two goroutines to complete.

Done() is called when a goroutine has finished its work (usually via defer) and decrements the wait counter by one. The number of Done() calls must match the count passed to Add(); otherwise Wait() blocks forever and the program dies with fatal error: all goroutines are asleep - deadlock!

Wait() blocks the calling goroutine until the wait counter drops to zero.
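As a minimal sketch (not part of the original examples), the two-goroutine program above can be fixed by making the Add() count match the number of Done() calls:

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func main() {
	wg.Add(2) // wait for both goroutines; equivalent to calling wg.Add(1) twice

	go func() {
		defer wg.Done() // defer ensures the counter is decremented even on early return
		for index := 0; index < 10; index++ {
			fmt.Print(index, " ")
			time.Sleep(time.Second)
		}
	}()

	go func() {
		defer wg.Done()
		for index := 0; index < 10; index++ {
			fmt.Print(index, "+ ")
			time.Sleep(time.Second * 10)
		}
	}()

	wg.Wait() // blocks until the counter reaches zero, i.e. both goroutines are done
	fmt.Println("end")
}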

A Simple Goroutine Pool

Create a fixed-size pool of goroutines to process the tasks, and pass each Task through a chan.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	count int64
	wg    = sync.WaitGroup{}
	ch    = make(chan int) // unbuffered channel: a send blocks until a worker receives
)

func main() {

	// total number of tasks
	const N = 1000

	// build a pool of 5 goroutines; each one consumes tasks from the channel
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for val := range ch {
				count2 := atomic.AddInt64(&count, 1) // count2 is a new local variable
				<-time.After(100 * time.Millisecond)
				fmt.Printf("%v var: %v\n", count2, val)
			}
		}()
	}

	fmt.Println(">>> Start Work")
	startAt := time.Now()

	// task producer
	for i := 0; i < N; i++ {
		ch <- i
	}

	// close the channel before Wait(), otherwise the workers block on range ch
	// forever and the program deadlocks ("all goroutines are asleep").
	close(ch)
	wg.Wait()

	fmt.Println(">>> Finished")
	totalTime := time.Now().Sub(startAt)
	fmt.Printf("Total time is %v, total task: %v, avg time: %v", totalTime, count, totalTime/time.Duration(N))
}

>>> Finished
Total time is 20.359523649s, total task: 1000, avg time: 20.359523ms

If the number of tasks is far smaller than the size of the pool, the idle goroutines are simply wasted resources. On the other hand, the goroutines are reused for every task, which avoids the overhead of repeatedly creating and destroying them.

A Dynamic Goroutine Pool

Here a goroutine is created for each incoming task, and the maximum number of goroutines running at the same time is limited by a buffered chan:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	count    int64
	wg       = sync.WaitGroup{}
	concurCh = make(chan bool, 5) // buffered channel used as a semaphore: at most 5 goroutines run at once
)

func main() {
	// total number of tasks
	const N = 1000

	fmt.Println(">>> Start Work")
	startAt := time.Now()

	for i := 0; i < N; i++ {
		wg.Add(1)
		go func(val int) {
			defer func() {
				<-concurCh // release the slot
				wg.Done()
			}()
			concurCh <- true // acquire a slot; blocks while 5 goroutines are already working
			count2 := atomic.AddInt64(&count, 1)
			<-time.After(100 * time.Millisecond)
			fmt.Printf("%2d var: %v\n", count2, val)
		}(i)
	}

	wg.Wait()
	defer close(concurCh)
	fmt.Println(">>> Finished")
	totalTime := time.Now().Sub(startAt)
	fmt.Printf("Total time is %v, total task: %v, avg time: %v", totalTime, count, totalTime/time.Duration(N))
}

>>> Finished
Total time is 20.392798952s, total task: 1000, avg time: 20.392798ms

Constantly creating and destroying goroutines may introduce extra overhead.

In practice, the measured performance difference between these two approaches is not significant.
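One way to compare the two styles is with go test -bench. The sketch below is not from the original post: the benchmark names, the no-op doWork, and the choice of 1000 tasks with 5 workers are assumptions, and doWork is deliberately trivial so the benchmark measures goroutine and channel overhead rather than task time. Put it in a _test.go file and run go test -bench=.

package pool

import (
	"sync"
	"testing"
)

const (
	numTasks   = 1000
	numWorkers = 5
)

// doWork is a placeholder task; a no-op keeps the benchmark focused on
// goroutine creation and channel overhead.
func doWork(int) {}

// BenchmarkFixedPool reuses a fixed set of worker goroutines.
func BenchmarkFixedPool(b *testing.B) {
	for n := 0; n < b.N; n++ {
		ch := make(chan int)
		var wg sync.WaitGroup
		for i := 0; i < numWorkers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for v := range ch {
					doWork(v)
				}
			}()
		}
		for i := 0; i < numTasks; i++ {
			ch <- i
		}
		close(ch)
		wg.Wait()
	}
}

// BenchmarkSpawnPerTask starts one goroutine per task, bounded by a
// buffered channel used as a semaphore.
func BenchmarkSpawnPerTask(b *testing.B) {
	for n := 0; n < b.N; n++ {
		sem := make(chan struct{}, numWorkers)
		var wg sync.WaitGroup
		for i := 0; i < numTasks; i++ {
			wg.Add(1)
			go func(v int) {
				defer wg.Done()
				sem <- struct{}{} // acquire a slot
				doWork(v)
				<-sem // release the slot
			}(i)
		}
		wg.Wait()
	}
}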

Although goroutines are very cheap, they should not be overused. In particular, when making concurrent requests, spawning too many goroutines can easily overwhelm the downstream service:

package main

import (
	"sync"
	"testing"
	"time"
)

func TestConcurrency(t *testing.T) {
	ids := make([]int, 0)
	for i := 0; i < 100; i++ {
		ids = append(ids, i)
	}

	// work simulates a slow downstream call; the second parameter is unused here.
	work := func(i, j int) {
		<-time.After(1 * time.Second)
		println(i)
	}

	// buffered channel used as a semaphore: at most 10 requests in flight
	blchan := make(chan int, 10)

	startAt := time.Now()
	wg := sync.WaitGroup{}
	for i, v := range ids {
		wg.Add(1)
		blchan <- 1 // acquire a slot before starting the goroutine
		go func(i, j int) {
			defer wg.Done()
			work(i, j)
			<-blchan // release the slot
		}(i, v)
	}
	wg.Wait()
	close(blchan)
	println(time.Now().Sub(startAt).Round(time.Second).String())
}