【brpc学习实践五】brpc自适应限流案例

自适应限流

服务的处理能力是有客观上限的。当请求速度超过服务的处理速度时,服务就会过载。

如果服务持续过载,会导致越来越多的请求积压,最终所有的请求都必须等待较长时间才能被处理,从而使整个服务处于瘫痪状态。

与之相对的,如果直接拒绝掉一部分请求,反而能够让服务能够"及时"处理更多的请求。对应的方法就是设置最大并发。

自适应限流能动态调整服务的最大并发,在保证服务不过载的前提下,让服务尽可能多的处理请求。

使用场景

通常情况下要让服务不过载,只需在上线前进行压力测试,并通过little’s law计算出best_max_concurrency就可以了(并发度 = 时延 * QPS)。但在服务数量多,拓扑复杂,且处理能力会逐渐变化的局面下,使用固定的最大并发会带来巨大的测试工作量,很不方便。自适应限流就是为了解决这个问题。

使用自适应限流前建议做到:

客户端开启了重试功能。服务端有多个节点。

这样当一个节点返回过载时,客户端可以向其他的节点发起重试,从而尽量不丢失流量。

brpc开启自适应限流方法

目前只有method级别(即具体的rpc服务方法)支持自适应限流。如果要为某个method开启自适应限流,只需要将它的最大并发设置为"auto"即可。

// Set auto concurrency limiter for all methods
brpc::ServerOptions options;
options.method_max_concurrency = "auto";// Set auto concurrency limiter for specific method
server.MaxConcurrencyOf("example.EchoService.Echo") = "auto";

原理

名词及解释

  • concurrency(并发度): 同时处理的请求数,又被称为“并发度”。

  • max_concurrency:
    设置的最大并发度。超过并发的请求会被拒绝(返回ELIMIT错误),在集群层面,client应重试到另一台server上去

  • best_max_concurrency:
    并发的物理含义是任务处理槽位,天然存在上限,这个上限就是best_max_concurrency,也就是最佳的最大并发度,一般推荐设置最大并发为该值,若max_concurrency设置的过大,则concurrency可能大于best_max_concurrency,任务将无法被及时处理而暂存在各种队列中排队,系统也会进入拥塞状态。若max_concurrency设置的过小,则concurrency总是会小于best_max_concurrency,限制系统达到本可以达到的更高吞吐。

  • noload_latency:
    单纯处理任务的延时,不包括排队时间。另一种解释是低负载的延时。由于正确处理任务得经历必要的环节,其中会耗费cpu或等待下游返回,noload_latency是一个服务固有的属性,但可能随时间逐渐改变(由于内存碎片,压力变化,业务数据变化等因素)。

  • min_latency:
    实际测定的latency中的较小值的ema,当concurrency不大于best_max_concurrency时,min_latency和noload_latency接近(可能轻微上升)。

  • peak_qps: 服务处理qps的上限。注意是处理或回复的qps而不是接收的qps。值取决于best_max_concurrency /
    noload_latency,这两个量都是服务的固有属性,故peak_qps也是服务的固有属性,和拥塞状况无关,但可能随时间逐渐改变。

  • max_qps: 实际测定的qps中的较大值。由于qps具有上限,max_qps总是会小于peak_qps,不论拥塞与否。
    - Little’s Law

    在服务处于稳定状态时: concurrency = latency * qps。 这是自适应限流的理论基础。

当服务没有超载时,随着流量的上升,latency基本稳定(接近noload_latency),qps和concurrency呈线性关系一起上升。

当流量超过服务的peak_qps时,则concurrency和latency会一起上升,而qps会稳定在peak_qps。

假如一个服务的peak_qps和noload_latency都比较稳定,那么它的best_max_concurrency = noload_latency * peak_qps。

自适应限流就是要找到服务的noload_latency和peak_qps, 并将最大并发设置为靠近两者乘积的一个值。

自适应限流计算公式

自适应限流会不断的对请求进行采样,当采样窗口的样本数量足够时,会根据样本的平均延迟和服务当前的qps计算出下一个采样窗口的max_concurrency:

max_concurrency = max_qps * ((2+alpha) * min_latency - latency)
  • alpha为可接受的延时上升幅度,默认0.3。
  • latency是当前采样窗口内所有请求的平均latency。
  • max_qps是最近一段时间测量到的qps的极大值。
  • min_latency是最近一段时间测量到的latency较小值的ema,是noload_latency的估算值。

注意:当计算出来的 max_concurrency 和当前的 max_concurrency 的值不同时,每次对 max_concurrency 的调整的比例有一个上限,让 max_concurrency 的变化更为平滑。

