spring-integration-mqtt 将消息从输入主题1路由到输出主题2

huangapple 未分类评论53阅读模式
标题翻译

spring-integration-mqtt Route message from Input Topic1 to Output Topic2

问题

我已阅读并翻译您提供的内容:

我已经在这个问题上花了3天时间:
我正在使用spring-integration-mqtt从topic1读取一个简单的JSON消息:

{speed:40,
position:"NewYork"
}

如果速度大于50,则在Topic2上写入"ok"。

我正确地读取了JSON。
但是,当我在Topic2中写入"ok"时,我遇到了这个错误:

org.springframework.messaging.MessageHandlingException: 在[bean 'mqttOutbound'中无法发布MQTT; 定义在:'类路径资源[it/almaviva/mqtt/Configuration/Producer/Producer.class]'; 来源:'org.springframework.core.type.classreading.SimpleMethodMetadata@2b95e48b']中; 嵌套异常是Connessione già in corso (32110)

这是负责读取和写入的Consumer类:

@Configuration
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

@Autowired MqttPahoClientFactory mqttClientFactory;
@Autowired
Producer producer;
@Autowired
private ApplicationContext context;

@Bean
public MessageChannel topicChannel(){
    return new DirectChannel();
}

@Bean
public MessageProducer mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            Parameters.MQTT_CLIENT_ID, mqttClientFactory, Parameters.TOPICS[0]);
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    adapter.setOutputChannel(topicChannel());
    return adapter;
}

@Bean
@ServiceActivator(inputChannel = "topicChannel")
public MessageHandler handler() {
    return message -> {
        OduModel model = JsonUtility.convertToOduModel(String.valueOf(message.getPayload()));
        logger.info(model.toString());
        boolean condition = DataProcessor.processOduModel(model);
        logger.info(String.valueOf(condition));
        Producer.ProducerGateway gw = context.getBean(Producer.ProducerGateway.class);
        if (condition){
            gw.sendToMqtt("OK", Parameters.TOPICS[1]);

        }
        else{
            gw.sendToMqtt("不OK", Parameters.TOPICS[1]);
        }

    };
}

}

Parameters.TOPICS[0]是我读取传入消息的topic1。
Parameters.TOPICS[1]是"topic2",我在其中写入"ok"或"non ok"。

在MessageHandler中使用这段代码是否正确?

Producer.ProducerGateway gw = context.getBean(Producer.ProducerGateway.class);
if (condition){
    gw.sendToMqtt("OK", Parameters.TOPICS[1]);

}
else{
    gw.sendToMqtt("不OK", Parameters.TOPICS[1]);
}

我在pom.xml中使用了以下依赖项:

<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
  <version>5.2.3.RELEASE</version>
  <scope>compile</scope>
  <exclusions>
    <exclusion>
      <artifactId>jackson-module-kotlin</artifactId>
      <groupId>com.fasterxml.jackson.module</groupId>
    </exclusion>
  </exclusions>
</dependency>

非常感谢。
祝好。

英文翻译

i'm from 3 days on this problem:
I'm using spring-integration-mqtt for read from topic1 a simple message like a json:

{speed:40,
position:&quot;NewYork&quot;
}

if speed > 50 write "ok" on Topic2.

I read correctly json.
but when i write "ok" in Topic2 i have this error:

org.springframework.messaging.MessageHandlingException: Failed to publish to MQTT in the [bean &#39;mqttOutbound&#39;; defined in: &#39;class path resource [it/almaviva/mqtt/Configuration/Producer/Producer.class]&#39;; from source: &#39;org.springframework.core.type.classreading.SimpleMethodMetadata@2b95e48b&#39;]; nested exception is Connessione gi&#224; in corso (32110)

This is class Consumer that read and write on Producer..

@Configuration
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

@Autowired MqttPahoClientFactory mqttClientFactory;
@Autowired
Producer producer;
@Autowired
private ApplicationContext context;
//@Autowired
//static Producer.ProducerGateway gw;

@Bean
public MessageChannel topicChannel(){
    return new DirectChannel();
}

@Bean
public MessageProducer mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            Parameters.MQTT_CLIENT_ID, mqttClientFactory, Parameters.TOPICS[0]);
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    adapter.setOutputChannel(topicChannel());
    return adapter;
}

@Bean
@ServiceActivator(inputChannel = &quot;topicChannel&quot;)
public MessageHandler handler() {
    return message -&gt; {
        OduModel model = JsonUtility.convertToOduModel(String.valueOf(message.getPayload()));
        logger.info(model.toString());
        boolean condition = DataProcessor.processOduModel(model);
        logger.info(String.valueOf(condition));
        //ApplicationContext context = new SpringApplicationBuilder(Producer.class).context();
        Producer.ProducerGateway gw = context.getBean(Producer.ProducerGateway.class);
        if (condition){
            gw.sendToMqtt(&quot;OK&quot;!, Parameters.TOPICS[1]);

        }
        else{
            gw.sendToMqtt(&quot;No OK&quot;, Parameters.TOPICS[1]);
        }

    };
}

}

Parameters.TOPICS[0] is topic1 where i read message incoming.
Parameters.TOPICS[1] is "topic2" where i write "ok" or "non ok".

Is correct use in particular this code in MessageHandler??

 Producer.ProducerGateway gw = context.getBean(Producer.ProducerGateway.class);
        if (condition){
            gw.sendToMqtt(&quot;&quot;OK&quot;, Parameters.TOPICS[1]);

        }
        else{
            gw.sendToMqtt(&quot;not OK&quot;, Parameters.TOPICS[1]);
        }

I use this dependency on pom.xml

&lt;dependency&gt;
  &lt;groupId&gt;org.springframework.integration&lt;/groupId&gt;
  &lt;artifactId&gt;spring-integration-mqtt&lt;/artifactId&gt;
  &lt;version&gt;5.2.3.RELEASE&lt;/version&gt;
  &lt;scope&gt;compile&lt;/scope&gt;
  &lt;exclusions&gt;
    &lt;exclusion&gt;
      &lt;artifactId&gt;jackson-module-kotlin&lt;/artifactId&gt;
      &lt;groupId&gt;com.fasterxml.jackson.module&lt;/groupId&gt;
    &lt;/exclusion&gt;
  &lt;/exclusions&gt;
&lt;/dependency&gt;

Thanks very much.
Regards

huangapple
  • 本文由 发表于 2020年3月16日 18:33:52
  • 转载请务必保留本文链接:https://java.coder-hub.com/60704305.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定