文章目录
- 一、使用方法
- 1.1 http 示例
- 1.1.1 retry.Do
- 1.1.2 retry.DoWithData
- 1.1.3 OnRetry
- 1.1.4 根据 error 的类型,决定 delay 的时长
- 1.1.5 自定义 retry function
- 二、API
- 2.1 Do 执行
- 2.1.1 Do
- 2.1.2 DoWithData
- 2.2 Delay 策略
- 2.3 错误处理
- 2.3.1 Unwrap
- 2.3.2 WrappedErrors
- 2.4 type config
- 2.5 Option,如 OnRetry
- 2.5.1 OnRetry
- 2.5.2 RetryIf
- 2.6 Context
- 2.7 Delay
- 三、实现
- 3.1 Config 和 Option
- 3.1.1 Delay 实现
- 3.1.1.1 BackOffDelay
- 3.1.1.2 CombineDelay
- 3.1.2 Retry 策略
- 3.1.2.1 OnRetry
- 3.2 主流程 DoWithData
- 3.2.1 Do 封装函数
在分布式情况下,因为网络或服务不稳定,经常需要 retry。golang 下有 retry-go 库,封装了常见的重试配置,很方便扩展。项目地址是 https://github.com/avast/retry-go。
一、使用方法
1.1 http 示例
1.1.1 retry.Do
package main

import (
	"fmt"
	"io"
	"net/http"

	"github.com/avast/retry-go"
)

// main runs the retry example once per URL: one that resolves and one that
// does not.
func main() {
	urls := []string{
		"http://example.com",   // this URL really exists
		"http://not-exist.com", // this URL does not exist
	}
	for _, url := range urls {
		f(url)
	}
}

// f downloads url inside retry.Do, so a failed request or a failed body read
// is retried. It panics with the error returned by retry.Do when no attempt
// succeeds.
func f(url string) {
	fmt.Println("开始处理: ", url)
	var body []byte
	err := retry.Do(func() error {
		resp, err := http.Get(url)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		body, err = io.ReadAll(resp.Body)
		if err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		panic(err)
	}
	_ = body // the body is not used any further in this example
	fmt.Println("处理成功")
}

// go run main.go
开始处理: http://example.com
处理成功
开始处理: http://not-exist.com
panic: All attempts fail:
#1: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#2: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#3: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#4: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#5: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#6: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#7: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#8: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#9: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#10: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
1.1.2 retry.DoWithData
package main

import (
	"io"
	"net/http"

	"github.com/avast/retry-go/v4"
	"github.com/sirupsen/logrus"
)

// main runs the DoWithData example once per URL: one that resolves and one
// that does not.
func main() {
	urls := []string{
		"http://example.com",   // this URL really exists
		"http://not-exist.com", // this URL does not exist
	}
	for _, url := range urls {
		//fDo(url)
		fDoWithData(url)
	}
}

// fDo fetches url with retry.Do; the body is read inside the callback and
// discarded, and only success or failure is logged.
func fDo(url string) {
	logrus.Infoln("开始: ", url)
	err := retry.Do(func() error {
		resp, err := http.Get(url)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
		}
		_ = body
		return nil
	})
	if err != nil {
		logrus.Errorln("失败: ", err)
		return
	}
	logrus.Infoln("成功")
}

// fDoWithData fetches url with retry.DoWithData, whose callback returns the
// body together with the error, so the data comes back from the retry call
// itself instead of through a captured variable.
func fDoWithData(url string) {
	logrus.Infoln("开始: ", url)
	body, err := retry.DoWithData(
		func() ([]byte, error) {
			resp, err := http.Get(url)
			if err != nil {
				return nil, err
			}
			defer resp.Body.Close()
			body, err := io.ReadAll(resp.Body)
			if err != nil {
				return nil, err
			}
			return body, nil
		},
	)
	if err != nil {
		logrus.Errorln("失败: ", err)
		return
	}
	_ = body // the body is not used any further in this example
	logrus.Infoln("成功")
}

