英文:
Spring Cloud GCP is not receiving messages from subscription
问题
ChatboardApplication.java
@SpringBootApplication
public class ChatboardApplication {
public static void main(String[] args) {
SpringApplication.run(ChatboardApplication.class, args);
}
private static final Log LOGGER = LogFactory.getLog(ChatboardApplication.class);
@Bean
public MessageChannel pubsubInputChannel() {
return new DirectChannel();
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubSubTemplate, "agent-genie-sub");
adapter.setOutputChannel(inputChannel);
return adapter;
}
@ServiceActivator(inputChannel = "pubsubInputChannel")
public void messageReceiver(String payload) {
LOGGER.info("Message arrived! Payload: " + payload);
}
}
ChatboardProducerApplication.java
@SpringBootApplication
@RestController
public class ChatboardProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ChatboardProducerApplication.class, args);
}
@MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
public interface PubsubOutboundGateway {
void sendToPubsub(String text);
}
@Bean
@ServiceActivator(inputChannel = "pubsubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
return new PubSubMessageHandler(pubsubTemplate, "agent-genie-logs");
}
@Autowired
private PubsubOutboundGateway messagingGateway;
@PostMapping("/postMessage")
public RedirectView postMessage(@RequestParam("message") String message) {
this.messagingGateway.sendToPubsub(message);
return new RedirectView("/");
}
}
pom.xml
<!-- The contents of your pom.xml file remain the same -->
application.properties
spring.cloud.gcp.project-id=#########
spring.cloud.gcp.credentials.location=file:#######################
Curl command to test applications
curl --data "message=Hello world!" localhost:8080/postMessage
Edit - Error after several minutes
com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: Credentials failed to obtain metadata
...
...
...
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: Credentials failed to obtain metadata
...
...
...
Caused by: java.io.IOException: Error getting access token for service account: 400 Bad Request
{ "error": "invalid_grant", "error_description": "Invalid JWT: Token must be a short-lived token (60 minutes) and in a reasonable timeframe. Check your iat and exp values in the JWT claim." }
...
...
...
Please note that I have only provided the translated code snippets and key information from your original content. If you have any specific questions or need further assistance, feel free to ask.
英文:
I have a trained Chatbot in DialogFlow. I enabled logging for the chatbot and it logs to Google StackDriver. From Google StackDriver, using a log router, I have routed the logs to a Google Cloud Pub-Sub topic. Then, with a subscription, I have subscribed to this topic. The Pub-Sub works fine. I have setup Google Cloud SDK in my Ubuntu PC. Using gcloud commands, I can pull messages from the Pub-Sub subscription and it works.
Now, I implemented a Spring Boot application to subscribe to Pub-Sub. I followed this tutorial. Find the code below. The application does not receive messages from Pub-Sub. It is supposed to log the received message. Can anyone tell me what I may be missing here? How can I test if this actually works? Please comment if further information is needed.
> ChatboardApplication.java
@SpringBootApplication
public class ChatboardApplication {
public static void main(String[] args) {
SpringApplication.run(ChatboardApplication.class, args);
}
private static final Log LOGGER = LogFactory.getLog(ChatboardApplication.class);
@Bean
public MessageChannel pubsubInputChannel() {
return new DirectChannel();
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubSubTemplate, "agent-genie-sub");
adapter.setOutputChannel(inputChannel);
return adapter;
}
@ServiceActivator(inputChannel = "pubsubInputChannel")
public void messageReceiver(String payload) {
LOGGER.info("Message arrived! Payload: " + payload);
}
}
> ChatboardProducerApplication
@SpringBootApplication
@RestController
public class ChatboardProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ChatboardProducerApplication.class, args);
}
@MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
public interface PubsubOutboundGateway {
void sendToPubsub(String text);
}
@Bean
@ServiceActivator(inputChannel = "pubsubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
return new PubSubMessageHandler(pubsubTemplate, "agent-genie-logs");
}
@Autowired
private PubsubOutboundGateway messagingGateway;
@PostMapping("/postMessage")
public RedirectView postMessage(@RequestParam("message") String message) {
this.messagingGateway.sendToPubsub(message);
return new RedirectView("/");
}
}
> pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.dialog</groupId>
<artifactId>chatboard</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>chatboard</name>
<description>PubSub subscriber and Dashboard backend</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR3</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
> application.properties
spring.cloud.gcp.project-id=#########
spring.cloud.gcp.credentials.location=file:#######################
> Curl command to test applications
curl --data "message=Hello world!" localhost:8080/postMessage
> Edit - Error after several minutes
After some time from the applications started and running curl command, I get the following error.
com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: Credentials failed to obtain metadata
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69) ~[gax-1.54.0.jar:1.54.0]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) ~[gax-grpc-1.54.0.jar:1.54.0]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) ~[gax-grpc-1.54.0.jar:1.54.0]
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) [gax-grpc-1.54.0.jar:1.54.0]
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) [api-common-1.8.1.jar:na]
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1039) [guava-28.2-android.jar:na]
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) [guava-28.2-android.jar:na]
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1165) [guava-28.2-android.jar:na]
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:958) [guava-28.2-android.jar:na]
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:749) [guava-28.2-android.jar:na]
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:522) [grpc-stub-1.27.2.jar:1.27.2]
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:497) [grpc-stub-1.27.2.jar:1.27.2]
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426) [grpc-core-1.27.2.jar:1.27.2]
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66) [grpc-core-1.27.2.jar:1.27.2]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689) [grpc-core-1.27.2.jar:1.27.2]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577) [grpc-core-1.27.2.jar:1.27.2]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751) [grpc-core-1.27.2.jar:1.27.2]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740) [grpc-core-1.27.2.jar:1.27.2]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [grpc-core-1.27.2.jar:1.27.2]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) [grpc-core-1.27.2.jar:1.27.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_212]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_212]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_212]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: Credentials failed to obtain metadata
at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.27.2.jar:1.27.2]
... 16 common frames omitted
Caused by: java.io.IOException: Error getting access token for service account: 400 Bad Request
{"error":"invalid_grant","error_description":"Invalid JWT: Token must be a short-lived token (60 minutes) and in a reasonable timeframe. Check your iat and exp values in the JWT claim."}
at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:444) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.auth.oauth2.OAuth2Credentials.refresh(OAuth2Credentials.java:157) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.auth.oauth2.OAuth2Credentials.getRequestMetadata(OAuth2Credentials.java:145) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.auth.oauth2.ServiceAccountCredentials.getRequestMetadata(ServiceAccountCredentials.java:603) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.auth.Credentials.blockingGetToCallback(Credentials.java:112) ~[google-auth-library-credentials-0.20.0.jar:na]
at com.google.auth.Credentials$1.run(Credentials.java:98) ~[google-auth-library-credentials-0.20.0.jar:na]
... 7 common frames omitted
Caused by: com.google.api.client.http.HttpResponseException: 400 Bad Request
{"error":"invalid_grant","error_description":"Invalid JWT: Token must be a short-lived token (60 minutes) and in a reasonable timeframe. Check your iat and exp values in the JWT claim."}
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1113) ~[google-http-client-1.34.2.jar:1.34.2]
at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:441) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
... 12 common frames omitted
专注分享java语言的经验与见解,让所有开发者获益!
评论