並發編程一直是Golang區別與其他語言的很大優勢,也是實際工作場景中經常遇到的。近日筆者在組內分享了我們常見的並發場景,及代碼示例,以期望大家能在遇到相同場景下,能快速的想到解決方案,或者是拿這些方案與自己實現的比較,取長補短。現整理出來與大家共享。
很多時候,我們只想並發的做一件事情,比如測試某個接口的是否支持並發。那麼我們就可以這麼做:
func RunScenario1() {
count := 10
var wg sync.WaitGroup
for i := 0; i < count; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
doSomething(index)
}(i)
}
wg.Wait()
}
使用goroutine來實現異步,使用WaitGroup來等待所有goroutine結束。這裡要注意的是要正確釋放WaitGroup的counter(在goroutine裡調用Done()方法)。
但此種方式有個弊端,就是當goroutine的量過多時,很容易消耗完客戶端的資源,導致程序表現不佳。
我們仍然以測試某個後端API接口為例,如果我們想知道這個接口在持續高並發情況下是否有句柄洩露,這種情況該如何測試呢?
這種時候,我們需要能控制時間的高並發模型:
func RunScenario2() {
timeout := time.Now().Add(time.Second * time.Duration(10))
n := runtime.NumCPU()
waitForAll := make(chan struct{})
done := make(chan struct{})
concurrentCount := make(chan struct{}, n)
for i := 0; i < n; i++ {
concurrentCount <- struct{}{}
}
go func() {
for time.Now().Before(timeout) {
<-done
concurrentCount <- struct{}{}
}
waitForAll <- struct{}{}
}()
go func() {
for {
<-concurrentCount
go func() {
doSomething(rand.Intn(n))
done <- struct{}{}
}()
}
}()
<-waitForAll
}
上面的代碼裡,我們通過一個buffered channel來控制並發的數量(concurrentCount),然後另起一個channel來周期性的發起新的任務,而控制的條件就是 time.Now().Before(timeout),這樣當超過規定的時間,waitForAll 就會得到信號,而使整個程序退出。
這是一種實現方式,那麼還有其他的方式沒?我們接著往下看。
前面說的基於時間的並發模型,那如果只知道數據量很大,但是具體結束時間不確定,該怎麼辦呢?
比如,客戶給了個幾TB的文件列表,要求把這些文件從存儲裡刪除。再比如,實現個爬蟲去爬某些網站的所有內容。
而解決此類問題,最常見的就是使用工作池模式了(Worker Pool)。以刪文件為例,我們可以簡單這樣來處理:
雖然這只是個簡單Worker Pool模型,但已經能滿足我們的需求:
func RunScenario3() {
numOfConcurrency := runtime.NumCPU()
taskTool := 10
jobs := make(chan int, taskTool)
results := make(chan int, taskTool)
var wg sync.WaitGroup
// workExample
workExampleFunc := func(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
res := job * 2
fmt.Printf("Worker %d do things, produce result %d \n", id, res)
time.Sleep(time.Millisecond * time.Duration(100))
results <- res
}
}
for i := 0; i < numOfConcurrency; i++ {
wg.Add(1)
go workExampleFunc(i, jobs, results, &wg)
}
totalTasks := 100 // 本例就要從文件列表裡讀取
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < totalTasks; i++ {
n := <-results
fmt.Printf("Got results %d \n", n)
}
close(results)
}()
for i := 0; i < totalTasks; i++ {
jobs <- i
}
close(jobs)
wg.Wait()
}
在Go裡,分發任務,收集結果,我們可以都交給Channel來實現。從實現上更加的簡潔。
仔細看會發現,本模型也是適用於按時間來控制並發。只要把totalTask的遍歷換成時間控制就好了。
goroutine和channel的組合在實際編程時經常會用到,而加上Select更是無往而不利。
func RunScenario4() {
sth := make(chan string)
result := make(chan string)
go func() {
id := rand.Intn(100)
for {
sth <- doSomething(id)
}
}()
go func() {
for {
result <- takeSomthing(<-sth)
}
}()
select {
case c := <-result:
fmt.Printf("Got result %s ", c)
case <-time.After(time.Duration(30 * time.Second)):
fmt.Errorf("指定時間內都沒有得到結果")
}
}
在select的case情況,加上time.After()模型可以讓我們在一定時間范圍內等待異步任務結果,防止程序卡死。
上面我們說到持續的壓測某後端API,但並未實時收集結果。而很多時候對於性能測試場景,實時的統計吞吐率,成功率是非常有必要的。
func RunScenario5() {
concurrencyCount := runtime.NumCPU()
for i := 0; i < concurrencyCount; i++ {
go func(index int) {
for {
doUploadMock()
}
}(i)
}
t := time.NewTicker(time.Second)
for {
select {
case <-t.C:
// 計算並打印實時數據
}
}
}
這種場景就需要使用到Ticker,且上面的Example模型還能控制並發數量,也是非常實用的方式。
上面我們共提到了五種並發模式:
歸納下來其核心就是使用了Go的幾個知識點:Goroutine, Channel, Select, Time, Timer/Ticker, WaitGroup. 若是對這些不清楚,可以自行Google之。
另完整的Example 代碼可以參考這裡:https://github.com/jichangjun/golearn/blob/master/src/carlji.com/experiments/concurrency/main.go
使用方式: go run main.go <場景>
比如 :
這篇是Google官方推薦學習Go並發的資料,從初學者到進階,內容非常豐富,且權威。