How to return a response to a WebFlux endpoint after pushing a message to Google Cloud Pub/Sub?
Question
I am creating a simple Spring Boot app using Spring Integration. These are the three main constructs of the app:
- Inbound Gateway: WebFluxInboundEndpoint, which accepts the HTTP request
- Outbound Gateway: PubSubMessageHandler, which pushes the message to a Google Cloud Pub/Sub topic
- Message Channel: FluxMessageChannel, acting as the request channel
PubSubMessageHandler reports the publish outcome only through its success and failure callbacks, so no success/error response is returned to the WebFlux endpoint and the request waits indefinitely.
Ask: How can a success/failure response be returned to the caller once Pub/Sub has responded?
A working copy of the application is available here: https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample
To run the application, put your Google Cloud service key in a serviceAccountKey.json file and then set the environment variable GOOGLE_APPLICATION_CREDENTIALS=/PATH_TO/serviceAccountKey.json
Sample request: curl -d "name=piyush" http://localhost:8080/createPerson
Below is the sample file that accepts the above request, converts it into a Spring message, and pushes it to the Pub/Sub topic "person":
package com.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* Entry point into the sample application.
*
* @author Piyush Garg
*/
@SpringBootApplication
public class PubSubWebFluxApplication {
private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);
private static final String TOPIC_NAME = "person";
public static void main(String[] args) {
SpringApplication.run(PubSubWebFluxApplication.class, args);
}
/**
* bean to deserialize request payload.
*/
@Bean
public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
return new JacksonPubSubMessageConverter(objectMapper);
}
@Bean
public MessageChannel pubSubOutputChannel() {
return MessageChannels.flux().get();
}
/**
* Message handler that consumes messages from the message channel
* and sends them to the Google Cloud Pub/Sub topic.
*/
@Bean
@ServiceActivator(inputChannel = "pubSubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
handler.setPublishCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable ex) {
LOGGER.error("There was an error sending the message.", ex);
}
@Override
public void onSuccess(String result) {
LOGGER.info("Message was sent successfully.");
}
});
return handler;
}
/**
* Webflux endpoint to consume http request.
*/
@Bean
public WebFluxInboundEndpoint webFluxInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setMethods(HttpMethod.POST);
requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
requestMapping.setPathPatterns("/createPerson");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannel(pubSubOutputChannel());
return endpoint;
}
}
The build.gradle dependencies are:
plugins {
id 'org.springframework.boot' version '2.2.6.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
ext {
set('springCloudVersion', "Hoxton.SR4")
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-webflux'
implementation 'org.springframework.cloud:spring-cloud-gcp-starter-pubsub'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
test {
useJUnitPlatform()
}
Below is the new application file after making PubSubMessageHandler synchronous and adding an ExpressionEvaluatingRequestHandlerAdvice. It fails with the error "'beanFactory' must not be null" when MessagingGatewaySupport creates the reply correlator.
package com.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* Entry point into the sample application.
*
* @author Piyush Garg
*/
@SpringBootApplication
public class PubSubWebFluxApplication {
private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);
private static final String TOPIC_NAME = "person";
public static void main(String[] args) {
SpringApplication.run(PubSubWebFluxApplication.class, args);
}
/**
* bean to deserialize request payload.
*/
@Bean
public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
return new JacksonPubSubMessageConverter(objectMapper);
}
@Bean
public MessageChannel pubSubOutputChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel replyChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel errorChannel() {
return MessageChannels.flux().get();
}
/**
* Message handler that consumes messages from the message channel
* and sends them to the Google Cloud Pub/Sub topic.
*/
@Bean
@ServiceActivator(
inputChannel = "pubSubOutputChannel",
adviceChain = "expressionAdvice"
)
public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
handler.setSync(true);
return handler;
}
/**
* Webflux endpoint to consume http request.
*/
@Bean
public WebFluxInboundEndpoint webFluxInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setMethods(HttpMethod.POST);
requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
requestMapping.setPathPatterns("/createPerson");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannel(pubSubOutputChannel());
endpoint.setReplyChannel(replyChannel());
endpoint.setErrorChannel(errorChannel());
return endpoint;
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannel(replyChannel());
advice.setFailureChannel(errorChannel());
return advice;
}
}
Stack trace of the error raised after sending the HTTP request:
2020-05-04 16:23:47.371 ERROR 59089 --- [ctor-http-nio-3] a.w.r.e.AbstractErrorWebExceptionHandler : [fd79ecbb-1] 500 Server Error for HTTP POST "/createPerson"
java.lang.IllegalArgumentException: 'beanFactory' must not be null
at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ HTTP POST "/createPerson" [ExceptionHandlingWebHandler]
Stack trace:
at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.channel.ChannelUtils.getErrorHandler(ChannelUtils.java:52) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.endpoint.ReactiveStreamsConsumer.onInit(ReactiveStreamsConsumer.java:126) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.registerReplyMessageCorrelatorIfNecessary(MessagingGatewaySupport.java:799) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessageReactive(MessagingGatewaySupport.java:602) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
Answer 1
Score: 0
The PubSubMessageHandler is not designed for request/reply behavior. In most cases it is used as send-and-forget.
Since you really care about a success/failure reply, I can only suggest something like this:
- Call PubSubMessageHandler.setSync(true):
/**
 * Set publish method to be synchronous or asynchronous.
 *
 * <p>Publish is asynchronous be default.
 * @param sync true for synchronous, false for asynchronous
 */
public void setSync(boolean sync) {
This way your PubSubMessageHandler waits on pubsubFuture.get(), and if publishing fails a MessageHandlingException is thrown.
- To handle success or failure in this sync scenario, take a look at ExpressionEvaluatingRequestHandlerAdvice with its successChannel and failureChannel. I think the onSuccessExpression should be #root, pointing to the requestMessage. The onFailureExpression may consult the #exception SpEL expression variable, but should still propagate the requestMessage to the failureChannel. I stress the requestMessage because it carries the important replyChannel header needed to respond to the WebFluxInboundEndpoint request. See the docs for more info: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain
- The sub-flows on the successChannel and failureChannel should reply properly by returning some value while leaving their outputChannel empty, so that the reply is routed to the replyChannel header.
At the same time, I totally agree that it would be much easier if PubSubMessageHandler were an AbstractReplyProducingMessageHandler returning a ListenableFuture, which would let us process the publishing result.
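The effect of the synchronous mode described above can be illustrated without any Pub/Sub dependency. The following is a minimal plain-Java sketch, not the library's implementation: a CompletableFuture stands in for the publish future, and the class name SyncPublishSketch, the method publishSync, and the IllegalStateException wrapper are assumptions made for illustration (the real handler throws MessageHandlingException). It shows how blocking on the future turns an asynchronous failure into an exception on the calling thread, which is what lets an advice or error channel react to it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for a handler running in "sync" mode.
class SyncPublishSketch {

    // Blocks until the publish future completes. Returns the message id on
    // success; rethrows a failed publish as an unchecked exception.
    static String publishSync(CompletableFuture<String> publishFuture) {
        try {
            return publishFuture.get();
        } catch (ExecutionException e) {
            // The original failure is the cause of the ExecutionException.
            throw new IllegalStateException("Publish failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while publishing", e);
        }
    }

    public static void main(String[] args) {
        // Successful publish: the caller sees the message id.
        System.out.println(publishSync(CompletableFuture.completedFuture("id-1"))); // prints id-1

        // Failed publish: the caller sees an exception instead of a silent callback.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("topic not found"));
        try {
            publishSync(failed);
        } catch (IllegalStateException e) {
            System.out.println(e.getCause().getMessage()); // prints topic not found
        }
    }
}
```

The trade-off is that the calling thread is blocked until the publish completes, which is worth keeping in mind on a reactive stack.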
Answer 2
Score: 0
Thanks @Artem. I resolved it with a custom request handler advice that, in the success scenario, takes the replyChannel from the message headers and sends the message payload back as the response to the WebFlux endpoint.
For the error scenario, I rely on the error handling mechanism of ReactiveStreamsConsumer, which internally uses the errorChannel to send the error back to the WebFlux endpoint.
Please advise whether this implementation is correct.
Below is the code for PubSubRequestHandlerAdvice:
package com.example;

import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
import org.springframework.integration.message.AdviceMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

public class PubSubRequestHandlerAdvice extends AbstractRequestHandlerAdvice {

    private final MessagingTemplate messagingTemplate = new MessagingTemplate();

    @Override
    protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
        // Invoke the advised handler. In sync mode a failed publish throws here,
        // so the reply below is sent only on success.
        Object result = callback.execute();
        // The request payload is echoed back as the reply body.
        Object evalResult = message.getPayload();
        // The inbound gateway stores its reply channel in the request headers.
        MessageChannel successChannel = null;
        Object replyChannelHeader = message.getHeaders().getReplyChannel();
        if (replyChannelHeader instanceof MessageChannel) {
            successChannel = (MessageChannel) replyChannelHeader;
        }
        if (evalResult != null && successChannel != null) {
            AdviceMessage<?> resultMessage = new AdviceMessage<>(evalResult, message);
            this.messagingTemplate.send(successChannel, resultMessage);
        }
        return result;
    }
}
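The idea behind this advice can be shown in plain Java, independent of Spring Integration. The sketch below is only an analogy under stated assumptions: Request, replyChannel (modelled as a Consumer) and withReply are hypothetical names, not Spring APIs. It wraps a one-way handler so that a reply is sent to the sink carried by the request only when the handler returns normally; if the handler throws, the exception propagates and no reply is sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java analogy of a "reply on success" advice (hypothetical names).
class ReplyAdviceSketch {

    // Stand-in for a message: a payload plus the reply sink carried with it.
    static final class Request {
        final String payload;
        final Consumer<String> replyChannel; // may be null for fire-and-forget

        Request(String payload, Consumer<String> replyChannel) {
            this.payload = payload;
            this.replyChannel = replyChannel;
        }
    }

    // Wraps a one-way handler. The payload is echoed to the reply sink only
    // after the handler has returned without throwing.
    static Consumer<Request> withReply(Consumer<Request> oneWayHandler) {
        return request -> {
            oneWayHandler.accept(request); // a failure propagates to the caller
            if (request.replyChannel != null) {
                request.replyChannel.accept(request.payload);
            }
        };
    }

    public static void main(String[] args) {
        List<String> replies = new ArrayList<>();
        Consumer<Request> advised = withReply(request -> { /* pretend to publish */ });
        advised.accept(new Request("name=piyush", replies::add));
        System.out.println(replies); // prints [name=piyush]
    }
}
```

Because the reply is sent after the wrapped call, the failure path needs no extra code in the wrapper; whoever invoked it sees the exception and can route it to an error handler.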
Below is the final application file, which uses PubSubRequestHandlerAdvice for the PubSubMessageHandler:
package com.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* Entry point into the sample application.
*
* @author Piyush Garg
*/
@SpringBootApplication
public class PubSubWebFluxApplication {
private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);
private static final String TOPIC_NAME = "person";
public static void main(String[] args) {
SpringApplication.run(PubSubWebFluxApplication.class, args);
}
/**
* bean to deserialize request payload.
*/
@Bean
public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
return new JacksonPubSubMessageConverter(objectMapper);
}
@Bean
public MessageChannel pubSubOutputChannel() {
return MessageChannels.flux().get();
}
/**
* Message handler that consumes messages from the message channel
* and sends them to the Google Cloud Pub/Sub topic.
*/
@Bean
@ServiceActivator(
inputChannel = "pubSubOutputChannel",
adviceChain = "pubSubAdvice"
)
public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
handler.setSync(true);
return handler;
}
/**
* Webflux endpoint to consume http request.
*/
@Bean
public WebFluxInboundEndpoint webFluxInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setMethods(HttpMethod.POST);
requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
requestMapping.setPathPatterns("/createPerson");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannel(pubSubOutputChannel());
return endpoint;
}
@Bean
public Advice pubSubAdvice() {
return new PubSubRequestHandlerAdvice();
}
}
A working copy of the application is available here: https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample