This page looks best with JavaScript enabled

Concurrency in Go - II 第四章

 ·  ☕ 6 min read

Concurrency in go. 第四章 Concurrency Patterns in Go

這章節就是講一些常見的 Go concurrency pattern,有哪些方便使用以及如何安全的使用這些 pattern

confinement

要做到 concurrent safe 除了之前說的 channel 與 primitives 還有些方法

像是 Immutable data 是 concurrent safe 的,大家可用但是不能改,要不同的值就自己 create (像是 string)

而 confinement 有兩種 adhoc 與 lexical
adhoc 是類似大家約定好的寫程式規範,例如約定好只在某部分的 function 存取這個共同變數(即使在其他部分也可以存取這個變數),以這種方法來達到 concurrent safe
而開發者變多以後,不一定大家都會知道且遵守這規範,可能造成程式出錯

而 lexical confinement 是利用 compiler 及語言特性直接限制這種錯誤的發生,就如同只有 channel owner 可以傳資料,consumer 只會收到一個 read only 的 channel 這種方法
或著是如下面的 code

data := []byte("dataaatata")
go processData(&wg, data[:i])
go processData(&wg, data[i:])

把data切開,直接讓兩個 goroutine 不會互相影響
也可以盡量減少 goroutine 之間的溝通,大多數要互相溝通的程式寫起來會比較複雜一點

常見 pattern

for select

for { // 無窮迴圈或某個可iter的東西例如 []string{"x", "y", "z"}
    select {
    case <-done:
        // break or return
    case result <- data:
        fmt.Println("put something")
    default:
        // 做其他事
    } 
}

Goroutine leak

指的是我們不希望一直存在的 goroutine 一直存在,造成記憶體問題
例如

var ss chan string
go func() {
    for s := range ss {
        fmt.Println(s)
    }
}()

如果 ss 是 nil,或是根本沒人去關閉這個 chan
那這個 goroutine 就會一直存在直到 process 結束
程式碼中如果重複創造出這種東西到最後記憶體就用完了

要記得關閉或著是使用 for select + done chan 去提醒 goroutine 該結束了

go func(){
    // done is a <-chan inteface{}
    for {
        select { 
        case <-done:
            return
        case OOO: 
        }
    }
}()

The or-channel

p.94 很長一串,總之要記得把 orDone 傳下去,避免上層已經結束而下面的goroutine 都沒關掉

Error handling

p. 111
在一堆 concurrent processes 中,一個重要的問題是誰要處理 error
下面這段 code 是在 worker 裡面處理,但他最多也只能印出 err 讓人知道

worker := func(done <-chan interface{}, params) <-chan int {
    results := make(chan int)
    go func() {
        defer close(results)
        for {
            res, err := DoSomething(params)
            if err != nil {
                fmt.Println(err)
                continue
            }
            select {
            case <-done:
                return
            case results <- res:
            }
        }
    }()
    return results
}

如果像下面這樣將 error 傳回,接收這些 result 的 goroutine 也可以依據 error 來決定各個 worker 是否該繼續存活等等

type Result struct { 
    Error error
    Result int
}
worker := func(done <-chan interface{}, params) <-chan Result {
    results := make(chan Result)
    go func() {
        defer close(results)
        for {
            res, err := DoSomething(params)
            select {
            case <-done:
                return
            case results <- Result{Error: err, Result: res}:
            }
        }
    }()
    return results
}

就像是一般的 go function 會回傳 result + err,在一個對上下文不清楚,單純是負責執行任務的 goroutine 中,把 res + err 一起傳回去常常是很好的選擇 ,讓對上下文更清楚的 goroutine 去處理 error

Pipeline

一連串的 stage,輸入輸出資料,隨意組合
輸入與輸出同type,每個 stage 獨立易修改

// Batch processing
for _, i := range stage2(stage1(intSlice)) {
    fmt.Println(i)
}
// streaming
for _, i := range intSlice {
    fmt.Println(stage2(stage1(i)))
}

Best Practices for Constructing Pipelines

channel 很適合
上面那個 streaming 的例子是一個 element 拿出來,處理完所有 stage 之後才換下一個 element

p. 118 範例滿有趣的
使用 channel,每個 stage 各自跑起 goroutine,每個 element 經過一個 stage 之後會直接塞到 chan 中,讓下一個 stage 拿,自己也準備開始處理下一個 element

// example
func multiply(done <-chan interface{}, intStream <-chan int, mu int) <-chan int{
	outStream := make(chan int)
        go func(){
		defer close(outStream)
		for i := range intStream{
			select{
			case <-done:
				return
			case outStream <- i*mu:
			}
		}
	}()

	return outStream
}

// add 與 multiply 是 stage
// generator 是一個會一直有其他人塞資料進來的 chan,可能是去讀檔案或是網路收資料等等,generator 時常也會傳入同一個 done 一起控制開關
for v := range multiply(done, add(done, generator, 1), 2){
    fmt.Println(v)
}

Fan-out, Fan-in

p. 114
pipeline stage 中可能有某些 stage 要花很多時間來執行
那便會成為整條pipe的效能瓶頸
這時可以嘗試用多個 goroutine 來執行這個 stage,同時處理多個資料來加速
這就叫做 fan out
而多個 goroutine 處理完資料後,要將資料流到一個管道中,讓下一個 stage 可以從這個管道取出資料,則叫做 fan in
大概的使用方法如下,例子取自書中

// p. 116, fan-out
numFinders := runtime.NumCPU()
finders := make([]<-chan int, numFinders) 
for i := 0; i < numFinders; i++ {
        finders[i] = primeFinder(done, randIntStream)
    }