// go run main.go
INFO[0000] 开始: http://example.com
INFO[0000] 成功
INFO[0000] 开始: http://not-exist.com
ERRO[0052] 失败: All attempts fail:
#1: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#2: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#3: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#4: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#5: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#6: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#7: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#8: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#9: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
#10: Get "http://not-exist.com": dial tcp: lookup not-exist.com: no such host
1.1.3 OnRetry
设置当 retry 时的回调函数
package retry_test

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/avast/retry-go/v4"
	"github.com/stretchr/testify/assert"
)

// TestErrorHistory shows an example of how to get all the previous errors when
// retry.Do ends in success
func TestErrorHistory(t *testing.T) {
	attempts := 3 // server succeeds after 3 attempts
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if attempts > 0 {
			attempts--
			w.WriteHeader(http.StatusBadGateway)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer ts.Close()

	var allErrors []error
	err := retry.Do(
		func() error {
			resp, err := http.Get(ts.URL)
			if err != nil {
				return err
			}
			defer resp.Body.Close()
			if resp.StatusCode != 200 {
				return fmt.Errorf("failed HTTP - %d", resp.StatusCode)
			}
			return nil
		},
		// Collect every error handed to the OnRetry callback.
		retry.OnRetry(func(n uint, err error) {
			allErrors = append(allErrors, err)
		}),
	)
	assert.NoError(t, err)
	assert.Len(t, allErrors, 3)
}
1.1.4 根据 error 的类型,决定 delay 的时长
// This test delay is based on kind of error
// e.g. HTTP response [Retry-After](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After)
package retry_test

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"testing"
	"time"

	"github.com/avast/retry-go/v4"
	"github.com/stretchr/testify/assert"
)

// RetryAfterError is the first kind of error; it carries the HTTP response.
type RetryAfterError struct {
	response http.Response
}

// Error formats the request URI, status text and status code of the response.
func (err RetryAfterError) Error() string {
	return fmt.Sprintf(
		"Request to %s fail %s (%d)",
		err.response.Request.RequestURI,
		err.response.Status,
		err.response.StatusCode,
	)
}

// SomeOtherError is the second kind of error; it carries its own retry delay.
type SomeOtherError struct {
	err        string
	retryAfter time.Duration
}

// Error returns the stored message.
func (err SomeOtherError) Error() string {
	return err.err
}

// TestCustomRetryFunctionBasedOnKindOfError picks the delay before the next
// retry from the concrete type of the returned error.
func TestCustomRetryFunctionBasedOnKindOfError(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "hello")
	}))
	defer ts.Close()

	var body []byte

	err := retry.Do(
		func() error {
			resp, err := http.Get(ts.URL)

			if err == nil {
				defer func() {
					if err := resp.Body.Close(); err != nil {
						panic(err)
					}
				}()
				body, err = io.ReadAll(resp.Body)
			}

			return err
		},
		// DelayType is one of the Options accepted by retry.Do.
		retry.DelayType(func(n uint, err error, config *retry.Config) time.Duration {
			switch e := err.(type) { // inspect the error returned by the retried function
			case RetryAfterError:
				// Parse a time.Time from the Retry-After header and wait
				// until that moment.
				if t, err := parseRetryAfter(e.response.Header.Get("Retry-After")); err == nil {
					return time.Until(t)
				}
			case SomeOtherError:
				// This error already carries a time.Duration; use it directly.
				return e.retryAfter
			}

			// No error type matched: fall back to the default back-off delay.
			return retry.BackOffDelay(n, err, config)
		}),
	)

	assert.NoError(t, err)
	assert.NotEmpty(t, body)
}

// use https://github.com/aereal/go-httpretryafter instead
func parseRetryAfter(_ string) (time.Time, error) {
	return time.Now().Add(1 * time.Second), nil
}
1.1.5 自定义 retry function
package retry_test

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strconv"
	"testing"
	"time"

	"github.com/avast/retry-go/v4"
	"github.com/stretchr/testify/assert"
)

// RetriableError is a custom error that contains a positive duration for the next retry
type RetriableError struct {
	Err        error
	RetryAfter time.Duration
}

