How to return a response to a WebFlux endpoint after pushing a message to Google Cloud Pub/Sub?

Question

I am creating a simple Spring Boot app using Spring Integration. Below are the three main constructs of this app:

  1. Inbound Gateway: WebFluxInboundEndpoint, which accepts the HTTP request
  2. Outbound Gateway: PubSubMessageHandler, which pushes the message to a Google Cloud Pub/Sub topic
  3. Message Channel: FluxMessageChannel, acting as the request channel

The Google Cloud PubSubMessageHandler reports the publish outcome only through failure and success callbacks, so no error/success response is returned to the WebFlux endpoint and the request waits indefinitely.

Ask: How can a success/failure response be returned after receiving the response from Pub/Sub?

Working copy of application is available here: https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample

To run the application, put your Google Cloud service key in the serviceAccountKey.json file and
then set the environment variable GOOGLE_APPLICATION_CREDENTIALS=/PATH_TO/serviceAccountKey.json

Sample request: curl -d "name=piyush" http://localhost:8080/createPerson

Below is the sample file that accepts the above request and, after converting it into a Spring message, pushes it to the Pub/Sub topic "person":

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

	private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

	private static final String TOPIC_NAME = "person";

	public static void main(String[] args) {
		SpringApplication.run(PubSubWebFluxApplication.class, args);
	}

	/**
	 * bean to deserialize request payload.
	 */
	@Bean
	public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
		return new JacksonPubSubMessageConverter(objectMapper);
	}

	@Bean
	public MessageChannel pubSubOutputChannel() {
		return MessageChannels.flux().get();
	}

	/**
	 * Message handler which will consume messages from message channel.
	 * Then it will send google cloud pubsub topic.
	 */
	@Bean
	@ServiceActivator(inputChannel = "pubSubOutputChannel")
	public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
		PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
		handler.setPublishCallback(new ListenableFutureCallback<>() {
			@Override
			public void onFailure(Throwable ex) {
				LOGGER.info("There was an error sending the message.");
			}

			@Override
			public void onSuccess(String result) {
				LOGGER.info("Message was sent successfully.");
			}
		});

		return handler;
	}

	/**
	 * Webflux endpoint to consume http request.
	 */
	@Bean
	public WebFluxInboundEndpoint webFluxInboundEndpoint() {

		WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

		RequestMapping requestMapping = new RequestMapping();
		requestMapping.setMethods(HttpMethod.POST);
		requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
		requestMapping.setPathPatterns("/createPerson");
		endpoint.setRequestMapping(requestMapping);

		endpoint.setRequestChannel(pubSubOutputChannel());

		return endpoint;
	}
}

The build.gradle dependencies are:

plugins {
	id 'org.springframework.boot' version '2.2.6.RELEASE'
	id 'io.spring.dependency-management' version '1.0.9.RELEASE'
	id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
}

ext {
	set('springCloudVersion', "Hoxton.SR4")
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-webflux'
	implementation 'org.springframework.boot:spring-boot-starter-integration'
	implementation 'org.springframework.integration:spring-integration-webflux'
	implementation 'org.springframework.cloud:spring-cloud-gcp-starter-pubsub'
	testImplementation('org.springframework.boot:spring-boot-starter-test') {
		exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
	}
}

dependencyManagement {
	imports {
		mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
	}
}

test {
	useJUnitPlatform()
}

Below is the new application file after making PubSubMessageHandler synchronous and adding ExpressionEvaluatingRequestHandlerAdvice. It fails with the error "'beanFactory' must not be null" when MessagingGatewaySupport creates the correlator.

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

	private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

	private static final String TOPIC_NAME = "person";

	public static void main(String[] args) {
		SpringApplication.run(PubSubWebFluxApplication.class, args);
	}

	/**
	 * bean to deserialize request payload.
	 */
	@Bean
	public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
		return new JacksonPubSubMessageConverter(objectMapper);
	}

	@Bean
	public MessageChannel pubSubOutputChannel() {
		return MessageChannels.flux().get();
	}

	@Bean
	public MessageChannel replyChannel() {
		return MessageChannels.flux().get();
	}

	@Bean
	public MessageChannel errorChannel() {
		return MessageChannels.flux().get();
	}

	/**
	 * Message handler which will consume messages from message channel.
	 * Then it will send google cloud pubsub topic.
	 */
	@Bean
	@ServiceActivator(
			inputChannel = "pubSubOutputChannel",
			adviceChain = "expressionAdvice"
	)
	public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
		PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
		handler.setSync(true);
		return handler;
	}

	/**
	 * Webflux endpoint to consume http request.
	 */
	@Bean
	public WebFluxInboundEndpoint webFluxInboundEndpoint() {

		WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

		RequestMapping requestMapping = new RequestMapping();
		requestMapping.setMethods(HttpMethod.POST);
		requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
		requestMapping.setPathPatterns("/createPerson");
		endpoint.setRequestMapping(requestMapping);

		endpoint.setRequestChannel(pubSubOutputChannel());
		endpoint.setReplyChannel(replyChannel());
		endpoint.setErrorChannel(errorChannel());

		return endpoint;
	}

	@Bean
	public Advice expressionAdvice() {
		ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
		advice.setSuccessChannel(replyChannel());
		advice.setFailureChannel(errorChannel());
		return advice;
	}
}

Stack trace of the error that occurs after sending the HTTP request:

2020-05-04 16:23:47.371 ERROR 59089 --- [ctor-http-nio-3] a.w.r.e.AbstractErrorWebExceptionHandler : [fd79ecbb-1]  500 Server Error for HTTP POST "/createPerson"

java.lang.IllegalArgumentException: 'beanFactory' must not be null
	at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ HTTP POST "/createPerson" [ExceptionHandlingWebHandler]
Stack trace:
		at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
		at org.springframework.integration.channel.ChannelUtils.getErrorHandler(ChannelUtils.java:52) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
		at org.springframework.integration.endpoint.ReactiveStreamsConsumer.onInit(ReactiveStreamsConsumer.java:126) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
		at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
		at org.springframework.integration.gateway.MessagingGatewaySupport.registerReplyMessageCorrelatorIfNecessary(MessagingGatewaySupport.java:799) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
		at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessageReactive(MessagingGatewaySupport.java:602) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]

Answer 1

Score: 0

The PubSubMessageHandler is not designed for request/reply behavior.
In most cases it is used as send-and-forget.

Since you really care about a success/failure reply, I can only suggest something like:

  1. PubSubMessageHandler.setSync(true):

     /**
      * Set publish method to be synchronous or asynchronous.
      *
      * <p>Publish is asynchronous be default.
      * @param sync true for synchronous, false for asynchronous
      */
     public void setSync(boolean sync) {
    

This way your PubSubMessageHandler waits on pubsubFuture.get(), and if publishing fails, a MessageHandlingException is thrown.

  2. To handle success or failure in this sync scenario, I suggest taking a look at ExpressionEvaluatingRequestHandlerAdvice with its successChannel and failureChannel.
    For onSuccessExpression, I think #root should be used, since it points to the requestMessage.
    The onFailureExpression may consult the #exception SpEL expression variable, but should still propagate the requestMessage to the failureChannel. The reason I talk about the requestMessage is that it carries the important replyChannel header needed to respond to that WebFluxInboundEndpoint request.
    See more info in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain

  3. The sub-flows on successChannel and failureChannel should reply properly by returning some value while leaving their outputChannel unset.

But at the same time I totally agree that it would be much easier if PubSubMessageHandler were an AbstractReplyProducingMessageHandler returning some ListenableFuture, letting us process the publishing result.
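
The effect of switching the handler to synchronous mode can be illustrated with plain JDK futures. This is only a model of the idea, not Spring Cloud GCP code: the class SyncPublishSketch, its methods publishAsync, publishSync, and handleRequest, and the brokerUp flag are all hypothetical names introduced for illustration. It shows how blocking on the publish future turns an asynchronous failure into an exception that the caller can map to a reply.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** JDK-only model of sync vs. async publishing; every name here is hypothetical. */
final class SyncPublishSketch {

    /** Stand-in for an asynchronous publish call that yields a message id. */
    static CompletableFuture<String> publishAsync(String payload, boolean brokerUp) {
        return brokerUp
                ? CompletableFuture.completedFuture("id-" + payload)
                : CompletableFuture.failedFuture(new IllegalStateException("broker unavailable"));
    }

    /** Blocking on the future turns an asynchronous failure into a thrown exception. */
    static String publishSync(String payload, boolean brokerUp) {
        try {
            return publishAsync(payload, brokerUp).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while publishing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("publish failed: " + e.getCause().getMessage(), e.getCause());
        }
    }

    /** Once the outcome is known synchronously, it can be mapped to a reply for the waiting request. */
    static String handleRequest(String payload, boolean brokerUp) {
        try {
            return "200 published " + publishSync(payload, brokerUp);
        } catch (IllegalStateException e) {
            return "500 " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(handleRequest("piyush", true));
        System.out.println(handleRequest("piyush", false));
    }
}
```

The trade-off in this model is the same one the answer hints at: the calling thread is held until the publish completes, in exchange for having a definite success or failure to report.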

Answer 2

Score: 0

Thanks @Artem. I resolved it by providing a custom request handler advice which, in the success scenario, identifies the replyChannel from the message headers and sends the message payload as the response to the WebFlux endpoint.

For the error scenario, I am relying on the error handling mechanism of ReactiveStreamsConsumer, which internally uses the errorChannel to send the error back to the WebFlux endpoint.

Please advise whether this implementation is correct.

Below is the code for PubSubRequestHandlerAdvice:

package com.example;

import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
import org.springframework.integration.message.AdviceMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

public class PubSubRequestHandlerAdvice extends AbstractRequestHandlerAdvice {

  private final MessagingTemplate messagingTemplate = new MessagingTemplate();

  @Override
  protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {

    Object result = callback.execute();

    Object evalResult = message.getPayload();
    MessageChannel successChannel = null;
    Object replyChannelHeader = message.getHeaders().getReplyChannel();
    if (replyChannelHeader instanceof MessageChannel) {
      successChannel = (MessageChannel) replyChannelHeader;
    }

    if (evalResult != null && successChannel != null) {
      AdviceMessage<?> resultMessage = new AdviceMessage<>(evalResult, message);
      this.messagingTemplate.send(successChannel, resultMessage);
    }
    return result;
  }
}
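
The pattern used by this advice (run the one-way handler first, then send the request payload to the reply target found in the message headers) can be modeled without Spring. The sketch below is an illustration only: ReplyAdviceSketch, Msg, withReplyAdvice, and sendAndReceive are hypothetical names, a CompletableFuture stands in for the replyChannel header, and the try/catch in sendAndReceive is merely a stand-in for error-channel handling, not a description of how ReactiveStreamsConsumer works internally.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** JDK-only model of replying through a reply-channel header; all names are hypothetical. */
final class ReplyAdviceSketch {

    static final String REPLY_CHANNEL = "replyChannel";

    /** Minimal message: a payload plus headers, one of which carries the reply target. */
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;

        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Wraps a one-way handler so that a successful call echoes the payload to the reply target. */
    static Consumer<Msg> withReplyAdvice(Consumer<Msg> handler) {
        return msg -> {
            handler.accept(msg); // throws on failure, so no success reply is sent below
            Object reply = msg.headers.get(REPLY_CHANNEL);
            if (reply instanceof CompletableFuture) {
                @SuppressWarnings("unchecked")
                CompletableFuture<String> future = (CompletableFuture<String>) reply;
                future.complete(msg.payload);
            }
        };
    }

    /** Models the inbound gateway: sends a request and exposes the eventual reply. */
    static CompletableFuture<String> sendAndReceive(Consumer<Msg> flow, String payload) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        try {
            flow.accept(new Msg(payload, Map.of(REPLY_CHANNEL, reply)));
        } catch (RuntimeException e) {
            reply.completeExceptionally(e); // stand-in for error-channel handling
        }
        return reply;
    }

    public static void main(String[] args) {
        Consumer<Msg> working = withReplyAdvice(msg -> { });
        Consumer<Msg> failing = withReplyAdvice(msg -> {
            throw new IllegalStateException("publish failed");
        });
        System.out.println(sendAndReceive(working, "name=piyush").join());
        System.out.println(sendAndReceive(failing, "name=piyush").isCompletedExceptionally());
    }
}
```

In this model a handler that produces no reply of its own would leave the future incomplete forever, which corresponds to the indefinitely waiting request described in the question; the advice is what completes it.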

Final application file, which uses PubSubRequestHandlerAdvice for PubSubMessageHandler:

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

	private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

	private static final String TOPIC_NAME = "person";

	public static void main(String[] args) {
		SpringApplication.run(PubSubWebFluxApplication.class, args);
	}

	/**
	 * bean to deserialize request payload.
	 */
	@Bean
	public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
		return new JacksonPubSubMessageConverter(objectMapper);
	}

	@Bean
	public MessageChannel pubSubOutputChannel() {
		return MessageChannels.flux().get();
	}

	/**
	 * Message handler which will consume messages from message channel.
	 * Then it will send google cloud pubsub topic.
	 */
	@Bean
	@ServiceActivator(
			inputChannel = "pubSubOutputChannel",
			adviceChain = "pubSubAdvice"
	)
	public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
		PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
		handler.setSync(true);
		return handler;
	}

	/**
	 * Webflux endpoint to consume http request.
	 */
	@Bean
	public WebFluxInboundEndpoint webFluxInboundEndpoint() {

		WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

		RequestMapping requestMapping = new RequestMapping();
		requestMapping.setMethods(HttpMethod.POST);
		requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
		requestMapping.setPathPatterns("/createPerson");
		endpoint.setRequestMapping(requestMapping);

		endpoint.setRequestChannel(pubSubOutputChannel());

		return endpoint;
	}

	@Bean
	public Advice pubSubAdvice() {
		return new PubSubRequestHandlerAdvice();
	}

}

Working copy of application is available here: https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample

huangapple
  • Published on 2020-05-02 16:07:50
  • Source link: https://java.coder-hub.com/61556238.html