
uber 开源的限流算法,代码不多,尝试理解但是没怎么理解。有没有大佬指点一二.主要不理解的点在于这个 for 循环
// Take blocks to ensure that the time spent between multiple // Take calls is on average time.Second/rate. func (t *limiter) Take() time.Time { newState := state{} taken := false for !taken { now := t.clock.Now() previousStatePointer := atomic.LoadPointer(&t.state) oldState := (*state)(previousStatePointer) newState = state{} newState.last = now // If this is our first request, then we allow it. if oldState.last.IsZero() { taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) continue } // sleepFor calculates how much time we should sleep based on // the perRequest budget and how long the last request took. // Since the request may take longer than the budget, this number // can get negative, and is summed across requests. newState.sleepFor += t.perRequest - now.Sub(oldState.last) // We shouldn't allow sleepFor to get too negative, since it would mean that // a service that slowed down a lot for a short period of time would get // a much higher RPS following that. if newState.sleepFor < t.maxSlack { newState.sleepFor = t.maxSlack } if newState.sleepFor > 0 { newState.last = newState.last.Add(newState.sleepFor) } taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) } t.clock.Sleep(newState.sleepFor) return newState.last } 1 CRVV 2020-12-08 20:47:43 +08:00 var counter int64 然后如果有多个线程执行下面这个循环,每次 break 都会给 counter 加一 for { counter0 := atomic.LoadInt64(&counter) if atomic.CompareAndSwap(&counter, counter0, counter+1) { break } } 这种就是所谓的 lock-free,把它写成用 lock 的形式,再把那几个 if 去掉,这个函数其实就是 func (t *limiter) Take() time.Time { mutex.Lock() oldState := t.state newState = state{} now := t.clock.Now() newState.last = now newState.sleepFor += t.perRequest - now.Sub(oldState.last) t.state = &newState mutex.Unlock() t.clock.Sleep(newState.sleepFor) return newState.last } 然后应该很好懂了 |