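Can someone help me fix this error: ReqlDriverError: Response pump closed
Question
I am learning RethinkDB and have run into the following error. I connect to RethinkDB and subscribe to a changefeed successfully. After starting my application I can add records to the table without problems, but about 15 to 20 minutes later, adding a record fails with this error: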
```none
c.r.g.e.ReqlDriverError: Response pump closed.
at c.r.n.DefaultConnectionFactory$ThreadResponsePump.await(DefaultConnectionFactory.java:214)
at c.r.net.Connection.sendQuery(Connection.java:349)
at c.r.net.Connection.runQuery(Connection.java:384)
at c.r.net.Connection.runAsync(Connection.java:166)
at c.r.net.Connection.run(Connection.java:185).
```
My code is as follows:
```java
@Override
public void run() {
    while (true) {
        try {
            logger.info("START RETHINKDB & CHANGEFEED");
            conn = r.connection()
                    .hostname(configParam.hostRethink)
                    .port(configParam.portRethink)
                    .db(configParam.dbRethink)
                    .user(configParam.userRethink, configParam.passRethink)
                    .connect();
            // Changefeeds: subscribe to a feed by calling changes on a table
            Result<Object> result = r.table("order")
                    .filter(row -> row.g("status").eq("APPROVED"))
                    .changes()
                    .optArg("include_types", true)
                    .run(conn);
            for (Object change : result) {
                logger.info(change.toString());
                superMarketService.updateApprovedSuperMarketOrder(change);
            }
            logger.info("END RETHINKDB & CHANGEFEED SUCCESS");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public void addOrder(JSONSuperMarket json) {
    if (conn == null) {
        logger.error("=============== RETHINK DIE =========");
    }
    // Uses the same conn field that run() assigns for the changefeed
    Result<Object> result = r.table("order").insert(
            r.array(
                    r.hashMap("order_code", json.getOrderCode())
                            .with("cash_id", json.getCashId())
                            .with("merchant_id", json.getMerchantId())
                            .with("amount", json.getAmount())
                            .with("status", "PENDING")
                            .with("description", json.getDescription())
                            .with("created_date", r.now().inTimezone("+07:00").toIso8601())
            )
    ).run(conn);
    logger.info(result.toString());
}
```
Is there anything wrong with my code? Please help me take a look.
I am using Java 8 + RethinkDB 2.4.2.
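
For anyone reproducing this, here is a minimal sketch of one thing that might be worth trying. It assumes the error means the connection held in `conn` had already been closed by the time `addOrder` ran, which the post does not confirm. The idea is to check the connection before each write and reopen it if needed. `Conn`, `GuardedConnection` and `FakeConn` are hypothetical names invented for this illustration. They stand in for the driver's connection type and are not part of the RethinkDB API, so the sketch runs without a database.

```java
import java.util.function.Function;

final class GuardedConnectionSketch {

    /** Hypothetical stand-in for a driver connection; NOT the RethinkDB API. */
    interface Conn {
        boolean isOpen();
        void reconnect();
    }

    /** Runs queries on one connection, reopening it first if it was dropped. */
    static final class GuardedConnection<C extends Conn> {
        private final C conn;

        GuardedConnection(C conn) {
            this.conn = conn;
        }

        /** synchronized so two callers cannot reopen the connection at once. */
        synchronized <T> T run(Function<C, T> query) {
            if (!conn.isOpen()) {
                conn.reconnect();
            }
            return query.apply(conn);
        }
    }

    /** In-memory fake used only to exercise the sketch. */
    static final class FakeConn implements Conn {
        private boolean open = true;
        int reconnects = 0;

        @Override
        public boolean isOpen() {
            return open;
        }

        @Override
        public void reconnect() {
            open = true;
            reconnects++;
        }

        /** Simulates the server or network closing the socket. */
        void drop() {
            open = false;
        }

        String insert(String doc) {
            if (!open) {
                throw new IllegalStateException("Response pump closed");
            }
            return "inserted " + doc;
        }
    }

    public static void main(String[] args) {
        FakeConn fake = new FakeConn();
        GuardedConnection<FakeConn> writes = new GuardedConnection<>(fake);

        System.out.println(writes.run(c -> c.insert("order-1")));
        fake.drop(); // connection goes away between two writes
        System.out.println(writes.run(c -> c.insert("order-2")));
        System.out.println("reconnects=" + fake.reconnects);
    }
}
```

In the code above, the write path would own its own guarded connection instead of reusing the `conn` field that the changefeed loop reassigns. Whether that removes the error depends on why the connection closes in the first place, so treat this as a starting point for debugging, not a confirmed fix.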