简介
学过java SpringBoot的同学应该知道,有一个非常好用的类DeferredResult,他可以设置一个超时时间,如果在超时时间内有结果,那么返回结果,如果延期了,返回预期设置的结果
说到这功能就会想到基于长轮训实现的实时消息推送系统,在很多基于spring的框架都用到了该功能在管理程序去做一个monitor,监视某些数据的变化,如果监视的数据有变动,那么直接返回给请求者,如果没有那么告诉请求者一个状态码,让对方立刻重新请求
典型的例子有配置中心Apollo
实现
首先DeferredResult应该有这么几个关键属性: 超时时间 timeout,超时后的结果 timeoutResult,正常结果result,有了正常结果后应该做啥操作 resultHandler
代码实现如下
type DeferredResult struct {timeout time.Duration // 超时时间timeoutResult any // 超时结果,预先定义resultHandler func(result any) // 结果处理回调done bool // 关闭result chan any // 设置正确结果
}func DeferredResultInstance(timeout time.Duration, timeoutResult any) *DeferredResult {deferred := &DeferredResult{timeout: timeout,timeoutResult: timeoutResult,result: make(chan any),}return deferred
}func NewDeferredResult(timeout time.Duration, timeoutResult any, callback ...func(deferred *DeferredResult)) {instance := DeferredResultInstance(timeout, timeoutResult)if len(callback) > 0 {go callback[0](instance)}instance.listener()
}func (deferred *DeferredResult) SetDeferredResultHandler(resultHandler func(result any)) {deferred.resultHandler = resultHandler
}func (deferred *DeferredResult) SetResult(result any) error {if deferred.done {return errors.New("the deferred result is done")}deferred.result <- resultreturn nil
}// 关键实现,通过一个chan与Timer实现
func (deferred *DeferredResult) listener() {defer func() { deferred.done = true }()timer := time.NewTimer(deferred.timeout)select {case result := <-deferred.result:deferred.resultHandler(result)case <-timer.C:deferred.resultHandler(deferred.timeoutResult)}
}
测试
func TestDeferredResult_SetDeferredResultHandler(t *testing.T) {var result *DeferredResultNewDeferredResult(5*time.Second, "超时了", func(deferred *DeferredResult) {result = deferredresult.SetDeferredResultHandler(func(result any) {log.Println(err)})time.Sleep(6 * time.Second)_ = result.SetResult("success")})}=== RUN TestDeferredResult_SetDeferredResultHandler
deferred_test.go:15: 超时了
--- PASS: TestDeferredResult_SetDeferredResultHandler (5.00s)
PASS// 将time.Sleep(6 * time.Second)改成 4秒
// 等待4秒返回=== RUN TestDeferredResult_SetDeferredResultHandler
deferred_test.go:15: success
--- PASS: TestDeferredResult_SetDeferredResultHandler (4.00s)
PASS
扩展
以上对返回结果只是一个简单的打印,如果我们的请求是一个http请求,结果需要会写到ResponseWriter,那么我们就需要有一个io.Writer去写,于是进行扩展
扩展代码如下
ype DeferredResultWriter struct {*DeferredResultwrite io.Writer
}func NewDeferredResultWriter(timeout time.Duration, timeoutResult any, write io.Writer, callback ...func(deferred *DeferredResultWriter)) {deferred := &DeferredResultWriter{DeferredResult: DeferredResultInstance(timeout, timeoutResult),write: write,}deferred.SetDeferredResultHandler(func(result any) {marshal, err := json.Marshal(result)if err != nil {log.Println(err)}_, err = deferred.write.Write(marshal)if err != nil {log.Println(err)}})if len(callback) > 0 {go callback[0](deferred)}deferred.listener()
}
扩展功能测试
type testHandler struct {
}func (test *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {var deferredResultWriter *DeferredResultWriterNewDeferredResultWriter(5*time.Second, "超时了", w, func(deferred *DeferredResultWriter) {deferredResultWriter = deferredtime.Sleep(6 * time.Second)_ = deferredResultWriter.SetResult("success")})}func TestNewHttpDeferredResult(t *testing.T) {http.Handle("/", &testHandler{})_ = http.ListenAndServe(":8000", nil)
}
以上代码监听了一个8000的端口,当访问8000端口时设置超时时间五秒,请求会超时6秒,那么五秒后会返回超时了
启动TestNewHttpDeferredResult,使用curl请求测试一下
$ curl 127.0.0.1:8000
超时了## 设置为4秒
$ curl 127.0.0.1:8000
success
总结
以上就是Go实现一个DeferredResult的逻辑了,可以当作长轮训功能使用
欢迎关注,学习不迷路!