当服务处于低负载时,min_latency约等于noload_latency,此时计算出来的max_concurrency会高于concurrency,但低于best_max_concurrency,给流量上涨留探索空间。而当服务过载时,服务的qps约等于max_qps,同时latency开始明显超过min_latency,此时max_concurrency则会接近concurrency,并通过定期衰减避免远离best_max_concurrency,保证服务不会过载。
估算noload_latency

服务的noload_latency并非是一成不变的,自适应限流必须能够正确的探测noload_latency的变化。当noload_latency下降时,是很容感知到的,因为这个时候latency也会下降。难点在于当latency上涨时,需要能够正确的辨别到底是服务过载了,还是noload_latency上涨了。

可能的方案有:

取最近一段时间的最小latency来近似noload_latency
取最近一段时间的latency的各种平均值来预测noload_latency
收集请求的平均排队等待时间,使用latency - queue_time作为noload_latency
每隔一段时间缩小max_concurrency,过一小段时间后以此时的latency作为noload_latency

方案1和方案2的问题在于:假如服务持续处于高负载,那么最近的所有latency都会高出noload_latency,从而使得算法估计的noload_latency不断升高。

方案3的问题在于,假如服务的性能瓶颈在下游服务,那么请求在服务本身的排队等待时间无法反应整体的负载情况。

方案4是最通用的,也经过了大量实验的考验。缩小max_concurrency和公式中的alpha存在关联。让我们做个假想实验,若latency极为稳定并都等于min_latency,那么公式简化为max_concurrency = max_qps * latency * (1 + alpha)。根据little’s law,qps最多为max_qps * (1 + alpha). alpha是qps的"探索空间",若alpha为0,则qps被锁定为max_qps,算法可能无法探索到peak_qps。但在qps已经达到peak_qps时,alpha会使延时上升(已拥塞),此时测定的min_latency会大于noload_latency,一轮轮下去最终会导致min_latency不收敛。定期降低max_concurrency就是阻止这个过程,并给min_latency下降提供"探索空间"。
减少重测时的流量损失

每隔一段时间,自适应限流算法都会缩小max_concurrency,并持续一段时间,然后将此时的latency作为服务的noload_latency,以处理noload_latency上涨了的情况。测量noload_latency时,必须让先服务处于低负载的状态,因此对max_concurrency的缩小是难以避免的。

由于max_concurrency < concurrency时,服务会拒绝掉所有的请求,限流算法将"排空所有的经历过排队等待的请求的时间" 设置为 latency * 2 ,以确保用于计算min_latency的样本绝大部分都是没有经过排队等待的。

由于服务的latency通常都不会太长,这种做法所带来的流量损失也很小。
应对抖动

即使服务自身没有过载,latency也会发生波动,根据Little’s Law,latency的波动会导致server的concurrency发生波动。

我们在设计自适应限流的计算公式时,考虑到了latency发生抖动的情况: 当latency与min_latency很接近时,根据计算公式会得到一个较高max_concurrency来适应concurrency的波动,从而尽可能的减少“误杀”。同时,随着latency的升高,max_concurrency会逐渐降低,以保护服务不会过载。

从另一个角度来说,当latency也开始升高时,通常意味着某处(不一定是服务本身,也有可能是下游服务)消耗了大量CPU资源,这个时候缩小max_concurrency也是合理的。
平滑处理

为了减少个别窗口的抖动对限流算法的影响,同时尽量降低计算开销,计算min_latency时会通过使用EMA来进行平滑处理:

if latency < min_latency:
min_latency = latency * ema_alpha + (1 - ema_alpha) * min_latency
else:
do_nothing

估算peak_qps
提高qps增长的速度

当服务启动时,由于服务本身需要进行一系列的初始化,tcp本身也有慢启动等一系列原因。服务在刚启动时的qps一定会很低。这就导致了服务启动时的max_concurrency也很低。而按照上面的计算公式,当max_concurrency很低的时候,预留给qps增长的冗余concurrency也很低(即:alpha * max_qps * min_latency)。从而会影响当流量增加时,服务max_concurrency的增加速度。

假如从启动到打满qps的时间过长,这期间会损失大量流量。在这里我们采取的措施有两个,

采样方面,一旦采到的请求数量足够多,直接提交当前采样窗口,而不是等待采样窗口的到时间了才提交
计算公式方面,当current_qps > 保存的max_qps时,直接进行更新,不进行平滑处理。

在进行了这两个处理之后,绝大部分情况下都能够在2秒左右将qps打满。
平滑处理

