spring-integration-mqtt: Route message from input topic1 to output topic2
Question
I have been stuck on this problem for 3 days.
I'm using spring-integration-mqtt to read a simple JSON message from topic1, like this:
{
  "speed": 40,
  "position": "NewYork"
}
If speed > 50, I want to write "ok" to Topic2.
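To make the rule concrete, here is a minimal plain-Java sketch of the decision described above. The class name SpeedRule, the constant and both method names are hypothetical and exist only for illustration; the real check lives in DataProcessor.processOduModel, which is not shown in this question, so the threshold handling here is an assumption based on the sentence "speed > 50".

```java
public class SpeedRule {
    // Assumed threshold, taken from the rule "speed > 50" stated in the question.
    static final int SPEED_THRESHOLD = 50;

    /** Returns true only when the speed is strictly greater than the threshold. */
    static boolean exceedsThreshold(int speed) {
        return speed > SPEED_THRESHOLD;
    }

    /** Maps a speed reading to the text that would be published on the output topic. */
    static String replyFor(int speed) {
        return exceedsThreshold(speed) ? "OK" : "not OK";
    }

    public static void main(String[] args) {
        System.out.println(replyFor(40)); // speed from the sample message: prints "not OK"
        System.out.println(replyFor(60)); // prints "OK"
    }
}
```

With the sample message (speed 40) this yields "not OK"; a speed of exactly 50 also yields "not OK", because the comparison is strict.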
I read the JSON correctly, but when I write "ok" to Topic2 I get this error:
org.springframework.messaging.MessageHandlingException: Failed to publish to MQTT in the [bean 'mqttOutbound'; defined in: 'class path resource [it/almaviva/mqtt/Configuration/Producer/Producer.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b95e48b']; nested exception is Connessione già in corso (32110)
This is the Consumer class, which reads from topic1 and writes through the Producer:
@Configuration
public class Consumer {

    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @Autowired
    MqttPahoClientFactory mqttClientFactory;

    @Autowired
    Producer producer;

    @Autowired
    private ApplicationContext context;

    //@Autowired
    //static Producer.ProducerGateway gw;

    // Channel that carries messages from the inbound adapter to the handler.
    @Bean
    public MessageChannel topicChannel() {
        return new DirectChannel();
    }

    // Inbound adapter subscribed to topic1 (Parameters.TOPICS[0]).
    @Bean
    public MessageProducer mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                Parameters.MQTT_CLIENT_ID, mqttClientFactory, Parameters.TOPICS[0]);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(topicChannel());
        return adapter;
    }

    // Handles each incoming message and publishes the result to topic2 (Parameters.TOPICS[1]).
    @Bean
    @ServiceActivator(inputChannel = "topicChannel")
    public MessageHandler handler() {
        return message -> {
            OduModel model = JsonUtility.convertToOduModel(String.valueOf(message.getPayload()));
            logger.info(model.toString());
            boolean condition = DataProcessor.processOduModel(model);
            logger.info(String.valueOf(condition));
            //ApplicationContext context = new SpringApplicationBuilder(Producer.class).context();
            Producer.ProducerGateway gw = context.getBean(Producer.ProducerGateway.class);
            if (condition) {
                gw.sendToMqtt("OK", Parameters.TOPICS[1]);
            } else {
                gw.sendToMqtt("not OK", Parameters.TOPICS[1]);
            }
        };
    }
}
Parameters.TOPICS[0] is topic1, where I read incoming messages.
Parameters.TOPICS[1] is topic2, where I write "OK" or "not OK".
In particular, is it correct to use this code inside the MessageHandler?
Producer.ProducerGateway gw = context.getBean(Producer.ProducerGateway.class);
if (condition) {
    gw.sendToMqtt("OK", Parameters.TOPICS[1]);
} else {
    gw.sendToMqtt("not OK", Parameters.TOPICS[1]);
}
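For comparison, the same branch can be tried without Spring or a broker by passing the gateway in as a parameter instead of looking it up from the ApplicationContext on every message. This is only a sketch under assumptions: the nested ProducerGateway interface below is a hypothetical stand-in that copies the sendToMqtt(payload, topic) shape used above, the real Producer.ProducerGateway is not shown here, and nothing in the sketch is claimed to cause or fix error 32110.

```java
import java.util.ArrayList;
import java.util.List;

public class HandlerSketch {

    /** Hypothetical stand-in for Producer.ProducerGateway; only the method shape is taken from the question. */
    interface ProducerGateway {
        void sendToMqtt(String payload, String topic);
    }

    /** Publishes "OK" or "not OK" to the given topic, mirroring the if/else in the handler. */
    static void publishVerdict(ProducerGateway gw, boolean condition, String topic) {
        gw.sendToMqtt(condition ? "OK" : "not OK", topic);
    }

    public static void main(String[] args) {
        // A recording gateway replaces the real MQTT outbound flow for this demonstration.
        List<String> sent = new ArrayList<>();
        ProducerGateway recording = (payload, topic) -> sent.add(topic + ":" + payload);

        publishVerdict(recording, false, "topic2");
        publishVerdict(recording, true, "topic2");

        System.out.println(sent); // prints [topic2:not OK, topic2:OK]
    }
}
```

Because the gateway is an ordinary parameter here, the branch can be exercised with a recording implementation and no MQTT connection.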
I use this dependency in pom.xml:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.2.3.RELEASE</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>jackson-module-kotlin</artifactId>
            <groupId>com.fasterxml.jackson.module</groupId>
        </exclusion>
    </exclusions>
</dependency>
Thanks very much.
Regards