primeFinder 就是和之前那些例子一樣的一個 stage,接收一個 input channel 從裡面拉資料,會將結果傳回 return 的 channel,只是這個 stage 要花很多時間去計算

而 fan-in 的例子在 p. 117,主要就是會接收 n 個 input channel (上面那些 finders),並跑起 n 個 goroutine 來把這 n 個 input channel 的資料接到一個 output channel 上,還有用 wg 控制當 input chan 全關或 done 時,要把 output 關掉

The or-done-channel

p. 119
之前的 pipeline 例子上都是直接用 for 去 range 一個 channel
但實際上不是在使用 pipeline 時,被讀取的 channel 行為會無法預知,目前的 code 收到 done 時,不代表該 channel 會被關閉
用下面這種 code 可能導致 leak

for v := range channel {
   // do something
}

所以要像前面的預防 goroutine leak 那邊使用 for select 來處理
當目前的 code 收到 done 時,channel 就算沒關閉也可以跳出 range
orDone 主要是把展開很醜的東西包在 function 裡面,讓 code 乾淨點

for v := range orDone(done, channel) {
   // do something
}

以下截自 p. 120

orDone := func(done, c <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if ok == false {
                    return
                }
                // 如果直接 valStream <- v 而不用 select,收到 done 時也可能被 valStream 卡住
                select { 
                case valStream <- v: 
                case <-done:
                }
            }
        }
    }()
    return valStream
}

tee-channel

一個 chan 變兩個

bridge-channel

把 channel of channels 中每個 chan 拿出來並把東西都讀出

Queuing

https://www.cnblogs.com/276815076/p/8615500.html
p. 124
Go 裡面的 buffered channel 常常會被拿來當做 queue 使用
隨便增加 queue 可能會把一些問題藏起來,像是 dead lock, live lock

在 pipeline 中隨便加入 queue 通常不會增加整體的效能,只會改變一些些行為
例如下面的例子

a := A(done, inputStream) // 每個 element 花費1秒
b := B(done, a)           // 每個 element 花費3秒

B 要處理一個 element 的時間比較長,所以 A 把資料往 a 送時,可能會被 B 的讀取速度卡住
但就算在他們中間加了一個 buffer 也只是讓 A 可以快速的把 inputStream 處理完早早關閉,但總體執行時間還是被 B 限制住

書上舉了下面的例子

p := processRequest(done, acceptConnection(done, httpHandler))

如果是這種情況,我們就可能會想要在 accpet 到 process 之間加入 buffer,否則 client 的 request 們可能會慢慢出現 timeout, 被拒絕的情況
這邊我們要減少的是前一個 stage 被 block 的情況,而不是 process 的效能。讓這兩個 stage 的處理互相解耦

在這邊提到主要有兩個使用 queue 的理由

  1. 批次處理可以得到好處
  2. 某個 stage delay 或塞住會造成不好的 feedback loop

第一種滿常見的,譬如寫進 disk 之前我們會先希望在 memory 裡面盡量收集完資料再一次寫入,因為 disk 每次讀寫都很慢。又或著是需要存取遠端的資料庫,可能會想要收集多點 command 一次送去,減少來回消耗 等等等

第二種
pipeline 與其他系統之間可能會互相影響
舉個例子 web server 與 client
如果 web server 在接收 request 時沒有 buffer,必須要等 req 才能繼續接收下一個,可能造成 req timeout,導致 client 繼續送更多 req 而越來越糟。
先把 req queue 住,client 就以為只是處理 req 久了點而已

這種情況感覺比較難看出來,還需要對互動的系統有了解才行

總結 queue 可以放的地方也是兩個

  1. batching 有好處的地方
  2. pipeline 開頭

可以應用的情境不多,沒必要時也不必用,不然就多一個東西要管

書上還介紹了 Little’s law 但實在有點看不懂到底是怎麼用XD

The context Package

可以去看他的 interface,滿簡潔的

ctx 主要功能:

  1. 提供 API 去從 parent ctx 的位置去取消那些相關的 call,類似之前所說的 done
  2. 裝 request 相關資料到處傳

取消功能:

  1. 上層想取消目前的動作
  2. 目前的動作想取消他所產生出的子動作
  3. blocking 的動作需要可以被取消動作搶奪,避免 leak
ctx := context.Background()
ctx2, c := context.WithCancel(ctx) // create new instance
c()
fmt.Println(ctx.Err())   //  nil
fmt.Println(ctx2.Err())  //  context canceled

因為是 create new instance 所以把 ctx 往下傳之後,下層無法呼叫 cancel 來取消上層(保護)

原本的 done 方法也可以做到,但就要把 done 包來包去

傳值的部分

userID := "qq"
ctx := context.WithValue(context.Background(), "userID", userID)
fmt.Println(ctx.Value("userID"))

key 必須是可比較的 == != 之類的
value 必須是 concurrent safe

key如果都使用 string,你又把 ctx 傳去別人的 package,有可能會撞
推薦的方法是自定義 unexported type 例如:

// p. 143
type ctxKey int
const (
    ctxUserID ctxKey = iota ctxAuthToken
)

但要提供 func 去從 ctx 抓資料,完整範例在 p. 143
不過 ctx Value 好像也被很多人覺得很不安全,畢竟都是 interface{}
而且官方也建議 ctx 只放 request-scoped 的東西,但這東西也滿難定義的

感覺還是少用(?

Share on

Marko Peng
WRITTEN BY
Marko Peng
Good man