为了减少个别窗口的抖动对限流算法的影响,同时尽量降低计算开销,在计算max_qps时,会通过使用EMA来进行平滑处理:

if current_qps > max_qps:
max_qps = current_qps
else:
max_qps = current_qps * ema_alpha / 10 + (1 - ema_alpha / 10) * max_qps

将max_qps的ema参数置为min_latency的ema参数的十分之一的原因是: max_qps 下降了通常并不意味着极限qps也下降了。而min_latency下降了,通常意味着noload_latency确实下降了。
与netflix gradient算法的对比

netflix中的gradient算法公式为:max_concurrency = min_latency / latency * max_concurrency + queue_size。

其中latency是采样窗口的最小latency,min_latency是最近多个采样窗口的最小latency。min_latency / latency就是算法中的"梯度",当latency大于min_latency时,max_concurrency会逐渐减少;反之,max_concurrency会逐渐上升,从而让max_concurrency围绕在best_max_concurrency附近。

这个公式可以和本文的算法进行类比:
gradient算法中的latency和本算法的不同,前者的latency是最小值,后者是平均值。netflix的原意是最小值能更好地代表noload_latency,但实际上只要不对max_concurrency做定期衰减,不管最小值还是平均值都有可能不断上升使算法不收敛。最小值并不能带来额外的好处,反而会使算法更不稳定。
gradient算法中的max_concurrency / latency从概念上和qps有关联(根据little’s law),但可能严重脱节。比如在重测 min_latency前,若所有latency都小于min_latency,那么max_concurrency会不断下降甚至到0;但按照本算法,max_qps和min_latency仍然是稳定的,它们计算出的max_concurrency也不会剧烈变动。究其本质,gradient算法在迭代max_concurrency时,latency并不能代表实际并发为max_concurrency时的延时,两者是脱节的,所以max_concurrency / latency的实际物理含义不明,与qps可能差异甚大,最后导致了很大的偏差。
gradient算法的queue_size推荐为sqrt(max_concurrency),这是不合理的。netflix对queue_size的理解大概是代表各种不可控环节的缓存,比如socket里的,和max_concurrency存在一定的正向关系情有可原。但在我们的理解中,这部分queue_size作用微乎其微,没有或用常量即可。我们关注的queue_size是给concurrency上升留出的探索空间: max_concurrency的更新是有延迟的,在并发从低到高的增长过程中,queue_size的作用就是在max_concurrency更新前不限制qps上升。而当concurrency高时,服务可能已经过载了,queue_size就应该小一点,防止进一步恶化延时。这里的queue_size和并发是反向关系。

服务端代码实现

