In the previous article we created a simple adapter from ElasticSearch's API to Reactor's Mono, which looks like this:

```java
import reactor.core.publisher.Mono;

private Mono<IndexResponse> indexDoc(Doc doc) {
    //...
}
```
Now we would like to run this method a few million times with a controlled concurrency level. Basically, we want to see how our indexing code behaves under load, and benchmark it.
Faking data with jFairy
First, we need some good-looking test data. For that purpose we will use the handy jFairy library. The document we will index is a simple POJO:
```java
@Value
class Doc {
    private final String username;
    private final String json;
}
```
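The @Value annotation on Doc comes from Lombok and generates the all-args constructor, getters, equals()/hashCode() and toString(). If Lombok is not available, a plain-Java equivalent would look roughly like this (a sketch, not from the original article):

```java
// Plain-Java equivalent of the Lombok @Value class: immutable, final fields.
public final class Doc {

    private final String username;
    private final String json;

    public Doc(String username, String json) {
        this.username = username;
        this.json = json;
    }

    public String getUsername() { return username; }
    public String getJson() { return json; }

    @Override
    public String toString() {
        return "Doc(username=" + username + ", json=" + json + ")";
    }

    public static void main(String[] args) {
        Doc doc = new Doc("epittman5795354", "{}");
        System.out.println(doc.getUsername());
    }
}
```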
The generation logic is wrapped inside a Java class:
```java
import io.codearte.jfairy.Fairy;
import io.codearte.jfairy.producer.person.Address;
import io.codearte.jfairy.producer.person.Person;
import org.apache.commons.lang3.RandomUtils;

@Component
class PersonGenerator {

    private final ObjectMapper objectMapper;
    private final Fairy fairy;

    private Doc generate() {
        Person person = fairy.person();
        final String username =
                person.getUsername() + RandomUtils.nextInt(1_000_000, 9_000_000);
        final ImmutableMap<String, Object> map = ImmutableMap.<String, Object>builder()
                .put("address", toMap(person.getAddress()))
                .put("firstName", person.getFirstName())
                .put("middleName", person.getMiddleName())
                .put("lastName", person.getLastName())
                .put("email", person.getEmail())
                .put("companyEmail", person.getCompanyEmail())
                .put("username", username)
                .put("password", person.getPassword())
                .put("sex", person.getSex())
                .put("telephoneNumber", person.getTelephoneNumber())
                .put("dateOfBirth", person.getDateOfBirth())
                .put("company", person.getCompany())
                .put("nationalIdentityCardNumber", person.getNationalIdentityCardNumber())
                .put("nationalIdentificationNumber", person.getNationalIdentificationNumber())
                .put("passportNumber", person.getPassportNumber())
                .build();
        final String json = objectMapper.writeValueAsString(map);
        return new Doc(username, json);
    }

    private ImmutableMap<String, Object> toMap(Address address) {
        return ImmutableMap.<String, Object>builder()
                .put("street", address.getStreet())
                .put("streetNumber", address.getStreetNumber())
                .put("apartmentNumber", address.getApartmentNumber())
                .put("postalCode", address.getPostalCode())
                .put("city", address.getCity())
                .put("lines", Arrays.asList(
                        address.getAddressLine1(),
                        address.getAddressLine2()))
                .build();
    }
}
```
Fairly boring code that actually does something quite cool. Every time we run it, it generates random, but sensible JSON like this:
```json
{
    "address": {
        "street": "Ford Street",
        "streetNumber": "32",
        "apartmentNumber": "",
        "postalCode": "63913",
        "city": "San Francisco",
        "lines": [
            "32 Ford Street",
            "San Francisco 63913"
        ]
    },
    "firstName": "Evelyn",
    "middleName": "",
    "lastName": "Pittman",
    "email": "pittman@mail.com",
    "companyEmail": "evelyn.pittman@woodsllc.eu",
    "username": "epittman5795354",
    "password": "VpEfFmzG",
    "sex": "FEMALE",
    "telephoneNumber": "368-005-109",
    "dateOfBirth": "1917-05-14T16:47:06.273Z",
    "company": {
        "name": "Woods LLC",
        "domain": "woodsllc.eu",
        "email": "contact@woodsllc.eu",
        "vatIdentificationNumber": "30-0005081",
        "url": "http://www.woodsllc.eu"
    },
    "nationalIdentityCardNumber": "713-79-5185",
    "nationalIdentificationNumber": "",
    "passportNumber": "jVeyZLSt3"
}
```
Neat! Unfortunately it is not documented whether jFairy is thread-safe, so just in case, in the real code I am using a ThreadLocal. OK, so we have one document, but we need millions! Using a for-loop is so passé. How about an infinite stream of random people?
```java
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

private final Scheduler scheduler =
        Schedulers.newParallel(PersonGenerator.class.getSimpleName());

Mono<Doc> generateOne() {
    return Mono
            .fromCallable(this::generate)
            .subscribeOn(scheduler);
}

Flux<Doc> infinite() {
    return generateOne().repeat();
}
```
generateOne() wraps the blocking generate() method in a Mono<Doc>. Additionally, generate() runs on a parallel Scheduler. Why? It turned out that jFairy was not fast enough on a single core (lots of random number generation, table lookups, etc.), so I had to parallelize data generation. Normally that should not be an issue. But when generating fake data turns out to be slower than your reactive application that barely does anything besides touching an external server, it tells you something about the performance of Netty-based Spring WebFlux (!).
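Since jFairy's thread safety is undocumented, keeping one instance per worker thread via ThreadLocal is a cheap safety net: each thread on the parallel Scheduler lazily creates and reuses its own generator. A minimal sketch of the pattern, using java.util.Random as a runnable stand-in for Fairy (with jFairy this would be ThreadLocal.withInitial(Fairy::create)):

```java
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadLocalGenerator {

    // Each thread lazily creates and reuses its own instance, so a
    // potentially non-thread-safe object is never shared between threads.
    private static final ThreadLocal<Random> PER_THREAD =
            ThreadLocal.withInitial(Random::new);

    public static void main(String[] args) throws InterruptedException {
        Set<Random> distinctInstances = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 100; i++) {
            pool.submit(() -> distinctInstances.add(PER_THREAD.get()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        // At most one instance per pool thread was ever created
        System.out.println(distinctInstances.size() <= 4);
    }
}
```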
Invoking ElasticSearch concurrently
OK, having an infinite stream of good-looking fake test data, we now want to index it in ElasticSearch.
```java
@PostConstruct
void startIndexing() {
    index(1_000_000, 1_000);
}

private void index(int count, int maxConcurrency) {
    personGenerator
            .infinite()
            .take(count)
            .flatMap(this::indexDocSwallowErrors, maxConcurrency)
            .window(Duration.ofSeconds(1))
            .flatMap(Flux::count)
            .subscribe(winSize -> log.debug("Got {} responses in last second", winSize));
}

private Mono<IndexResponse> indexDocSwallowErrors(Doc doc) {
    return indexDoc(doc)
            .doOnError(e -> log.error("Unable to index {}", doc, e))
            .onErrorResume(e -> Mono.empty());
}
```
When the application starts, it kicks off indexing of one million documents. Notice how easy it is to tell Reactor (same for RxJava) that it should invoke at most 1,000 concurrent requests to ElasticSearch. Every second we count how many responses we received:
Got 2925 responses in last second
Got 2415 responses in last second
Got 3336 responses in last second
Got 2199 responses in last second
Got 1861 responses in last second
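The maxConcurrency parameter of flatMap() is what keeps at most 1,000 requests in flight: Reactor simply stops subscribing to new inner Monos until earlier ones complete. The same back-pressure effect can be sketched with plain JDK primitives, here with a Semaphore capping in-flight asynchronous tasks (an illustrative analogy, not how Reactor is actually implemented):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedConcurrencyDemo {

    public static void main(String[] args) throws InterruptedException {
        int maxConcurrency = 10;
        Semaphore permits = new Semaphore(maxConcurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxObserved = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(50);

        for (int i = 0; i < 500; i++) {
            permits.acquire();  // blocks once maxConcurrency tasks are in flight
            CompletableFuture.runAsync(() -> {
                int current = inFlight.incrementAndGet();
                maxObserved.accumulateAndGet(current, Math::max);
                try {
                    Thread.sleep(1);  // simulate the remote indexing call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                    permits.release();  // frees a slot, like an inner Mono completing
                }
            }, pool);
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // Concurrency never exceeded the configured cap
        System.out.println(maxObserved.get() <= maxConcurrency);
    }
}
```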
Not bad! Especially when you take into account that there were up to one thousand concurrent HTTP requests while our application barely peaked at 30 threads (!). All right, it's localhost <-> localhost, guilty as charged! But how do we actually know all of that? Logging is fine, but it's the twenty-first century, we can do better! Monitoring will be the subject of the next installment.
The source code is available at github.com/nurkiewicz/elastic-flux in the reactive-elastic-search branch.
Translated from: https://www.javacodegeeks.com/2018/01/spring-reactor-elasticsearch-bechmarking-fake-test-data.html