Spring的WebClient提供了一个非阻塞客户端来进行服务调用。 Hystrix虽然现在处于维护模式,但已通过防止级联故障,为慢速或故障上游服务的呼叫提供断路器来保护服务对服务的呼叫。
在本文中,我将探讨Spring Cloud如何提供一种更新的功能方法,以使用Hystrix封装远程调用。
考虑一个简单的服务,该服务返回使用实体Wiremock工具建模的实体列表(例如城市列表):
WIREMOCK_SERVER.stubFor(WireMock.get(WireMock.urlMatching( "/cities" )) .withHeader( "Accept" , WireMock.equalTo( "application/json" )) .willReturn(WireMock.aResponse() .withStatus(HttpStatus.OK.value()) .withFixedDelay( 5000 ) .withHeader( "Content-Type" , "application/json" )))
当使用类型为“ / cities”的uri进行调用时,此Wiremock终结点将使用以下类型的json进行响应:
[ { "country" : "USA" , "id" : 1 , "name" : "Portland" , "pop" : 1600000 }, { "country" : "USA" , "id" : 2 , "name" : "Seattle" , "pop" : 3200000 }, { "country" : "USA" , "id" : 3 , "name" : "SFO" , "pop" : 6400000 } ]
延迟5秒后。
传统方法
使用Hystrix的方法有很多,传统上,我倾向于使用显式的Hystrix Command保护远程调用的方法,具体如下:
import com.netflix.hystrix.HystrixCommandGroupKey import com.netflix.hystrix.HystrixCommandKey import com.netflix.hystrix.HystrixCommandProperties import com.netflix.hystrix.HystrixObservableCommand import org.bk.samples.model.City import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.http.MediaType import org.springframework.web.reactive.function.client.WebClient import org.springframework.web.reactive.function.client.bodyToFlux import org.springframework.web.util.UriComponentsBuilder import reactor.core.publisher.Flux import rx.Observable import rx.RxReactiveStreams import rx.schedulers.Schedulers import java.net.URI CitiesHystrixCommand( class CitiesHystrixCommand( private val webClientBuilder: WebClient.Builder, private val citiesBaseUrl: String ) : HystrixObservableCommand<City>( HystrixObservableCommand.Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey( "cities-service" .withGroupKey(HystrixCommandGroupKey.Factory.asKey( "cities-service" )) .andCommandKey(HystrixCommandKey.Factory.asKey( "cities-service" .andCommandKey(HystrixCommandKey.Factory.asKey( "cities-service" )) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionTimeoutInMilliseconds( 4000 ))) { override fun construct(): Observable<City> { val buildUri: URI = UriComponentsBuilder .fromUriString(citiesBaseUrl) .path( "/cities" ) .build() .encode() .toUri() val webClient: WebClient = this .webClientBuilder.build() val result: Flux<City> = webClient.get() .uri(buildUri) .accept(MediaType.APPLICATION_JSON) .exchange() .flatMapMany { clientResponse -> clientResponse.bodyToFlux<City>() } return RxReactiveStreams.toObservable(result) } override fun resumeWithFallback(): Observable<City> { LOGGER.error( "Falling back on cities call" , executionException) LOGGER.error( , executionException) return Observable.empty() } companion object { private val LOGGER: Logger = LoggerFactory.getLogger(CitiesHystrixCommand:: class .java) } }
现在可以使用以下代码通过以下方式进行远程调用:
import org.springframework.http.MediaType import org.springframework.web.reactive.function.client.WebClient class CitiesHystrixCommandBasedClient( private val webClientBuilder: WebClient.Builder, private val citiesBaseUrl: String ) { fun getCities(): Flux<City> { val citiesObservable: Observable<City> = CitiesHystrixCommand(webClientBuilder, citiesBaseUrl) .observe() .subscribeOn(Schedulers.io()) return Flux .from(RxReactiveStreams .toPublisher(citiesObservable)) } }
这里要注意两件事:
1. WebClient返回代表城市列表的Project Reactor“ Flux”类型 ,但是Hystrix 基于Rx-Java 1 ,因此使用RxJavaReactiveStreams 提供的 “ RxReactiveStreams.toObservable()”调用将Flux转换为Rx-Java Observable。 这里的图书馆 。
2.我仍然希望在应用程序的其余部分中使用Project Reactor“ Flux”类型,因此还有另一个适配器将Rx-Java Observable转换回Flux“ Flux.from(RxReactiveStreams.toPublisher(citiesObservable))”一旦包装在Hystrix中的呼叫返回。
如果我尝试使用5秒钟延迟的Wiremock样本尝试此客户端,则它将正确处理延迟并在一秒钟后返回。
功能方法
使用HystrixCommands的新功能性方法可以避免使用以前方法的许多样板,该功能性方法是Spring Cloud附带的实用程序类,它提供了使用Hystrix进行远程调用的功能性方法。
使用HystrixCommands进行的整个调用如下所示:
import com.netflix.hystrix.HystrixCommandProperties import org.bk.samples.model.City import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.cloud.netflix.hystrix.HystrixCommands import org.springframework.http.MediaType import org.springframework.web.reactive.function.client.WebClient import org.springframework.web.reactive.function.client.bodyToFlux import org.springframework.web.util.UriComponentsBuilder import reactor.core.publisher.Flux import rx.schedulers.Schedulers import java.net.URI class CitiesFunctionalHystrixClient( private val webClientBuilder: WebClient.Builder, private val citiesBaseUrl: String ) { fun getCities(): Flux<City> { return HystrixCommands .from(callCitiesService()) .commandName( "cities-service" ) .groupName( "cities-service" ) .commandProperties( HystrixCommandProperties.Setter() .withExecutionTimeoutInMilliseconds( 1000 ) ) .toObservable { obs -> obs.observe() .subscribeOn(Schedulers.io()) } .fallback { t: Throwable -> LOGGER.error(t.message, t) Flux.empty() } .toFlux() } fun callCitiesService(): Flux<City> { val buildUri: URI = UriComponentsBuilder .fromUriString(citiesBaseUrl) .path( "/cities" ) .build() .encode() .toUri() val webClient: WebClient = this .webClientBuilder.build() return webClient.get() .uri(buildUri) .accept(MediaType.APPLICATION_JSON) .exchange() .flatMapMany { clientResponse -> clientResponse.bodyToFlux<City>() } } companion object { private val LOGGER: Logger = LoggerFactory.getLogger(CitiesHystrixCommand:: class .java) } }
这种方法避免了很多样板–
1.不再需要显式命令 2.通话和后备均以流畅的方式编码 3.可以明确指定任何替代–在此特定情况下,超时时间为1秒。
结论
我喜欢HystrixCommands带给WebClient使用Hystrix的简洁性。 我的github仓库中提供了整个示例– https://github.com/bijukunjummen/webclient-hystrix-sample,使这些示例正常工作所需的所有依赖项都属于此仓库。 如果您对使用Rx-Java 1感兴趣,那么可以介绍一种方法
在这里可以帮助您避免使用香草Hystrix
翻译自: https://www.javacodegeeks.com/2019/05/functional-hystrix-using-spring-cloud-hystrixcommands.html