#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/server.h>
#include <butil/atomicops.h>
#include <butil/time.h>
#include <butil/logging.h>
#include <json2pb/json_to_pb.h>
#include <bthread/timer_thread.h>
#include <bthread/bthread.h>#include <cstdlib>
#include <fstream>
#include "cl_test.pb.h"DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state ""(waiting for client to close connection before server stops)");
DEFINE_int32(server_bthread_concurrency, 4, "Configuring the value of bthread_concurrency, For compute max qps, ");
DEFINE_int32(server_sync_sleep_us, 2500, "Usleep time, each request will be executed once, For compute max qps");
// max qps = 1000 / 2.5 * 4 DEFINE_int32(control_server_port, 9000, "");
DEFINE_int32(echo_port, 9001, "TCP Port of echo server");
DEFINE_int32(cntl_port, 9000, "TCP Port of controller server");
DEFINE_string(case_file, "", "File path for test_cases");
DEFINE_int32(latency_change_interval_us, 50000, "Intervalt for server side changes the latency");
DEFINE_int32(server_max_concurrency, 0, "Echo Server's max_concurrency");
DEFINE_bool(use_usleep, false, "EchoServer uses ::usleep or bthread_usleep to simulate latency ""when processing requests");bthread::TimerThread g_timer_thread;int cast_func(void* arg) {return *(int*)arg;
}void DisplayStage(const test::Stage& stage) {std::string type;switch(stage.type()) {case test::FLUCTUATE: type = "Fluctuate";break;case test::SMOOTH:type = "Smooth";break;default:type = "Unknown";}std::stringstream ss;ss << "Stage:[" << stage.lower_bound() << ':' << stage.upper_bound() <<  "]"<< " , Type:" << type;LOG(INFO) << ss.str();
}butil::atomic<int> cnt(0);
butil::atomic<int> atomic_sleep_time(0);
bvar::PassiveStatus<int> atomic_sleep_time_bvar(cast_func, &atomic_sleep_time);namespace bthread {
DECLARE_int32(bthread_concurrency);
}void TimerTask(void* data);class EchoServiceImpl : public test::EchoService {
public:EchoServiceImpl() : _stage_index(0), _running_case(false) {};virtual ~EchoServiceImpl() {};void SetTestCase(const test::TestCase& test_case) {_test_case = test_case;_next_stage_start = _test_case.latency_stage_list(0).duration_sec() + butil::gettimeofday_s();_stage_index = 0;_running_case = false;DisplayStage(_test_case.latency_stage_list(_stage_index));}void StartTestCase() {CHECK(!_running_case);_running_case = true;UpdateLatency();}void StopTestCase() {_running_case = false;}void UpdateLatency() {if (!_running_case) {return;}ComputeLatency();g_timer_thread.schedule(TimerTask, (void*)this, butil::microseconds_from_now(FLAGS_latency_change_interval_us));}virtual void Echo(google::protobuf::RpcController* cntl_base,const test::NotifyRequest* request,test::NotifyResponse* response,google::protobuf::Closure* done) {brpc::ClosureGuard done_guard(done); response->set_message("hello");::usleep(FLAGS_server_sync_sleep_us);if (FLAGS_use_usleep) {::usleep(_latency.load(butil::memory_order_relaxed));} else {bthread_usleep(_latency.load(butil::memory_order_relaxed));}}void ComputeLatency() {if (_stage_index < _test_case.latency_stage_list_size() &&butil::gettimeofday_s() > _next_stage_start) {++_stage_index;if (_stage_index < _test_case.latency_stage_list_size()) {_next_stage_start += _test_case.latency_stage_list(_stage_index).duration_sec();DisplayStage(_test_case.latency_stage_list(_stage_index));}}if (_stage_index == _test_case.latency_stage_list_size()) {const test::Stage& latency_stage = _test_case.latency_stage_list(_stage_index - 1);if (latency_stage.type() == test::ChangeType::FLUCTUATE) {_latency.store((latency_stage.lower_bound() + latency_stage.upper_bound()) / 2,butil::memory_order_relaxed);} else if (latency_stage.type() == test::ChangeType::SMOOTH) {_latency.store(latency_stage.upper_bound(), butil::memory_order_relaxed);}return;}const test::Stage& latency_stage = _test_case.latency_stage_list(_stage_index);const int lower_bound = latency_stage.lower_bound();const int upper_bound = latency_stage.upper_bound();if (latency_stage.type() == test::FLUCTUATE) {_latency.store(butil::fast_rand_less_than(upper_bound - lower_bound) + lower_bound,butil::memory_order_relaxed); } else if (latency_stage.type() == test::SMOOTH) {int latency = lower_bound + (upper_bound - lower_bound) / double(latency_stage.duration_sec()) * (latency_stage.duration_sec() - _next_stage_start + butil::gettimeofday_s());_latency.store(latency, butil::memory_order_relaxed);} else {LOG(FATAL) << "Wrong Type:" << latency_stage.type();}}private:int _stage_index;int _next_stage_start;butil::atomic<int> _latency;test::TestCase _test_case;bool _running_case;
};void TimerTask(void* data) {EchoServiceImpl* echo_service = (EchoServiceImpl*)data;echo_service->UpdateLatency();
}class ControlServiceImpl : public test::ControlService {
public:ControlServiceImpl() : _case_index(0) {LoadCaseSet(FLAGS_case_file);_echo_service = new EchoServiceImpl;if (_server.AddService(_echo_service,brpc::SERVER_OWNS_SERVICE) != 0) {LOG(FATAL) << "Fail to add service";}g_timer_thread.start(NULL);}virtual ~ControlServiceImpl() { _echo_service->StopTestCase();g_timer_thread.stop_and_join(); };virtual void Notify(google::protobuf::RpcController* cntl_base,const test::NotifyRequest* request,test::NotifyResponse* response,google::protobuf::Closure* done) {brpc::ClosureGuard done_guard(done);const std::string& message = request->message();LOG(INFO) << message;if (message == "ResetCaseSet") {_server.Stop(0);_server.Join();_echo_service->StopTestCase();LoadCaseSet(FLAGS_case_file);_case_index = 0;response->set_message("CaseSetReset");} else if (message == "StartCase") {CHECK(!_server.IsRunning()) << "Continuous StartCase";const test::TestCase& test_case = _case_set.test_case(_case_index++);_echo_service->SetTestCase(test_case);brpc::ServerOptions options;options.max_concurrency = FLAGS_server_max_concurrency;_server.MaxConcurrencyOf("test.EchoService.Echo") = test_case.max_concurrency();_server.Start(FLAGS_echo_port, &options);            _echo_service->StartTestCase();response->set_message("CaseStarted");} else if (message == "StopCase") {CHECK(_server.IsRunning()) << "Continuous StopCase";_server.Stop(0);_server.Join();_echo_service->StopTestCase();response->set_message("CaseStopped");} else {LOG(FATAL) << "Invalid message:" << message;response->set_message("Invalid Cntl Message");}}private:void LoadCaseSet(const std::string& file_path) {std::ifstream ifs(file_path.c_str(), std::ios::in);  if (!ifs) {LOG(FATAL) << "Fail to open case set file: " << file_path;}std::string case_set_json((std::istreambuf_iterator<char>(ifs)),  std::istreambuf_iterator<char>()); test::TestCaseSet case_set;std::string err;if (!json2pb::JsonToProtoMessage(case_set_json, &case_set, &err)) {LOG(FATAL) << "Fail to trans case_set from json to protobuf message: "<< err;}_case_set = case_set;ifs.close();}brpc::Server _server;EchoServiceImpl* _echo_service;test::TestCaseSet _case_set;int _case_index;
};int main(int argc, char* argv[]) {// Parse gflags. We recommend you to use gflags as well.GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);bthread::FLAGS_bthread_concurrency= FLAGS_server_bthread_concurrency;brpc::Server server;ControlServiceImpl control_service_impl;if (server.AddService(&control_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {LOG(ERROR) << "Fail to add service";return -1;}if (server.Start(FLAGS_cntl_port, NULL) != 0) {LOG(ERROR) << "Fail to start EchoServer";return -1;}server.RunUntilAskedToQuit();return 0;
}

