This page looks best with JavaScript enabled

Concurrency in Go III 第五章 Concurrency at Scale

 ·  ☕ 5 min read

筆記都沒啥整理R

Error Propagation

// 好像跟 concurrency 比較無關(?

寫 concurrent code 的時候,debug 會比平常難很多,如果互相傳遞資訊時都附上 error 會比較好

Error需要包含一些東西:

  1. 發生的事情
    例如硬碟滿了、網路斷線等等。在傳遞途中我們也可以自己包些資訊

  2. 時間與地點
    要記錄發生的時間
    要包含完整的 stack trace
    還有上下文(這個不太有想像),例如分散式系統,要知道是哪個機器出錯

  3. 顯示用訊息
    給使用者看的,不包含 stack trace 這類太詳細的資訊
    類似前面兩點的精簡版

  4. error id
    讓看到顯示用訊息的使用者可以找到詳細資訊

這些資訊大多都要我們自己包,一般的 error 無

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func High(param string) error {
    result, err := Low.Do()
    if err != nil {
         // 檢查資訊是否有包完整,基本上都要包,逃過檢查的就是沒包好
        if _, ok := err.(Low.Error); ok { 
            err = WrapErr(err, "cannot do High id:%v", param) 
            // 會不會使用 defer 包比較好啊? 就不用在每個 func call 都包一次,dup code
        }
        return err
    }
    // ...
}

通常只在 module boundaries(像是public function/method) WrapErr
在最上層時,如果發現不是自製的 error,就輸出 default 訊息,如果是的話就直接輸出 err.Message 即可。但兩個都要記得輸出 error id

以下是課本上 p. 151 舉的例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type MyError struct {
    Inner error 
    Message string
    StackTrace string
    Misc map[string]interface{}
}

func wrapError(err error, messagef string, msgArgs ...interface{}) MyError {
    return MyError{
        Inner: err, // 完整的底層資訊
        Message: fmt.Sprintf(messagef, msgArgs...), // 顯示用,會把下層的 err.Message 藏起來
        StackTrace: string(debug.Stack()), 
        Misc: make(map[string]interface{}), // 雜亂資訊放這
    }
}

Timeouts and Cancellation

為何想要 timeout?
很邊緣的 req 可能會想要讓他 timeout 而不是讓他等超久
何時適合 timeout?

  1. to後不會一直重複來(造成negative feedback loop)
  2. 沒有資源去儲存此 req,像是 queue 滿了之類
  3. req 會過時,等很久換到它時候,已經處理也沒意義
  4. 預防 deadlock,死結也時常會發生在程式執行一段時間後,測試時不一定抓得到。死結時通常只能重啟機器,換成 timeout 雖然有可能也是活結,但至少發現後可修,不是直接爆炸

何時想取消?

  1. 上面那些情況
  2. 使用者想取消
  3. parent 如果停止了,該 concurrent op 也要取消
  4. Replicated requests,某些情況下(p. 159)為了效能,可能會收到兩個一樣的 req,一個完成後,其他取消

簡單範例
G -> A -> B
當 A 沒效率時可能會開啟 A2
如此 B 可能收到重複的

一個 op 取消時,其餘的相關 op 該怎麼辦(正在執行的東西或其他子op等等等)
一個可能會被取消的 op 底下會呼叫到的子 op 都必須檢查,這些 op 如果執行時間超出取消後可以等待的範圍,那就必須把這些 op 做成 preemptable

簡單處理 Replicated requests 方式就是當子 goroutine 回報結果後,要把往另一個重複的 req 送取消指令(但這就需要雙向溝通)
或是不處理,或要求 parent permission(A要送給B時問G同意)

timeout和取消是程式完成後很難加入的,要一開始就考慮

Heartbeat

讓別人知道你活著,比較常見的是一個 server 開一個 endpoint 讓別人知道 server 掛了沒
這邊的 heartbeat 是一個 goroutine 要回傳一個 heartbeat chan,定時傳東西出去表示自己沒掛
直接把 p. 162 的例子摳逼上來

doWork := func(
		done <-chan interface{},
		pulseInterval time.Duration,
	) (<-chan interface{}, <-chan time.Time) {
		heartbeat := make(chan interface{})
		results := make(chan time.Time)
		go func() {
			defer close(heartbeat)
			defer close(results)
			pulse := time.Tick(pulseInterval)
			workGen := time.Tick(2 * pulseInterval)
			sendPulse := func() {
				select {
				case heartbeat <- struct{}{}:
				default:
				}
			}
			sendResult := func(r time.Time) {
				for {
					select {
					case <-done:
						return
					case <-pulse:
						sendPulse()
					case results <- r:
						return
					}
				}
			}
			for {
				select {
				case <-done:
					return
				case <-pulse:
					sendPulse()
				case r := <-workGen:
					sendResult(r)
				}
			}
		}()
		return heartbeat, results
	}