// Error returns error message and a Retry-After duration
func (e *RetriableError) Error() string {
	return fmt.Sprintf("%s (retry after %v)", e.Err.Error(), e.RetryAfter)
}

var _ error = (*RetriableError)(nil)

// TestCustomRetryFunction shows how to use a custom retry function
func TestCustomRetryFunction(t *testing.T) {
	// First define the server behaviour.
	attempts := 5 // server succeeds after 5 attempts
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if attempts > 0 {
			// The first 5 requests get HTTP 429 telling the client to retry
			// after 1 second; the 6th request gets HTTP 200.
			// inform the client to retry after one second using standard
			// HTTP 429 status code with Retry-After header in seconds
			w.Header().Add("Retry-After", "1")
			w.WriteHeader(http.StatusTooManyRequests)
			w.Write([]byte("Server limit reached"))
			attempts--
			return
		}
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("hello"))
	}))
	defer ts.Close()

	var body []byte

	// Then define the client behaviour: the HTTP call is wrapped in retry.Do.
	err := retry.Do(
		// The retried function returns an error.
		func() error {
			resp, err := http.Get(ts.URL)

			if err == nil {
				defer func() {
					if err := resp.Body.Close(); err != nil {
						panic(err)
					}
				}()
				body, err = io.ReadAll(resp.Body)
				if resp.StatusCode != 200 {
					err = fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
					if resp.StatusCode == http.StatusTooManyRequests {
						// check Retry-After header if it contains seconds to wait for the next retry
						if retryAfter, e := strconv.ParseInt(resp.Header.Get("Retry-After"), 10, 32); e == nil {
							// the server returns 0 to inform that the operation cannot be retried
							if retryAfter <= 0 {
								return retry.Unrecoverable(err)
							}
							return &RetriableError{
								Err: err,
								// Use the seconds parsed from the server's
								// Retry-After header as the next retry interval.
								RetryAfter: time.Duration(retryAfter) * time.Second,
							}
						}
						// A real implementation should also try to http.Parse the retryAfter response header
						// to conform with HTTP specification. Herein we know here that we return only seconds.
					}
				}
			}

			return err
		},
		// The error returned by the function above is handed to this
		// DelayType callback. If it is a *RetriableError, its RetryAfter is
		// used as the delay; otherwise the default
		// retry.BackOffDelay(n, err, config) is used.
		retry.DelayType(func(n uint, err error, config *retry.Config) time.Duration {
			fmt.Println("Server fails with: " + err.Error())
			if retriable, ok := err.(*RetriableError); ok {
				fmt.Printf("Client follows server recommendation to retry after %v\n", retriable.RetryAfter)
				return retriable.RetryAfter
			}
			// apply a default exponential back off strategy
			return retry.BackOffDelay(n, err, config)
		}),
	)

	fmt.Println("Server responds with: " + string(body))

	assert.NoError(t, err)
	assert.Equal(t, "hello", string(body))
}