客户端代码实现

#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/time.h>
#include <brpc/channel.h>
#include <bvar/bvar.h>
#include <bthread/timer_thread.h>
#include <json2pb/json_to_pb.h>#include <fstream>
#include "cl_test.pb.h"DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(cntl_server, "0.0.0.0:9000", "IP Address of server");
DEFINE_string(echo_server, "0.0.0.0:9001", "IP Address of server");
DEFINE_int32(timeout_ms, 3000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 0, "Max retries(not including the first RPC)"); 
DEFINE_int32(case_interval, 20, "Intervals for different test cases");
DEFINE_int32(client_qps_change_interval_us, 50000, "The interval for client changes the sending speed");
DEFINE_string(case_file, "", "File path for test_cases");void DisplayStage(const test::Stage& stage) {std::string type;switch(stage.type()) {case test::FLUCTUATE: type = "Fluctuate";break;case test::SMOOTH:type = "Smooth";break;default:type = "Unknown";}std::stringstream ss;ss << "Stage:[" << stage.lower_bound() << ':' << stage.upper_bound() <<  "]"<< " , Type:" << type;LOG(INFO) << ss.str();
}uint32_t cast_func(void* arg) {return *(uint32_t*)arg;
}butil::atomic<uint32_t> g_timeout(0);
butil::atomic<uint32_t> g_error(0);
butil::atomic<uint32_t> g_succ(0);
bvar::PassiveStatus<uint32_t> g_timeout_bvar(cast_func, &g_timeout);
bvar::PassiveStatus<uint32_t> g_error_bvar(cast_func, &g_error);
bvar::PassiveStatus<uint32_t> g_succ_bvar(cast_func, &g_succ);
bvar::LatencyRecorder g_latency_rec;void LoadCaseSet(test::TestCaseSet* case_set, const std::string& file_path) {std::ifstream ifs(file_path.c_str(), std::ios::in);  if (!ifs) {LOG(FATAL) << "Fail to open case set file: " << file_path;}std::string case_set_json((std::istreambuf_iterator<char>(ifs)),  std::istreambuf_iterator<char>()); std::string err;if (!json2pb::JsonToProtoMessage(case_set_json, case_set, &err)) {LOG(FATAL) << "Fail to trans case_set from json to protobuf message: "<< err;}
}void HandleEchoResponse(brpc::Controller* cntl,test::NotifyResponse* response) {// std::unique_ptr makes sure cntl/response will be deleted before returning.std::unique_ptr<brpc::Controller> cntl_guard(cntl);std::unique_ptr<test::NotifyResponse> response_guard(response);if (cntl->Failed() && cntl->ErrorCode() == brpc::ERPCTIMEDOUT) {g_timeout.fetch_add(1, butil::memory_order_relaxed);LOG_EVERY_N(INFO, 1000) << cntl->ErrorText();} else if (cntl->Failed()) {g_error.fetch_add(1, butil::memory_order_relaxed);LOG_EVERY_N(INFO, 1000) << cntl->ErrorText();} else {g_succ.fetch_add(1, butil::memory_order_relaxed);g_latency_rec << cntl->latency_us();}}void Expose() {g_timeout_bvar.expose_as("cl", "timeout");g_error_bvar.expose_as("cl", "failed");g_succ_bvar.expose_as("cl", "succ");g_latency_rec.expose("cl");
}struct TestCaseContext {TestCaseContext(const test::TestCase& tc) : running(true), stage_index(0), test_case(tc), next_stage_sec(test_case.qps_stage_list(0).duration_sec() + butil::gettimeofday_s()) {DisplayStage(test_case.qps_stage_list(stage_index));Update();}bool Update() {if (butil::gettimeofday_s() >= next_stage_sec) {++stage_index;if (stage_index < test_case.qps_stage_list_size()) {next_stage_sec += test_case.qps_stage_list(stage_index).duration_sec(); DisplayStage(test_case.qps_stage_list(stage_index));} else {return false;}}int qps = 0;const test::Stage& qps_stage = test_case.qps_stage_list(stage_index);const int lower_bound = qps_stage.lower_bound();const int upper_bound = qps_stage.upper_bound();if (qps_stage.type() == test::FLUCTUATE) {qps = butil::fast_rand_less_than(upper_bound - lower_bound) + lower_bound;} else if (qps_stage.type() == test::SMOOTH) {qps = lower_bound + (upper_bound - lower_bound) / double(qps_stage.duration_sec()) * (qps_stage.duration_sec() - next_stage_sec+ butil::gettimeofday_s());}interval_us.store(1.0 / qps * 1000000, butil::memory_order_relaxed);return true;}butil::atomic<bool> running;butil::atomic<int64_t> interval_us;int stage_index;const test::TestCase test_case;int next_stage_sec;
};void RunUpdateTask(void* data) {TestCaseContext* context = (TestCaseContext*)data;bool should_continue = context->Update();if (should_continue) {bthread::get_global_timer_thread()->schedule(RunUpdateTask, data, butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));} else {context->running.store(false, butil::memory_order_release);}
}void RunCase(test::ControlService_Stub &cntl_stub, const test::TestCase& test_case) {LOG(INFO) << "Running case:`" << test_case.case_name() << '\'';brpc::Channel channel;brpc::ChannelOptions options;options.protocol = FLAGS_protocol;options.connection_type = FLAGS_connection_type;options.timeout_ms = FLAGS_timeout_ms;options.max_retry = FLAGS_max_retry;if (channel.Init(FLAGS_echo_server.c_str(), &options) != 0) {LOG(FATAL) << "Fail to initialize channel";}test::EchoService_Stub echo_stub(&channel);test::NotifyRequest cntl_req;test::NotifyResponse cntl_rsp;brpc::Controller cntl;cntl_req.set_message("StartCase");cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);CHECK(!cntl.Failed()) << "control failed";TestCaseContext context(test_case);bthread::get_global_timer_thread()->schedule(RunUpdateTask, &context, butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));while (context.running.load(butil::memory_order_acquire)) {test::NotifyRequest echo_req;echo_req.set_message("hello");brpc::Controller* echo_cntl = new brpc::Controller;test::NotifyResponse* echo_rsp = new test::NotifyResponse;google::protobuf::Closure* done = brpc::NewCallback(&HandleEchoResponse, echo_cntl, echo_rsp);echo_stub.Echo(echo_cntl, &echo_req, echo_rsp, done);::usleep(context.interval_us.load(butil::memory_order_relaxed));}LOG(INFO) << "Waiting to stop case: `" << test_case.case_name() << '\'';::sleep(FLAGS_case_interval);cntl.Reset();cntl_req.set_message("StopCase");cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);CHECK(!cntl.Failed()) << "control failed";LOG(INFO) << "Case `" << test_case.case_name() << "' finshed:";
}int main(int argc, char* argv[]) {// Parse gflags. We recommend you to use gflags as well.GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);Expose();brpc::Channel channel;brpc::ChannelOptions options;options.protocol = FLAGS_protocol;options.connection_type = FLAGS_connection_type;options.timeout_ms = FLAGS_timeout_ms;if (channel.Init(FLAGS_cntl_server.c_str(), &options) != 0) {LOG(ERROR) << "Fail to initialize channel";return -1;}test::ControlService_Stub cntl_stub(&channel);test::TestCaseSet case_set;LoadCaseSet(&case_set, FLAGS_case_file);brpc::Controller cntl;test::NotifyRequest cntl_req;test::NotifyResponse cntl_rsp;cntl_req.set_message("ResetCaseSet");cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);CHECK(!cntl.Failed()) << "Cntl Failed";for (int i = 0; i < case_set.test_case_size(); ++i) {RunCase(cntl_stub, case_set.test_case(i));}LOG(INFO) << "EchoClient is going to quit";return 0;
}

