Spring Data Flow processor to process one payload and write multiple rows in a data table
Question
I would like to ask a simple question.

I have implemented a processor which processes one payload and returns an array of entities, like this:

@EnableBinding(Processor.class)
public class SimpleProcessor {
    ...
    public SimpleProcessor() {
        ...
    }

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public OutgoingEntity[] processData(IncomingEntity payload) {
        // business logic here
        return outgoingEntities;
    }
}

I have my stream in SCDF, with Kafka as the middleware, like this:

some source | SimpleProcessor | JDBC sink

To validate the messages, I replaced the JDBC sink with a log sink, and it logs arrays of JSON. When I use the JDBC sink, it throws an exception saying that it cannot access the properties in the object, which makes sense, since the payload is an array of objects instead of a single object.

My questions are:

1. Can I modify my processor so that it processes one payload but emits multiple messages, like this (see the sketch after this list):

@Transformer(inputChannel = Processor.INPUT)
public void processData(IncomingEntity payload) {
    ...
    for (OutgoingEntity o : outgoingEntities) {
        outputMethod(o);
    }
}

@Transformer(outputChannel = Processor.OUTPUT)
private OutgoingEntity outputMethod(OutgoingEntity o) {
    ...
    return o;
}

That way it could pass multiple JSON objects to the JDBC sink and write them into the data table.

2. Can the JDBC sink deal with arrays? How?

3. Can I use some other processor or sink to finish this task?
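For reference, here is a runnable version of the pattern question 1 sketches. This is an assumption on my part, not part of the original post: it uses Spring Integration's @Splitter, which sends each element of a returned collection (or array) downstream as its own message, together with the same @EnableBinding(Processor.class) contract and entity types from the question.

import java.util.ArrayList;
import java.util.List;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Splitter;

@EnableBinding(Processor.class)
public class SplittingProcessor {

    // A @Splitter-annotated handler that returns a collection emits one
    // message per element, so the JDBC sink receives single JSON objects.
    @Splitter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public List<OutgoingEntity> processData(IncomingEntity payload) {
        List<OutgoingEntity> entities = new ArrayList<>();
        // business logic here: populate 'entities' from the incoming payload
        return entities;
    }
}

With one message per entity, the JDBC sink can then map each payload's properties to columns and write one row per message.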
Answer 1
Score: 0
As Matthias J. Sax suggested in a comment, I used KStream's flatMapValues method to deal with the array from the input. I put this SCDF processor right after the one that forwards the array, and that works fine.

@EnableBinding(KafkaStreamsProcessor.class)
public class ArrayProcessor {

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, String> process(KStream<?, String> payload) {
        return payload.flatMapValues( /* impl */ );
    }
}
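The /* impl */ body is elided in the answer. Below is a minimal sketch of one way to fill it in, assuming the record values arrive as JSON array strings and using Jackson to split them; the Jackson parsing is my assumption, not anything stated in the answer. The imports assume the Spring Cloud Stream Kafka Streams binder's annotation model, matching the answer's KafkaStreamsProcessor binding.

import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(KafkaStreamsProcessor.class)
public class ArrayProcessor {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, String> process(KStream<?, String> payload) {
        // flatMapValues turns one record whose value is a JSON array
        // into N records, one per array element.
        return payload.flatMapValues(value -> {
            List<String> elements = new ArrayList<>();
            try {
                JsonNode array = MAPPER.readTree(value);
                for (JsonNode element : array) {
                    elements.add(element.toString());
                }
            } catch (Exception e) {
                // skip (or log) values that are not valid JSON arrays
            }
            return elements;
        });
    }
}

In the stream definition this processor sits between the array-producing processor and the sink, i.e. some source | SimpleProcessor | ArrayProcessor | JDBC sink.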