无法通过Observable将数据插入数据库。

huangapple 未分类评论59阅读模式
英文:

Unable to insert data into db via an Observable

问题

这是Observable,我预计在订阅时会发生一些数据库插入操作。
但是没有任何操作,没有数据库插入,同时也没有错误。

但是,如果我直接订阅执行数据库调用的方法,数据库插入就会按预期发生。
我如何修复这个问题,使得对下面Observable的订阅会执行数据库插入操作?

请给予建议。谢谢。

这是Observable,其中没有发生数据库插入,也没有错误。我希望进行更改,以便在订阅此Observable时发生数据库插入。

public Observable<KafkaConsumerRecord<String, RequestObj>> apply(KafkaConsumerRecords<String, RequestObj> records) {
    Observable.from(records.getDelegate().records().records("TOPIC_NAME"))
            .buffer(2)
            .map(this::convertToEventRequest)
            .doOnNext(this::handleEventInsertions)
            .doOnSubscribe(() -> System.out.println("Subscribed!"))
            .subscribe(); // 故意在这里进行订阅以进行测试

    return null; // 即使我返回此Observable并在调用者处订阅,结果仍然相同。
}

只是为了测试查询是否有效,如果我直接订阅执行插入的方法,它会按预期工作,如下所示。
在调试模式下执行此操作。

client.rxQueryWithParams(query, new JsonArray(params)).subscribe() // 正常工作

以下是内部发生情况的参考,位于convertToEventRequest和handleEventInsertions方法内:

private Map<String, List<?>> convertToEventRequest(Object records) {
    List<ConsumerRecord<String, RequestObj>> consumerRecords = (List<ConsumerRecord<String, RequestObj>>) records;

    // ... 省略部分代码 ...
    
    return new HashMap<String, List<?>>() {{
        put("add", addEventRequests);
        put("update", updateEventRequests);
    }};
}

private void handleEventInsertions(Object eventObject) {
    Map<String, List<?>> eventMap = (Map<String, List<?>>) eventObject;

    // ... 省略部分代码 ...
}

如果你有其他问题,请随时问。

英文:

I have the following Observable where I am expecting some DB insertions to occur upon subscribing to it.
But nothing happens, no DB inserts and same time no errors either.

But If I directly subscribe to the method that does the DB calls, the DB insert occurs as expected.
How can I fix this such that the subscription to the Observable call below will perform the DB insert?

Please advice. Thanks.

This is the Observable where no DB insert occurs and no errors. I want to change this such that the DB insertion occurs when I subscribe to this Observable.

public Observable&lt;KafkaConsumerRecord&lt;String, RequestObj&gt;&gt; apply(KafkaConsumerRecords&lt;String, RequestObj&gt; records) {

    Observable.from(records.getDelegate().records().records(&quot;TOPIC_NAME&quot;))
            .buffer(2)
            .map(this::convertToEventRequest)
            .doOnNext(this::handleEventInsertions)
            .doOnSubscribe(() -&gt; System.out.println(&quot;Subscribed!&quot;))
            .subscribe(); // purposely subscribing here itself to test 

    return null; // even if I return this observable and subscribe at the caller, same outcome. 
}

Just to test if the query works, if I were to directly subscribe to the method that does the insertion, it works as expected as follows.
Doing this in debug mode.

client.rxQueryWithParams(query, new JsonArray(params)).subscribe() // works

The following are references to see whats happening inside the convertToEventRequest and handleEventInsertions methods

private Map&lt;String, List&lt;?&gt;&gt; convertToEventRequest(Object records) {
    List&lt;ConsumerRecord&lt;String, RequestObj&gt;&gt; consumerRecords = (List&lt;ConsumerRecord&lt;String, RequestObj&gt;&gt;) records;

    List&lt;AddEventRequest&gt; addEventRequests = new ArrayList&lt;&gt;();
    List&lt;UpdateEventRequest&gt; updateEventRequests = new ArrayList&lt;&gt;();

    consumerRecords.forEach(record -&gt; {
        String eventType = new String(record.headers().headers(&quot;type&quot;).iterator().next().value(), StandardCharsets.UTF_8);

        if(&quot;add&quot;.equals(eventType)) {
            AddEventRequest request = AddEventRequest.builder()
                    .count(Integer.parseInt(new String(record.headers().headers(&quot;count&quot;).iterator().next().value(), StandardCharsets.UTF_8)))
                    .data(record.value())
                    .build();
            addEventRequests.add(request);
        } else {
            UpdateEventRequest request = UpdateEventRequest.builder()
                    .id(new String(record.headers().headers(&quot;id&quot;).iterator().next().value(), StandardCharsets.UTF_8))
                    .status(Integer.parseInt(new String(record.headers().headers(&quot;status&quot;).iterator().next().value(), StandardCharsets.UTF_8)))
                    .build();
            updateEventRequests.add(request);
        }
    });

    return new HashMap&lt;String, List&lt;?&gt;&gt;() {{
        put(&quot;add&quot;, addEventRequests);
        put(&quot;update&quot;, updateEventRequests);
    }};
}

private void handleEventInsertions(Object eventObject) {
    Map&lt;String, List&lt;?&gt;&gt; eventMap = (Map&lt;String, List&lt;?&gt;&gt;) eventObject;

    List&lt;AddEventRequest&gt; addEventRequests = (List&lt;AddEventRequest&gt;) eventMap.get(&quot;add&quot;);
    List&lt;UpdateEventRequest&gt; updateEventRequests = (List&lt;UpdateEventRequest&gt;) eventMap.get(&quot;update&quot;);

    if(addEventRequests != null &amp;&amp; !addEventRequests.isEmpty()) {
        insertAddEvents(addEventRequests);
    }
    if(updateEventRequests != null &amp;&amp; !updateEventRequests.isEmpty()) {
        insertUpdateEvents(updateEventRequests);
    }
}

private Single&lt;ResultSet&gt; insertAddEvents(List&lt;AddEventRequest&gt; requests) {
    AddEventRequest request = requests.get(0);
    List&lt;Object&gt; params = Arrays.asList(request.getCount(), request.getData());
    String query = &quot;INSERT INTO mytable(count, data, creat_ts) &quot; +
            &quot;VALUES (?, ?, current_timestamp)&quot;;
    return client.rxQueryWithParams(query, new JsonArray(params));
}

private Single&lt;ResultSet&gt; insertUpdateEvents(List&lt;UpdateEventRequest&gt; requests) {
    UpdateEventRequest request = requests.get(0);
    return client.rxQueryWithParams(
            &quot;UPDATE mytable SET status=?, creat_ts=current_timestamp WHERE id=?&quot;,
            new JsonArray(Arrays.asList(request.getStatus(), request.getId())));
}

答案1

得分: 0

Sure, here's the translation:

你可以尝试将它包装到 `Observable.defer` 中吗?

```plaintext
Observable.defer(() -&gt; Observable.from(records.getDelegate().records().records(&quot;TOPIC_NAME&quot;))...

<details>
<summary>英文:</summary>

Can you try to wrap it into `Observable.defer`?

Observable.defer(() -> Observable.from(records.getDelegate().records().records("TOPIC_NAME"))...


</details>



huangapple
  • 本文由 发表于 2020年7月27日 22:26:02
  • 转载请务必保留本文链接:https://java.coder-hub.com/63117434.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定