proto

syntax="proto2";
package test;option cc_generic_services = true;message NotifyRequest {required string message = 1;
};message NotifyResponse {required string message = 1;
};enum ChangeType {FLUCTUATE = 1;  // Fluctuating between upper and lower bound SMOOTH = 2;     // Smoothly rising from the lower bound to the upper bound 
}message Stage {required int32 lower_bound = 1;required int32 upper_bound = 2;required int32 duration_sec = 3;required ChangeType type = 4; 
}message TestCase {required string case_name = 1;required string max_concurrency = 2;repeated Stage qps_stage_list = 3;repeated Stage latency_stage_list = 4;
}message TestCaseSet {repeated TestCase test_case = 1;
}service ControlService {rpc Notify(NotifyRequest) returns (NotifyResponse);
}service EchoService {rpc Echo(NotifyRequest) returns (NotifyResponse);
};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/162045.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PHP中间件实现

目录 1、简单中间实现 2、使用闭包函数实现中间件 在PHP中&#xff0c;中间件是一种常用的设计模式&#xff0c;用于处理请求和响应&#xff0c;它可以在请求到达目标处理程序之前或响应发送给客户端之前执行一些特定的逻辑。中间件提供了一种灵活的方式来修改或扩展应用程序的…

查看当前laravel版本三种方法(笔记二)