// Output of running the test (go test -v):
=== RUN TestCustomRetryFunction
Server fails with: HTTP 429: Server limit reached (retry after 1s)
Client follows server recommendation to retry after 1s
Server fails with: HTTP 429: Server limit reached (retry after 1s)
Client follows server recommendation to retry after 1s
Server fails with: HTTP 429: Server limit reached (retry after 1s)
Client follows server recommendation to retry after 1s
Server fails with: HTTP 429: Server limit reached (retry after 1s)
Client follows server recommendation to retry after 1s
Server fails with: HTTP 429: Server limit reached (retry after 1s)
Client follows server recommendation to retry after 1s
Server responds with: hello
--- PASS: TestCustomRetryFunction (5.01s)
PASS
ok github.com/avast/retry-go/v4/examples 5.473s
单测其实就是最好的 readme,可以看出使用方法的。
二、API
2.1 Do 执行
2.1.1 Do
// 没有返回体
func Do(retryableFunc RetryableFunc, opts ...Option) error
2.1.2 DoWithData
// 返回数据, retryableFunc 回调函数返回 (T, error), 整个外层函数返回的也是 (T, error)
func DoWithData[T any](retryableFunc RetryableFuncWithData[T], opts ...Option) (T, error) {// 使用示例
func TestDoWithDataFirstOk(t *testing.T) {returnVal := 1var retrySum uint// 这种使用形式是简写了类型 T = int,也可以写为 DoWithData[int]();val, err := DoWithData(func() (int, error) { return returnVal, nil }, // 函数的第一个返回值是 intOnRetry(func(n uint, err error) { retrySum += n }),)assert.NoError(t, err)assert.Equal(t, returnVal, val)assert.Equal(t, uint(0), retrySum, "no retry")
}
2.2 Delay 策略
// DelayTypeFunc is the callback type that derives a time.Duration from the
// returned err and the number of failures n.
type DelayTypeFunc func(n uint, err error, config *Config) time.Duration

// Provided implementations:

// BackOffDelay is a DelayType which increases delay between consecutive retries
func BackOffDelay(n uint, _ error, config *Config) time.Duration

// FixedDelay uses a fixed delay interval.
func FixedDelay(_ uint, _ error, config *Config) time.Duration

// RandomDelay uses a random delay.
func RandomDelay(_ uint, _ error, config *Config) time.Duration

// CombineDelay merges several DelayTypeFunc values into a single one.
func CombineDelay(delays ...DelayTypeFunc) DelayTypeFunc
2.3 错误处理
// An internal error struct is defined for unrecoverable errors.
// Whether an error matches it is checked via errors.Is().
func IsRecoverable(err error) bool

func Unrecoverable(err error) error
2.3.1 Unwrap
// 解封装 error
func (e Error) Unwrap() error

Unwrap the last error for compatibility with `errors.Unwrap()`. When you need to
unwrap all errors, you should use `WrappedErrors()` instead.

    err := Do(
        func() error {
            return errors.New("original error")
        },
        Attempts(1),
    )

    fmt.Println(errors.Unwrap(err)) // "original error" is printed

Added in version 4.2.0.
2.3.2 WrappedErrors
func (e Error) WrappedErrors() []error

WrappedErrors returns the list of errors that this Error is wrapping. It is an
implementation of the `errwrap.Wrapper` interface in package
[errwrap](https://github.com/hashicorp/errwrap) so that `retry.Error` can be
used with that library.
2.4 type config
type Config struct {
}
2.5 Option,如 OnRetry
type Option func(*Config)

// OnRetryFunc is the retry callback type; n is the retry count.
type OnRetryFunc func(n uint, err error)

// Attempts set count of retry. Setting to 0 will retry until the retried function succeeds. default is 10
func Attempts(attempts uint) Option

// AttemptsForError sets the number of attempts for one specific err only.
func AttemptsForError(attempts uint, err error) Option
2.5.1 OnRetry
func OnRetry(onRetry OnRetryFunc) Option

OnRetry function callback are called each retry

log each retry example:

    retry.Do(
        func() error {
            return errors.New("some error")
        },
        retry.OnRetry(func(n uint, err error) {
            log.Printf("#%d: %s\n", n, err)
        }),
    )
2.5.2 RetryIf
func RetryIf(retryIf RetryIfFunc) Option

RetryIf controls whether a retry should be attempted after an error (assuming
there are any retry attempts remaining)

skip retry if special error example:

    retry.Do(
        func() error {
            return errors.New("special error")
        },
        retry.RetryIf(func(err error) bool {
            if err.Error() == "special error" {
                return false
            }
            return true
        }),
    )

By default RetryIf stops execution if the error is wrapped using
`retry.Unrecoverable`, so above example may also be shortened to:

    retry.Do(
        func() error {
            return retry.Unrecoverable(errors.New("special error"))
        },
    )
2.6 Context
func Context(ctx context.Context) Option

Context allow to set context of retry default are Background context

example of immediately cancellation (maybe it isn't the best example, but it
describes behavior enough; I hope)

    ctx, cancel := context.WithCancel(context.Background())
    cancel()

    retry.Do(
        func() error {
            ...
        },
        retry.Context(ctx),
    )
2.7 Delay
// Delay set delay between retry default is 100ms
func Delay(delay time.Duration) Option

// DelayType set type of the delay between retries default is BackOff
func DelayType(delayType DelayTypeFunc) Option

// LastErrorOnly: although all errors are recorded internally, only the last
// recorded one is returned to the caller.
func LastErrorOnly(lastErrorOnly bool) Option

// MaxDelay set maximum delay between retry does not apply by default
func MaxDelay(maxDelay time.Duration) Option

// MaxJitter: jitter is the random fluctuation applied by the RandomDelay strategy.
func MaxJitter(maxJitter time.Duration) Option
三、实现
3.1 Config 和 Option
核心是定义了 Config struct
type Config struct {attempts uint // 重试几次attemptsForError map[error]uint // 各错误重试几次delay time.Duration // 延迟多久maxDelay time.Duration // 最多延迟多久的阈值maxJitter time.Duration // todo 抖动是什么onRetry OnRetryFunc // retry 时做什么retryIf RetryIfFunc // 什么时机 retrydelayType DelayTypeFunc // todo 有什么用lastErrorOnly bool // 只记录最后的 errorcontext context.Context // 上下文timer Timer // todo 貌似只有单测使用wrapContextErrorWithLastError bool // todo 有什么用maxBackOffN uint // 最多 backoff n 次
}
然后在 Do() 或 DoWithData() 函数内会使用 opts
func DoWithData[T any](retryableFunc RetryableFuncWithData[T], opts ...Option) (T, error) {// default 首先创建默认 structconfig := newDefaultRetryConfig()// apply opts,然后使用 opt 函数,逐个修改公共的 *Config 变量for _, opt := range opts {opt(config)}
}// 默认 struct 如下
func newDefaultRetryConfig() *Config {return &Config{attempts: uint(10),attemptsForError: make(map[error]uint),delay: 100 * time.Millisecond,maxJitter: 100 * time.Millisecond,onRetry: func(n uint, err error) {},retryIf: IsRecoverable,delayType: CombineDelay(BackOffDelay, RandomDelay), // 取 delay 之和lastErrorOnly: false,context: context.Background(),timer: &timerImpl{},}
}
默认实现了,很多 Option 的闭包封装,如下:
// emptyOption is an Option that does nothing.
func emptyOption(c *Config) {
	// intentionally empty
}

// return the direct last error that came from the retried function
// default is false (return wrapped errors with everything)
//
// The returned Option is a closure: the lastErrorOnly argument of the outer
// function is captured by the inner function and written into the Config when
// the Option is applied.
func LastErrorOnly(lastErrorOnly bool) Option {
	return func(c *Config) {
		c.lastErrorOnly = lastErrorOnly
	}
}

// Attempts set count of retry. Setting to 0 will retry until the retried function succeeds.
// default is 10
func Attempts(attempts uint) Option {
	return func(c *Config) {
		c.attempts = attempts
	}
}
3.1.1 Delay 实现
3.1.1.1 BackOffDelay
// BackOffDelay is a DelayType which increases delay between consecutive retries
func BackOffDelay(n uint, _ error, config *Config) time.Duration {
	// 1 << 63 would overflow signed int64 (time.Duration), thus 62.
	const max uint = 62

	// maxBackOffN starts at its zero value, so it is computed on first use.
	if config.maxBackOffN == 0 {
		if config.delay <= 0 {
			config.delay = 1
		}

		// time.Duration is a signed integer, so the largest usable shift is
		// 62 minus the number of bits the base delay already occupies.
		config.maxBackOffN = max - uint(math.Floor(math.Log2(float64(config.delay))))
	}

	if n > config.maxBackOffN { // the shift actually applied is capped
		n = config.maxBackOffN
	}

	return config.delay << n // the delay doubles with every increment of n
}
3.1.1.2 CombineDelay
// CombineDelay is a DelayType the combines all of the specified delays into a new DelayTypeFunc
func CombineDelay(delays ...DelayTypeFunc) DelayTypeFunc {const maxInt64 = uint64(math.MaxInt64)return func(n uint, err error, config *Config) time.Duration {var total uint64for _, delay := range delays {total += uint64(delay(n, err, config)) // 对每个 delay() 函数求值,并求和if total > maxInt64 {total = maxInt64}}return time.Duration(total)}
}
3.1.2 Retry 策略
3.1.2.1 OnRetry
在 DoWithData() 函数中会被调用
// OnRetry function callback are called each retry
//
// log each retry example:
//
// retry.Do(
// func() error {
// return errors.New("some error")
// },
// retry.OnRetry(func(n uint, err error) {
// log.Printf("#%d: %s\n", n, err)
// }),
// )
func OnRetry(onRetry OnRetryFunc) Option {if onRetry == nil {return emptyOption}return func(c *Config) {c.onRetry = onRetry}
}
3.2 主流程 DoWithData
DoWithData 是一个泛型函数,指定的泛型 T,会被返回
func DoWithData[T any](retryableFunc RetryableFuncWithData[T], opts ...Option) (T, error) {var n uintvar emptyT T// defaultconfig := newDefaultRetryConfig()// apply optsfor _, opt := range opts {opt(config)}if err := config.context.Err(); err != nil {return emptyT, err}// Setting attempts to 0 means we'll retry until we succeedvar lastErr errorif config.attempts == 0 {for {t, err := retryableFunc()if err == nil {return t, nil}if !IsRecoverable(err) {return emptyT, err}if !config.retryIf(err) {return emptyT, err}lastErr = errn++config.onRetry(n, err)select {case <-config.timer.After(delay(config, n, err)):case <-config.context.Done():if config.wrapContextErrorWithLastError {return emptyT, Error{config.context.Err(), lastErr}}return emptyT, config.context.Err()}}}// 主流程开始errorLog := Error{} // 这是一个数组, 记录了所有的错误// 因为后续会修改 attempts 值, 所以这里先拷贝一份, 后续使用拷贝的那一份attemptsForError := make(map[error]uint, len(config.attemptsForError))for err, attempts := range config.attemptsForError {attemptsForError[err] = attempts}shouldRetry := true // 当超出重试次数时, 会退出循环for shouldRetry {// 执行用户传入的主流程函数, 我们要重试的就是他t, err := retryableFunc()// 如果执行成功了, 直接返回, 不需要再重试了if err == nil {return t, nil}// 追加 errorerrorLog = append(errorLog, unpackUnrecoverable(err))// 用户可以自定义回调函数, 即根据返回的 err 判断是否需要重试if !config.retryIf(err) {break}// 当重试时, 需要执行的回调函数, 用户可以自定义config.onRetry(n, err)// 用户可以设置某种 err 需要重试几次. 此处会判断返回的 err 并减少需要重试的次数for errToCheck, attempts := range attemptsForError {if errors.Is(err, errToCheck) {attempts--attemptsForError[errToCheck] = attemptsshouldRetry = shouldRetry && attempts > 0}}// 既然最后一次 retryableFunc() 已经执行完了, 那就不需要再等待了// if this is last attempt - don't waitif n == config.attempts-1 {break}select {case <-config.timer.After(delay(config, n, err)): // 等待一段时间后再重试case <-config.context.Done(): // 如果用户把 context Done() 了, 则退出即可. 
通常原因是用户主动 ctx.Cancel() 或者 ctx.Timeout() 自己到达了if config.lastErrorOnly {return emptyT, config.context.Err()}return emptyT, append(errorLog, config.context.Err())}n++shouldRetry = shouldRetry && n < config.attempts // 总的 attempts 次数也会控制是否需要重试}if config.lastErrorOnly {return emptyT, errorLog.Unwrap() // 这个 errorLog 其实是一个数组, Unwrap() 其实就是返回数组的最后一项}return emptyT, errorLog
}
3.2.1 Do 封装函数
其实 Do() 就是把 DoWithData() 的 retryableFuncWithData() 封装了一层
func Do(retryableFunc RetryableFunc, opts ...Option) error {retryableFuncWithData := func() (any, error) {// 执行 retryableFunc() 会返回 error// 再封装一个 any, 连同 error 一起返回return nil, retryableFunc()}_, err := DoWithData(retryableFuncWithData, opts...)return err
}