有人可以帮我修复这个程序吗:ReqlDriverError:响应泵关闭

huangapple 未分类评论48阅读模式
英文:

Can someone help me fix this program : ReqlDriverError: Response pump closed

问题

我正在学习rethinkdb,并且遇到了以下错误。我连接了rethinkdb,成功订阅了changefeed。在我启动我的应用程序之后,我可以成功地向表中添加记录,但在之后的15/20分钟后,如果我添加记录,会出现以下错误:

```none
c.r.g.e.ReqlDriverError: 响应泵关闭。
	at c.r.n.DefaultConnectionFactory$ThreadResponsePump.await(DefaultConnectionFactory.java:214)
	at c.r.net.Connection.sendQuery(Connection.java:349)
	at c.r.net.Connection.runQuery(Connection.java:384)
	at c.r.net.Connection.runAsync(Connection.java:166)
	at c.r.net.Connection.run(Connection.java:185).

我的代码如下:

@Override
public void run() {
    while (true) {
        try {
            logger.info("开始 RETHINKDB 和 CHANGEFEED");
            conn = r.connection()
                    .hostname(configParam.hostRethink)
                    .port(configParam.portRethink)
                    .db(configParam.dbRethink)
                    .user(configParam.userRethink, configParam.passRethink)
                    .connect();

            // Changefeeds: 通过在表上调用changes来订阅一个feed
            Result<Object> result = r.table("order")
                    .filter(row -> row.g("status").eq("APPROVED"))
                    .changes()
                    .optArg("include_types", true)
                    .run(conn);

            for (Object change : result) {
                logger.info(change.toString());
                superMarketService.updateApprovedSuperMarketOrder(change);
            }
            logger.info("结束 RETHINKDB 和 CHANGEFEED 成功");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public void addOrder(JSONSuperMarket json) {
    if (conn == null) {
        logger.error("=============== RETHINK DIE =========");
    }

    Result<Object> result = r.table("order").insert(
            r.array(
                    r.hashMap("order_code", json.getOrderCode())
                            .with("cash_id", json.getCashId())
                            .with("merchant_id", json.getMerchantId())
                            .with("amount", json.getAmount())
                            .with("status", "PENDING")
                            .with("description", json.getDescription())
                            .with("created_date", r.now().inTimezone("+07:00").toIso8601())
            )
    ).run(conn);
    logger.info(result.toString());
}

在我的代码中是否存在任何错误,请帮我看看:(
我使用的是Java 8 + RethinkDb 2.4.2。


<details>
<summary>英文:</summary>

I am learning rethinkdb and I get the following error. i connect rethinkdb, subscribe changefeed success. After I started my application, I could add the record to table success, but after 15/20 minutes later, if I add the record, the error 

```none
c.r.g.e.ReqlDriverError: Response pump closed.
	at c.r.n.DefaultConnectionFactory$ThreadResponsePump.await(DefaultConnectionFactory.java:214)
	at c.r.net.Connection.sendQuery(Connection.java:349)
	at c.r.net.Connection.runQuery(Connection.java:384)
	at c.r.net.Connection.runAsync(Connection.java:166)
	at c.r.net.Connection.run(Connection.java:185). 

My code here:

@Override
public void run () {
&#160;&#160;&#160;&#160;while (true) {
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;try {
			logger.info (&quot;START RETHINKDB &amp; CHANGEFEED&quot;);
			conn = r.connection ()
			&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;.hostname (configParam.hostRethink)
			&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;.port (configParam.portRethink)
			&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;.db (configParam.dbRethink)
			&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;.user (configParam.userRethink, configParam.passRethink)
			&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;.connect ();

			// Changefeeds: subscribe to a feed by calling changes on a table
			Result &lt;Object&gt; result = r.table (&quot;order&quot;)
			&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;.filter (
			&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;new ReqlFunction1 () {
			&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;@Override
			&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;public Object apply (ReqlExpr row) {
			&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;return row.g (&quot;status&quot;). eq (&quot;APPROVED&quot;);
			&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;}
			&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;}). changes (). optArg (&quot;include_types&quot;, true) .run (conn);
			for (Object change: result) {
			&#160;&#160;&#160;&#160;logger.info (change.toString ());
			&#160;&#160;&#160;&#160;superMarketService.updateApprovedSuperMarketOrder (change);
			}
			logger.info (&quot;END RETHINKDB &amp; CHANGEFEED SUCCESS&quot;);
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;} catch (Exception e) {
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;e.printStackTrace ();
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;}
&#160;&#160;&#160;&#160;}
}

public void addOrder (JSONSuperMarket json) {
&#160;&#160;&#160;&#160;if (conn == null) {
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;logger.error (&quot;=============== RETHINK DIE =========&quot;);
&#160;&#160;&#160;&#160;}

&#160;&#160;&#160;&#160;Result &lt;Object&gt; result = r.table (&quot;order&quot;). Insert (r.array (
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;r.hashMap (&quot;order_code&quot;, json.getOrderCode ())
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;.with (&quot;cash_id&quot;, json.getCashId ())
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;.with (&quot;merchant_id&quot;, json.getMerchantId ())
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;.with (&quot;amount&quot;, json.getAmount ())
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;.with (&quot;status&quot;, &quot;PENDING&quot;)
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;.with (&quot;description&quot;, json.getDescription ())
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;.with (&quot;created_date&quot;, r.now (). inTimezone (&quot;+ 07:00&quot;). toIso8601 ())
&#160;&#160;&#160;&#160;)). run (conn);
&#160;&#160;&#160;&#160;logger.info (result.toString ());
}

Is there any error in my code, pls help me 有人可以帮我修复这个程序吗:ReqlDriverError:响应泵关闭
I use Java 8 + RethinkDb 2.4.2.

huangapple
  • 本文由 发表于 2020年5月30日 10:51:43
  • 转载请务必保留本文链接:https://java.coder-hub.com/62097226.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定