1、在终端中使用 Artisan 命令&#xff1a;在 Laravel 项目的根目录下&#xff0c;打开终端&#xff08;命令行界面&#xff09;&#xff0c;然后运行以下命令&#xff1a; php artisan --version 2、控制器中打印版本 var_dump(app()->version()); 3、在 Laravel 项目的根目…

【kubernetes】k8s架构之节点

文章目录 1、集群架构示意图2、概述3、管理3.1 节点名称唯一性3.2 节点自注册3.3 手动节点管理 4、节点状态4.1 地址&#xff08;Addresses&#xff09;4.2 状况&#xff08;Condition&#xff09;4.3 容量&#xff08;Capacity&#xff09;与可分配&#xff08;Allocatable&am…

PTA-输出三角形面积和周长

本题要求编写程序&#xff0c;根据输入的三角形的三条边a、b、c&#xff0c;计算并输出面积和周长。注意&#xff1a;在一个三角形中&#xff0c; 任意两边之和大于第三边。三角形面积计算公式&#xff1a;areas(s−a)(s−b)(s−c)​&#xff0c;其中s(abc)/2。 输入格式&…

某60区块链安全之Call函数簇滥用实战二学习记录

区块链安全 文章目录 区块链安全Call函数簇滥用实战二实验目的实验环境实验原理实验内容实验步骤EXP利用 Call函数簇滥用实战二 实验目的 学会使用python3的web3模块 学会并区分以太坊call、staticcall、delegatecall三种函数调用的特点 找到合约漏洞进行分析并形成利用 实验…

关于git hooks

Git hooks 是一种在 Git 仓库中触发自定义脚本的机制。这些脚本可以在特定的 Git 操作&#xff08;如提交、推送、合并等&#xff09;发生时执行。通过使用 Git hooks&#xff0c;你可以在版本控制的不同阶段自动运行脚本&#xff0c;以执行一些定制化的操作。 在 Git 中&…

03梯度下降

目录 lambda基础知识 代码 核心算法&#xff1a; lambda基础知识 lambda 是 Python 中的一个关键字&#xff0c;用于创建匿名函数。匿名函数是一种没有具体名称的小型、临时的函数&#xff0c;通常用于一次性的、简单的操作。lambda 函数的语法如下&#xff1a;python Copy c…

高效运维工具,助力运维服务商为企业用户提供IT远程维保服务

一、背景介绍 随着科技的迅速发展和信息化建设的不断推进&#xff0c;IT运维在中小企业中的地位逐渐提升。IT运维是指通过技术手段和工具&#xff0c;对企业的IT基础设施进行监控、管理和维护&#xff0c;以确保企业信息系统的稳定运行和业务的持续发展。 然而&#xff0c;对于…

计算3个点的6种分布在平面上的占比