要注意的是傳東西去 chan 都可能會被卡住包括 heartbeat,所以都使用 select 預防,且送 result 中間可能有多個心跳,所以要用 for 包住 (常常忘記)

使用上可以在 for 裡面包 select heartbeat, result, time.After(timeout) 這幾個 case,如果太久沒聽到 heartbeat 或 result 就掛掉

Replicated Requests

就是同時會有多個處理 request 的單元(機器、process、goroutine 都有可能),丟一樣的 req 給他們,拿最快的回應來使用

Rate limiting

https://www.alexedwards.net/blog/how-to-rate-limit-http-requests
預防 rate 太高造成系統壞掉。
可能是攻擊,或是系統設計上本來就沒考慮到高 rate 的情況
或是之前說的 negative feedback loop 等等等
甚至對不同使用者(例如有無付費)設定不同的限制

可以先將系統能接受的 request 數量限制在自己已知可掌控的範圍內

不只服務提供方可以設置 rate limit,如果客戶使用需要付費的服務,那幫自己設 rate limit 也合理

常用的方法叫做 token bucket,要使用資源就必須持有 token,要去桶子裡拿。桶子裡最多可以有 d 個 token,每秒會補充 r 個 token 回去桶子。

golang.org/x/time/rate package 實作了 token bucket

func NewLimiter(r Limit, b int) *Limiter // 還有一些 function 可以幫忙產生r,像是 Every(duration) 可以自動轉成 r

func (lim *Limiter) Wait(ctx context.Context) (err error)// 等於 lim.WaitN(ctx, 1),

func (lim *Limiter) WaitN(ctx context.Context, n int)  // block 直到擁有 n 個 token,會去檢查 ctx 的死線和是否取消了


使用範例
lim := rate.NewLimiter(rate.Limit(1), 1)
func XXX(ctx context.Context) error {
   if err != lim.Wait(ctx); err != nil{
      return err 
   }

}

也可以把一堆 limiter 包起來,用來做一個有各種不同限制的 limiter,例如把 disk limiter、network limiter、 second limiter(每秒限制)、minute limiter(每分鐘限制)
在用 for 迴圈去跑需要啟動的 limiter

範例,書上還有一些優化,不過大致概念是這樣

// 讓 MultiLimiter 接收 interface,如此一來不止可以接收原生 Go limiter,也可以接收 multiLimiter,包好幾層
type RateLimiter interface{
    Wait(context.Context) error
}

type multiLimiter struct { 
    limiters []RateLimiter
}

func MultiLimiter(limiters ...RateLimiter) *multiLimiter{
    return &multiLimiter{limiters: limiters}
}

func (lim *multiLimiter) Wait(ctx context.Context) error{
    for _, l := range lim.limiters {
        if err := l.Wait(ctx); err != nil{
            return err
        }
    }
    return nil
}

使用上可以在不同的 func 裡面去戳自己想要用的 limiter
type XXX struct {
    ALimiter, 
    BLimiter, 
    CLimiter, 
}
func (x *XXX) DoSomething (ctx context.Context) error {
    err := MultiLimiter(x.ALimiter, x.BLimiter).Wait(ctx)
    .....
}

詳細請參考 p.183

補一些書上沒有的用法

func (lim *Limiter) Allow() bool  // 等於 AllowN(time.Now(), 1)
func (lim *Limiter) AllowN(now time.Time, n int) bool // 回傳 now 的時候桶子裡面有沒有 n 個 token,如果有會回傳 true 並消耗掉


func (lim *Limiter) Reserve() *Reservation // ReserveN(time.Now(), 1)
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation 
預約 n 個 token,會先把 limiter 裡面的 token 先卡住
r.OK() // 有沒有可能等到 n 個token
r.Delay() // 多久才可以得到 n 個 
r.Cancel() // 取消預約

// package 註解的用法
//   r := lim.ReserveN(time.Now(), 1)
//   if !r.OK() {
//     // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
//     return
//   }
//   time.Sleep(r.Delay())
//   Act()


func (lim *Limiter) SetLimit(newLimit Limit)
func (lim *Limiter) SetBurst(newBurst int) 

Healing Unhealthy Goroutines

會活很久的 process 中通常也會有一些一直都活著的 goroutine,等待資料來,處理,回應。總之他們很有可能會掛掉(? 需要一些方法來復活他們

這章節的 code 也太長了ㄅ
基本上就是要跑起另一個 goroutine 去偵測目標的 heartbeat,當不正常時,就傳 done 給目標,然後重啟一個

Summary

這章主要是在說要怎麼讓系統穩定點
滿多主題的觀念也不只限定在 Go 的 concurrency 上

Share on

Marko Peng
WRITTEN BY
Marko Peng
Good man