Hystrix具有折叠(或批处理)请求的高级功能。 如果两个或多个命令同时运行相似的请求,Hystrix可以将它们组合在一起,运行一个批处理请求,并将拆分结果分派回所有命令。 首先让我们看看Hystrix如何工作而不会崩溃。 假设我们有一个StockPrice
给定Ticker
StockPrice
的服务:
import lombok.Value;
import java.math.BigDecimal;
import java.time.Instant;@Value
class Ticker {String symbol;
}@Value
class StockPrice {BigDecimal price;Instant effectiveTime;
}interface StockPriceGateway {default StockPrice load(Ticker stock) {final Set<Ticker> oneTicker = Collections.singleton(stock);return loadAll(oneTicker).get(stock);}ImmutableMap<Ticker, StockPrice> loadAll(Set<Ticker> tickers);
}
为了方便起见, StockPriceGateway
核心实现必须提供loadAll()
批处理方法,而实现load()
方法则是为了方便。 因此,我们的网关能够批量加载多个价格(例如,以减少延迟或网络协议开销),但是目前我们不使用此功能,始终一次加载一个股票的价格:
class StockPriceCommand extends HystrixCommand<StockPrice> {private final StockPriceGateway gateway;private final Ticker stock;StockPriceCommand(StockPriceGateway gateway, Ticker stock) {super(HystrixCommandGroupKey.Factory.asKey("Stock"));this.gateway = gateway;this.stock = stock;}@Overrideprotected StockPrice run() throws Exception {return gateway.load(stock);}
}
这样的命令将始终为每个Ticker
调用StockPriceGateway.load()
,如以下测试所示:
class StockPriceCommandTest extends Specification {def gateway = Mock(StockPriceGateway)def 'should fetch price from external service'() {given:gateway.load(TickerExamples.any()) >> StockPriceExamples.any()def command = new StockPriceCommand(gateway, TickerExamples.any())when:def price = command.execute()then:price == StockPriceExamples.any()}def 'should call gateway exactly once when running Hystrix command'() {given:def command = new StockPriceCommand(gateway, TickerExamples.any())when:command.execute()then:1 * gateway.load(TickerExamples.any())}def 'should call gateway twice when command executed two times'() {given:def commandOne = new StockPriceCommand(gateway, TickerExamples.any())def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())when:commandOne.execute()commandTwo.execute()then:2 * gateway.load(TickerExamples.any())}def 'should call gateway twice even when executed in parallel'() {given:def commandOne = new StockPriceCommand(gateway, TickerExamples.any())def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:2 * gateway.load(TickerExamples.any())}}
如果您不了解Hystrix,则通过将外部调用包装在命令中可以获得许多功能,例如超时,断路器等。但这不是本文的重点。 看一下最后两个测试:当两次,顺序或并行( queue()
)两次询问任意行情的价格时,我们的外部gateway
也被两次调用。 上一次测试特别有趣–我们几乎同时要求相同的报价,但Hystrix不能弄清楚。 这两个命令是完全独立的,将在不同的线程中执行,彼此之间一无所知-即使它们几乎同时运行。
折叠就是找到类似的请求并将其组合。 批处理(我将这个术语与崩溃互换使用)不会自动发生,并且需要一些编码。 但是首先让我们看看它的行为:
def 'should collapse two commands executed concurrently for the same stock ticker'() {given:def anyTicker = TickerExamples.any()def tickers = [anyTicker] as Setand:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, anyTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:0 * gateway.load(_)1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any())
}def 'should collapse two commands executed concurrently for the different stock tickers'() {given:def anyTicker = TickerExamples.any()def otherTicker = TickerExamples.other()def tickers = [anyTicker, otherTicker] as Setand:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any(),otherTicker, StockPriceExamples.other())
}def 'should correctly map collapsed response into individual requests'() {given:def anyTicker = TickerExamples.any()def otherTicker = TickerExamples.other()def tickers = [anyTicker, otherTicker] as Setgateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any(),otherTicker, StockPriceExamples.other())and:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:def anyPrice = futureOne.get()def otherPrice = futureTwo.get()then:anyPrice == StockPriceExamples.any()otherPrice == StockPriceExamples.other()
}
第一个测试证明,不是两次调用load()
,而是几乎一次调用loadAll()
。 还要注意,由于我们要求相同的Ticker
(来自两个不同的线程),因此loadAll()
仅请求一个代码。 第二个测试显示两个并发请求,两个不同的行情收录器被折叠为一个批处理调用。 第三次测试可确保我们仍能对每个单独的请求得到正确的响应。 相反,延长HystrixCommand
我们必须扩展更加复杂HystrixCollapser
。 现在是时候看到StockTickerPriceCollapsedCommand
实现,它无缝替换了StockPriceCommand
:
class StockTickerPriceCollapsedCommand extends HystrixCollapser<ImmutableMap<Ticker, StockPrice>, StockPrice, Ticker> {private final StockPriceGateway gateway;private final Ticker stock;StockTickerPriceCollapsedCommand(StockPriceGateway gateway, Ticker stock) {super(HystrixCollapser.Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Stock")).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));this.gateway = gateway;this.stock = stock;}@Overridepublic Ticker getRequestArgument() {return stock;}@Overrideprotected HystrixCommand<ImmutableMap<Ticker, StockPrice>> createCommand(Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {final Set<Ticker> stocks = collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(toSet());return new StockPricesBatchCommand(gateway, stocks);}@Overrideprotected void mapResponseToRequests(ImmutableMap<Ticker, StockPrice> batchResponse, Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {collapsedRequests.forEach(request -> {final Ticker ticker = request.getArgument();final StockPrice price = batchResponse.get(ticker);request.setResponse(price);});}}
这里有很多事情要做,所以让我们逐步回顾StockTickerPriceCollapsedCommand
。 前三种通用类型:
-
BatchReturnType
(在我们的示例中为ImmutableMap<Ticker, StockPrice>
)是批处理命令响应的类型。 如您将在后面看到的那样,崩溃程序会将多个小命令变成批处理命令。 这是该批处理命令的响应的类型。 请注意,它与StockPriceGateway.loadAll()
类型相同。 -
ResponseType
(StockPrice
)是要折叠的每个命令的类型。 在我们的例子中,我们正在折叠HystrixCommand<StockPrice>
。 稍后,我们将BatchReturnType
值BatchReturnType
为多个StockPrice
。 -
RequestArgumentType
(Ticker
)是我们将要折叠(批处理)的每个命令的输入。 当多个命令一起批处理时,我们最终将所有这些替换为一个批处理命令。 此命令应接收所有单个请求,以便执行一个批处理请求。
withTimerDelayInMilliseconds(100)
将在稍后说明。 createCommand()
创建一个批处理命令。 此命令应替换所有单个命令并执行批处理逻辑。 在我们的情况下,我们不会进行多次单独的load()
调用:
class StockPricesBatchCommand extends HystrixCommand<ImmutableMap<Ticker, StockPrice>> {private final StockPriceGateway gateway;private final Set<Ticker> stocks;StockPricesBatchCommand(StockPriceGateway gateway, Set<Ticker> stocks) {super(HystrixCommandGroupKey.Factory.asKey("Stock"));this.gateway = gateway;this.stocks = stocks;}@Overrideprotected ImmutableMap<Ticker, StockPrice> run() throws Exception {return gateway.loadAll(stocks);}
}
此类与StockPriceCommand
之间的唯一区别是,它需要一堆Ticker
并返回所有价格。 Hystrix将收集几个StockTickerPriceCollapsedCommand
实例,一旦它具有足够的StockTickerPriceCollapsedCommand
(稍后再介绍),它将创建一个StockPriceCommand
。 希望这很清楚,因为mapResponseToRequests()
涉及的更多。 折叠后的StockPricesBatchCommand
完成后,我们必须以某种方式拆分批处理响应,并将回复传达回各个命令,而不会崩溃。 从这个角度来看, mapResponseToRequests()
实现非常简单:我们收到批处理响应和包装CollapsedRequest<StockPrice, Ticker>
的集合。 现在,我们必须遍历所有正在等待的单个请求并完成它们( setResponse()
)。 如果我们不完成某些请求,它们将无限期挂起并最终超时。
怎么运行的
这是描述如何实现折叠的正确时机。 我之前说过崩溃是在同时发生两个请求时发生的。 没有一样的时间了 。 实际上,当第一个可折叠请求进入时,Hystrix会启动一个计时器。 在我们的示例中,我们将其设置为100毫秒。 在此期间,我们的命令被挂起,等待其他命令加入。 在此可配置时间段之后,Hystrix将调用createCommand()
,收集所有请求键(通过调用getRequestArgument()
)并运行它。 批处理命令完成后,它将使我们将结果分发给所有等待中的单个命令。 如果我们担心创建庞大的批处理,也可以限制已折叠请求的数量–另一方面,在此短时间内可以容纳多少并发请求?
用例和缺点
请求折叠应在承受高负载(请求频率很高)的系统中使用。 如果每个折叠时间窗口(在示例中为100毫秒)仅收到一个请求,则折叠只会增加开销。 这是因为每次您调用可折叠命令时,它都必须等待,以防万一其他命令想要加入并形成批处理。 仅当至少折叠了几个命令时,这才有意义。 节省网络等待时间和/或更好地利用合作者中的资源可以平衡浪费的等待时间(与单个呼叫相比,批处理请求通常要快得多)。 但是请记住,折叠是一把双刃剑,在特定情况下很有用。
最后要记住的一件事–为了使用请求折叠,您需要在try-finally
块中使用HystrixRequestContext.initializeContext()
和shutdown()
:
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {//...
} finally {context.shutdown();
}
崩溃与缓存
您可能会认为可以通过适当的缓存来代替崩溃。 这不是真的。 在以下情况下使用缓存:
- 资源可能会被多次访问
- 我们可以安全地使用先前的值,它会在一段时间内保持有效, 或者我们确切地知道如何使它无效
- 我们可以提供对同一资源的并发请求以多次计算
另一方面,折叠不会强制数据的局部性(1),它总是命中实际服务,并且永远不会返回陈旧的数据(2)。 最后,如果我们从多个线程中请求相同的资源,我们将仅调用一次备份服务(3)。 在进行缓存的情况下,除非您的缓存真的很聪明,否则两个线程将独立地发现缓存中没有给定的资源,并两次请求支持服务。 但是,折叠可以与缓存一起使用-通过在运行可折叠命令之前咨询缓存。
摘要
请求折叠是一个有用的工具,但是用例非常有限。 它可以显着提高我们系统的吞吐量,并限制外部服务的负载。 崩溃可以神奇地平抑流量高峰,而不是将其散布到各处。 只要确保将其用于以极高频率运行的命令即可。
翻译自: https://www.javacodegeeks.com/2014/11/batching-collapsing-requests-in-hystrix.html