假设平面的尺寸是6*6&#xff0c;用11的方式构造2&#xff0c;在用21的方式构造3 2 2 2 1 2 2 2 2 2 1 2 2 2 2 2 1 2 2 3 3 3 x 3 3 2 2 2 1 2 2 2 2 2 1 2 2 在平面上有一个点x&#xff0c;11的操作吧平面分成了3部分2a1&#xff0c;2a…

海康Visionmaster-模块索引:MFC 模块索引异常解决 办法

现象&#xff1a;文件编码格式为 UTF-8 不带签名编码格式&#xff0c;模块索引会出现 模块无法找到异常 更改文件类型为 UTF-8 带签名格式或 vs 默认 GBK2312 编码格式

JMeter处理接口签名sign

写接口脚本的时候&#xff0c;很多接口涉及到签名&#xff0c;今天介绍下用JMeter编写签名脚本的方法。 举个例子&#xff0c;开启红包接口&#xff0c;请求方式为post POST /v1/api/red/open json请求参数 { "red_id":1, "timestamp":"1667033841…

2023年中国边缘计算网关现状及发展趋势分析[图]

边缘计算网关是一种可以在设备上运行本地计算、消息通信、数据缓存等功能的工业智能网关&#xff0c;可以在无需联网的情况下实现设备的本地联动以及数据处理分析。边缘计算网关是一种连接物联网设备和云端服务的关键技术&#xff0c;它可以在设备和云端之间建立一个安全、高效…

实例讲解Simulink的MATLAB Function模块

内容 MATLAB Function是一个支持使用M语言编写模块功能,并能够将所编写的M语言生成C代码&#xff0c;用于开发桌面应用和嵌入式应用的模块。它支持的 MATLAB内建函数比 Fcn模块要广泛&#xff0c;除去基本的四则运算、逻辑操作符和关系操作符&#xff0c;还可以调用MATLAB各种…

代码随想录算法训练营第四十三天【动态规划part05】 | 1049. 最后一块石头的重量 II、494. 目标和、474.一和零

1049. 最后一块石头的重量 II 题目链接&#xff1a; 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 求解思路&#xff1a; 等于把石头尽量分成重量相同的两堆 动规五部曲 确定dp数组及其下标含义&#xff1a;容量为j的背包&#xff0c;最多能装…

logstash安装和使用

官网&#xff1a;https://www.elastic.co/cn/logstash/ 1.上传Linux安装包 2.解压安装包且重命名 [rootVM-4-10-centos logstash]# tar -zxvf logstash-8.11.1-linux-x86_64.tar.gz -C ../software/[rootVM-4-10-centos logstash]# mv logstash-8.11.1/ logstash3.启动测试 …

国产遥感影像处理软件 GSRS,真是很方便

兼容国内外绝大多数卫星遥感影像格式&#xff1b;高效的影像查看&#xff0c;比如漫游、放大、缩小、查看影像像素灰度值、影像地理坐标、影像投影坐标系等等&#xff1b;人机交互影像裁剪&#xff0c;任何绘制裁剪区域&#xff0c;输出裁剪影像&#xff1b;具备影像基本处理功…

基于Haclon的Blob分析

任务要求&#xff1a; 请用BLOB分析的方法计算图中所有灰度值在120和255之间的像素构成的8连通区域的面积与中心点坐标。 Blob基础&#xff1a; 分析过程&#xff1a;首先获取图像&#xff0c;然后根据特征对原始图像进行阈值分割&#xff08;区分背景像素和前景像素&#xf…

洛谷 P4552 [Poetize6] IncDec Sequence

挺好的一道思维题。 分析 因为是对区间修改&#xff0c;多次修改肯定会超时&#xff0c;很容易想到差分。 那么原题的对区间修改就可以转换为下面三个操作&#xff08;均在差分数组中&#xff09;&#xff1a; 1. 任选一个数1 2. 任选一个数-1 3. 任选两个数1和-1 进一步考…

贪心算法及相关例题

目录 什么是贪心算法&#xff1f; leetcode455题.分发饼干 leetcode376题.摆动序列 leetcode55题.跳跃游戏I leetcode45题.跳跃游戏II leetcode621题.任务调度器 leetcode435题.无重叠空间 leetcode135题.分发糖果 什么是贪心算法&#xff1f; 贪心算法更多的是一种思…

《QT从基础到进阶·三十七》QWidget实现左侧导航栏效果

NavigationBarPlugin插件类实现了对左侧导航栏的管理&#xff0c;我们可以在导航栏插件中添加界面&#xff0c;并用鼠标点击导航栏能够切换对应的界面。 源码在文章末尾 实现效果如下&#xff1a; NavigationBarPlugin实现的接口如下&#xff1a; class NAVIGATIONBAR_EXP…