What is the best option to execute 200 requests per minute to an external API in a multi-threaded Spring Boot application?
<details>
<summary>English:</summary>
There is an external game server with a marketplace. It lists many products and product combinations, 2146 in total.
I want to fetch up-to-date pricing information from it periodically.
When the application starts, I create 2146 tasks, one per product type. A separate thread starts the tasks one at a time, 2.5 seconds apart.
@EventListener(ApplicationReadyEvent.class)
public void start() {
    log.info("Let's get party started!");
    Set<MarketplaceCollector> collectorSet = marketplaceCollectorProviders.stream()
            .flatMap(provider -> provider.getCollectors().stream())
            .peek(this::subscribeOfferDBSubscriber)
            .collect(Collectors.toSet());
    collectors.addAll(collectorSet);
    runTasks();
}

private void subscribeOfferDBSubscriber(MarketplaceCollector marketplaceCollector) {
    marketplaceCollector.subscribe(marketplaceOfferDBSubscriber);
}

private void runTasks() {
    Thread thread = new Thread(() -> collectors.forEach(this::runWithDelay));
    thread.setName("tasks-runner");
    thread.start();
}

private void runWithDelay(Collector collector) {
    try {
        collector.collect();
        Thread.sleep(2_500);
        counter += 1;
    } catch (InterruptedException e) {
        log.error(e);
    }
    log.debug(counter);
}
I access the server with RestTemplate. If the price has changed, the task runs again after 1 minute. If the price is unchanged, one minute is added to the wait before the next request. So while the price stays the same, the interval between requests for one product grows up to a maximum of 20 minutes. I assume my application may execute at most 200 requests per minute; beyond that I will get a "too many requests" error.
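To make the polling rule above concrete, here is a minimal sketch of the delay schedule together with a rough load estimate. The class and method names (`PollDelay`, `LoadEstimate`) are hypothetical and only illustrate the rule as described; the estimate ignores the time each request takes, so it is an upper bound rather than a measurement.

```java
/** Delay rule as described: reset to 1 minute on a price change, otherwise add 1 minute up to 20. */
final class PollDelay {
    static final long MIN_DELAY_SECONDS = 60;    // 1 minute
    static final long MAX_DELAY_SECONDS = 1200;  // 20 minutes
    static final long STEP_SECONDS = 60;

    static long next(long currentDelaySeconds, boolean priceChanged) {
        if (priceChanged) {
            return MIN_DELAY_SECONDS;
        }
        return Math.min(currentDelaySeconds + STEP_SECONDS, MAX_DELAY_SECONDS);
    }
}

/** Rough request rate if every product were polled with the same delay. */
final class LoadEstimate {
    static double requestsPerMinute(int products, long delaySeconds) {
        return products * 60.0 / delaySeconds;
    }
}

final class PollDelayDemo {
    public static void main(String[] args) {
        long delay = 0;
        delay = PollDelay.next(delay, true);   // first response counts as a change
        delay = PollDelay.next(delay, false);  // unchanged: one more minute
        System.out.println("delay after one unchanged poll, seconds: " + delay);
        System.out.println("rate at 20 min delay: " + LoadEstimate.requestsPerMinute(2146, 1200));
        System.out.println("rate at 1 min delay:  " + LoadEstimate.requestsPerMinute(2146, 60));
    }
}
```

Under these assumptions, 2146 products polled every 20 minutes come to roughly 107 requests per minute, while polling all of them every minute would be 2146 per minute. The 200 per minute cap therefore only holds if most products sit near the long end of the schedule.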
@Override
public void collect() {
    executorService.schedule(new MarketplaceTask(), INIT_DELAY, MILLISECONDS);
}

private MarketplaceRequest request() {
    return MarketplaceRequest.builder()
            .country(country)
            .industry(industry)
            .quality(quality)
            .orderBy(ASC)
            .currentPage(1)
            .build();
}

private class MarketplaceTask implements Runnable {
    private long MIN_DELAY = 60; // 1 minute
    private long MAX_DELAY = 1200; // 20 minutes
    private Double PREVIOUS_PRICE = Double.MAX_VALUE;
    private long DELAY = 0; // seconds

    @Override
    public void run() {
        log.debug(format("Collecting offer of %s %s in %s after %d m delay", industry, quality, country, DELAY / 60));
        MarketplaceResponse response = marketplaceClient.getOffers(request());
        subscribers.forEach(s -> s.onSubscription(response));
        updatePreviousPriceAndPeriod(response);
        executorService.schedule(this, DELAY, SECONDS);
    }

    private void updatePreviousPriceAndPeriod(MarketplaceResponse response) {
        if (response.isError() || response.getOffers().isEmpty()) {
            increasePeriod();
        } else {
            Double currentPrice = response.getOffers().get(0).getPriceWithTaxes();
            if (isPriceChanged(currentPrice)) {
                setMinimalDelay();
                PREVIOUS_PRICE = currentPrice;
            } else {
                increasePeriod();
            }
        }
    }

    private void increasePeriod() {
        if (DELAY < MAX_DELAY) {
            DELAY += 60;
        }
    }

    private boolean isPriceChanged(Double currentPrice) {
        return !Objects.equals(currentPrice, PREVIOUS_PRICE);
    }

    private void setMinimalDelay() {
        DELAY = MIN_DELAY;
    }
}
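One thing worth noting about the task above: `schedule(...)` runs a task once, and the next run is queued only by the last line of `run()`. If the request or a subscriber throws, that line is never reached, and the executor keeps the exception in the returned future instead of logging it, so the chain for that product can end silently. The sketch below uses plain JDK classes and hypothetical names (`ReschedulingTask`, `ReschedulingDemo`) to show one way to reschedule in a `finally` block. It is an illustration under those assumptions, not a confirmed diagnosis of the behavior described here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Runs a body, then always queues itself again, even if the body threw. */
final class ReschedulingTask implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final Runnable body;
    private final long delayMillis;

    ReschedulingTask(ScheduledExecutorService scheduler, Runnable body, long delayMillis) {
        this.scheduler = scheduler;
        this.body = body;
        this.delayMillis = delayMillis;
    }

    @Override
    public void run() {
        try {
            body.run();
        } catch (RuntimeException e) {
            // Without this catch the failure would stay hidden inside the task's future.
            System.err.println("Run failed, retrying later: " + e);
        } finally {
            try {
                scheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException ignored) {
                // The scheduler was shut down; stop the chain quietly.
            }
        }
    }
}

final class ReschedulingDemo {
    /** Returns true if the task keeps running after its second run throws. */
    static boolean survivesFailure() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch fiveRuns = new CountDownLatch(5);
        Runnable flaky = () -> {
            int n = runs.incrementAndGet();
            fiveRuns.countDown();
            if (n == 2) {
                throw new IllegalStateException("simulated request failure");
            }
        };
        try {
            scheduler.schedule(new ReschedulingTask(scheduler, flaky, 10), 0, TimeUnit.MILLISECONDS);
            return fiveRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("chain survived a failed run: " + survivesFailure());
    }
}
```

This only covers runs that fail with an exception. A request that blocks forever would still occupy a pool thread, which is a separate matter of connect and read timeouts on the HTTP client.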
public MarketplaceClient(@Value("${app.host}") String host,
                         AuthenticationService authenticationService,
                         RestTemplateBuilder restTemplateBuilder,
                         CommonHeadersComposer headersComposer) {
    this.host = host;
    this.authenticationService = authenticationService;
    this.restTemplate = restTemplateBuilder.build();
    this.headersComposer = headersComposer;
}

public MarketplaceResponse getOffers(MarketplaceRequest request) {
    var authentication = authenticationService.getAuthentication();
    var requestEntity = new HttpEntity<>(requestString(request, authentication), headersComposer.getHeaders());
    log.debug(message("PING for", request));
    var responseEntity = restTemplate.exchange(host + MARKET_URL, POST, requestEntity, MarketplaceResponse.class);
    log.debug(message("PONG for", request));
    if (responseEntity.getBody().isError()) {
        log.warn("{}: {} {} in {}", responseEntity.getBody().getMessage(), request.getIndustry(), request.getQuality(), request.getCountry());
    }
    return responseEntity.getBody();
}

private String requestString(MarketplaceRequest request, Authentication authentication) {
    return format("countryId=%s&industryId=%s&quality=%s&orderBy=%s&currentPage=%s&ajaxMarket=1&_token=%s",
            request.getCountry().getId(), request.getIndustry().getId(), request.getQuality().getValue(),
            request.getOrderBy().getValue(), request.getCurrentPage(), authentication.getToken());
}
But after the application has been running for a few minutes, a problem appears: some tasks stop responding. A request may be sent to the server and never return, while other tasks keep working without problems. For example, the logs look like this:
2020-04-04 14:11:58.267 INFO 3546 --- [ main] c.g.d.e.harvesting.CollectorManager : Let's get party started!
2020-04-04 14:11:58.302 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.harvesting.MarketplaceCollector : Collecting offer of WEAPONS Q5 in GREECE after 0 m delay
2020-04-04 14:11:58.379 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.market.api.MarketplaceClient : PING for: WEAPONS Q5 in GREECE
2020-04-04 14:11:59.217 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.market.api.MarketplaceClient : PONG for: WEAPONS Q5 in GREECE
2020-04-04 14:12:00.805 DEBUG 3546 --- [ tasks-runner] c.g.d.e.harvesting.CollectorManager : 1
2020-04-04 14:12:00.806 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.harvesting.MarketplaceCollector : Collecting offer of WEAPONS Q4 in PAKISTAN after 0 m delay
2020-04-04 14:12:00.807 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.market.api.MarketplaceClient : PING for: WEAPONS Q4 in PAKISTAN
2020-04-04 14:12:03.308 DEBUG 3546 --- [ tasks-runner] c.g.d.e.harvesting.CollectorManager : 2
2020-04-04 14:12:03.309 DEBUG 3546 --- [pool-1-thread-2] c.g.d.e.harvesting.MarketplaceCollector : Collecting offer of FOOD_RAW Q1 in SAUDI_ARABIA after 0 m delay
2020-04-04 14:12:03.311 DEBUG 3546 --- [pool-1-thread-2] c.g.d.e.market.api.MarketplaceClient : PING for: FOOD_RAW Q1 in SAUDI_ARABIA
2020-04-04 14:12:05.810 DEBUG 3546 --- [ tasks-runner] c.g.d.e.harvesting.CollectorManager : 3
2020-04-04 14:12:05.810 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.harvesting.MarketplaceCollector : Collecting offer of WEAPONS Q5 in COLOMBIA after 0 m delay
2020-04-04 14:12:05.811 DEBUG 3546 --- [pool-1-thread-1] c.g.d.e.market.api.MarketplaceClient : PING for: WEAPONS Q5 in COLOMBIA
2020-04-04 14:12:08.314 DEBUG 3546 --- [ tasks-runner] c.g.d.e.harvesting.CollectorManager : 4
2020-04-04 14:12:08.315 DEBUG 3546 --- [pool-1-thread-4] c.g.d.e.harvesting.MarketplaceCollector : Collecting offer of WEAPONS Q1 in CZECH_REPUBLIC after 0 m delay
2020-04-04 14:12:08.316 DEBUG 3546 --- [pool-1-thread-4] c.g.d.e.market.api.MarketplaceClient : PING for: WEAPONS Q1 in CZECH_REPUBLIC
2020-04-04 14:12:10.818 DEBUG 3546 --- [ tasks-runner] c.g.d.e.harvesting.CollectorManager : 5
@Configuration
public class BeanConfiguration {
    @Bean
    public ScheduledExecutorService scheduledExecutorService() {
        return Executors.newScheduledThreadPool(8);
    }
}
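With a shared scheduler like the one above, one possible way to stay near the assumed limit of 200 requests per minute is to space request starts evenly, about 300 ms apart, no matter how many tasks become due at once. The sketch below is a plain JDK illustration; `RequestPacer` and `reserve` are hypothetical names, and the spacing applies to scheduled start times only, not to how long each request takes.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hands out evenly spaced start slots, roughly maxPerMinute of them per minute. */
final class RequestPacer {
    private final long intervalMillis;
    private long nextFreeAtMillis = 0;

    RequestPacer(int maxPerMinute) {
        // Integer division: 200 per minute gives one slot every 300 ms.
        this.intervalMillis = 60_000L / maxPerMinute;
    }

    /** Reserves the next free slot and returns how long the caller should wait, in milliseconds. */
    synchronized long reserve(long nowMillis) {
        long slot = Math.max(nowMillis, nextFreeAtMillis);
        nextFreeAtMillis = slot + intervalMillis;
        return slot - nowMillis;
    }
}

final class RequestPacerDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(8);
        RequestPacer pacer = new RequestPacer(200);
        for (int i = 1; i <= 5; i++) {
            int id = i;
            long wait = pacer.reserve(System.currentTimeMillis());
            // Each "request" is queued for its own slot instead of firing immediately.
            scheduler.schedule(() -> System.out.println("request " + id), wait, TimeUnit.MILLISECONDS);
        }
        scheduler.shutdown(); // already queued delayed tasks still run by default
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

In a setup like the one described, the value returned by `reserve` could be added to a task's own delay before it is handed to the scheduler, so that bursts of due tasks are spread out rather than sent together.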
I tried changing the connection pool size per host, but that only made things worse. I even created 200 RestTemplate instances, but over time access to the server still stopped.
I would prefer not to use Spring WebFlux for this.
What should I do to make the app work as expected?
</details>