rxread java永不停止信息

huangapple 未分类评论42阅读模式
英文:

rxread java never stop info

问题

/**
 * Reads {@code file.csv}, splits it into rows, and for every row after the first one sends a
 * POST request to {@code /api/command} on host {@code mydomain}, then prints each row enriched
 * with the {@code res} value taken from the response body.
 *
 * NOTE(review): {@code name} and {@code createCmd} are not declared anywhere in this snippet;
 * they are presumably defined elsewhere in the asker's code - confirm.
 *
 * @param argc command-line arguments; not read by this method
 */
public static void main(String[] argc) {
    Vertx vertx = Vertx.vertx();
    
    WebClientOptions options = new WebClientOptions();
    WebClient client = WebClient.create(vertx, options);
    String uri = "/api/command";
    String host = "mydomain";
    vertx.fileSystem().rxReadFile("file.csv")
        // Split the whole file content on "\n" and emit the resulting lines one by one.
        .flattenAsObservable(fileContent -> Lists.newArrayList(fileContent.toString().split("\n")))
        // Split each line into its ";"-separated columns.
        .map(row -> row.split(";"))
        // Drop the first row (presumably the CSV header - confirm).
        .skip(1)
        // Keep only the first column. NOTE(review): `name` is not declared in this snippet.
        .map(row -> new JsonObject().put(name, row[0]))
        .flatMap(row -> {
            // NOTE(review): `createCmd` is not declared in this snippet; the same object is
            // mutated and sent for every row.
            createCmd.put("CMD", "CMD");
            return client.post(host, uri)
                         .rxSendJson(createCmd)
                         .map((HttpResponse<Buffer> r) -> {
                             // Reads result.hits[0].res from the JSON response body.
                             Long res = r.bodyAsJsonObject()
                                          .getJsonObject("result")
                                          .getJsonArray("hits")
                                          .getJsonObject(0)
                                          .getLong("res");
                             // Attach the value to the row that triggered this request.
                             return row.put("res", res);
                         }).toObservable();
        })
        .doOnComplete(() -> System.out.println("On Complete: Completed the operation"))
        .doOnTerminate(() -> System.out.println("On Terminate: Terminated the operation"))
        .subscribe(
            content -> System.out.println("Content: " + content),
            // This handler receives any error from the chain above (file read, parsing or
            // HTTP), even though the message only mentions the file.
            err -> System.out.println("Cannot read the file: " + err.getMessage())
        );
    // This is the line the question says is never reached.
    System.out.println("Out of the Vertx Read Stream");
}

Please note that the translation is based on the content you provided. If you have any further questions or need assistance with this code, feel free to ask.

英文:

I am trying to parse a CSV file and execute a POST call; this is what I have achieved so far:

public static void main(String[] argc) {
	
    Vertx vertx = Vertx.vertx();
    
    WebClientOptions options = new WebClientOptions();
    WebClient client = WebClient.create(vertx, options);
    String uri = &quot;/api/command&quot;;
    String host = &quot;mydomain&quot;;
    vertx.fileSystem().rxReadFile(&quot;file.csv&quot;)
            .flattenAsObservable(fileContent -&gt; Lists.newArrayList(fileContent.toString().split(&quot;\n&quot;)))
            .map(row -&gt; row.split(&quot;;&quot;))
            .skip(1)
            .map(row -&gt; new JsonObject().put(name, row[0]))
            .flatMap(row -&gt; {
                    createCmd.put(&quot;CMD&quot;,&quot;CMD&quot;)
                    return client.post(host,uri)
                                 .rxSendJson(createCmd)
                                 .map((HttpResponse&lt;Buffer&gt; r) -&gt; {
                                     Long res = r.bodyAsJsonObject()
                                                      .getJsonObject(&quot;result&quot;)
                                                      .getJsonArray(&quot;hits&quot;)
                                                      .getJsonObject(0)
                                                      .getLong(&quot;res&quot;);
                                     return row.put(&quot;res&quot;, res);
                                     }).toObservable();
                    })
                    .doOnComplete(()  -&gt; System.out.println(&quot;On Complete: Completed the operation&quot;))
		            .doOnTerminate(() -&gt; System.out.println(&quot;On Terminate: Terminated the operation&quot;))
		.subscribe(
			    content -&gt; System.out.println(&quot;Content: &quot; + content),
		        err -&gt; System.out.println(&quot;Cannot read the file: &quot; + err.getMessage())
		    );
    System.out.println(&quot;Out of the Vertx Read Stream&quot;);
}

Why does this code never reach the last line?

    System.out.println("Out of the Vertx Read Stream");

Am I somehow blocking the execution of the thread so that it gets stuck, or is this normal behaviour?

EDIT:

I am also getting this warning:

Apr 10, 2020 5:57:51 PM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-worker-thread-5,5,main]=Thread[vert.x-worker-thread-5,5,main] has been blocked for 75944 ms, time limit is 60000 ms
io.vertx.core.VertxException: Thread blocked

答案1

得分: 0

以下是翻译好的代码部分:

import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.Vertx;
import io.vertx.rxjava.ext.web.client.WebClient;
import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.ext.web.client.HttpResponse;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import rx.Observable;
import rx.Single;
import java.util.List;
import java.util.ArrayList;

/**
 * Verticle that reads {@code file.csv}, turns every row after the first into a
 * {@link JsonObject}, and enriches it with the {@code res} value returned by a POST to
 * {@code /api/command} on host {@code mydomain}.
 *
 * <p>Written against the RxJava 1 flavour of the Vert.x API ({@code io.vertx.rxjava.*},
 * {@code rx.Observable}), matching this file's imports.
 */
public class Main extends AbstractVerticle {

    /**
     * Builds the read-file / POST pipeline and subscribes to it.
     *
     * <p>Each emitted row is printed with a {@code Content: } prefix; any error raised anywhere
     * in the pipeline is printed by the error handler passed to {@code subscribe}.
     *
     * @throws Exception declared by the overridden method signature
     */
    @Override
    public void start() throws Exception {
        // Use the Vert.x instance this verticle is deployed on (the inherited `vertx` field)
        // rather than creating a second, unmanaged instance with Vertx.vertx().
        WebClientOptions options = new WebClientOptions();
        WebClient client = WebClient.create(vertx, options);
        String uri = "/api/command";
        String host = "mydomain";

        vertx.fileSystem().rxReadFile("file.csv")
                // rx.Single has no flattenAsObservable (an RxJava 2 operator);
                // flatMapObservable is the RxJava 1 way to fan a Single out into many items.
                .flatMapObservable(fileContent -> Observable.from(fileContent.toString().split("\n")))
                // Split each line into its ";"-separated columns.
                .map(row -> row.split(";"))
                // Drop the first row (presumably the CSV header - confirm).
                .skip(1)
                // Keep only the first column, stored under the literal key "name".
                .map(row -> new JsonObject().put("name", row[0]))
                .flatMap(row -> {
                    // A fresh command object per request, so no state is shared between rows.
                    JsonObject createCmd = new JsonObject().put("CMD", "CMD");
                    return client.post(host, uri)
                            .rxSendJson(createCmd)
                            .map((HttpResponse<Buffer> r) -> {
                                // Reads result.hits[0].res from the JSON response body.
                                Long res = r.bodyAsJsonObject()
                                        .getJsonObject("result")
                                        .getJsonArray("hits")
                                        .getJsonObject(0)
                                        .getLong("res");
                                // Attach the value to the row that triggered this request.
                                return row.put("res", res);
                            })
                            .toObservable();
                })
                // RxJava 1 names this operator doOnCompleted (doOnComplete is RxJava 2).
                .doOnCompleted(() -> System.out.println("On Complete: Completed the operation"))
                .doOnTerminate(() -> System.out.println("On Terminate: Terminated the operation"))
                .subscribe(
                        content -> System.out.println("Content: " + content),
                        // Receives any upstream error (file read, parsing or HTTP), even
                        // though the message only mentions the file.
                        err -> System.out.println("Cannot read the file: " + err.getMessage())
                );

        System.out.println("Out of the Vertx Read Stream");
    }
}

请注意,我对代码中的一些类型和方法进行了适当的翻译和调整,以使其更符合Java的语法和要求。如果您对翻译有任何疑问,请随时提问。

英文:

I just spent some time on this, and here is my result:

import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.Vertx;
import io.vertx.rxjava.ext.web.client.WebClient;



public class Main extends AbstractVerticle {

@Override
public void start() throws Exception {

    Vertx vertx = Vertx.vertx();

    WebClientOptions options = new WebClientOptions();
    WebClient client = WebClient.create(vertx, options);
    String uri = &quot;/api/command&quot;;
    String host = &quot;mydomain&quot;;
    vertx.fileSystem().rxReadFile(&quot;file.csv&quot;)
            .flattenAsObservable(fileContent -&gt; Lists.newArrayList(fileContent.toString().split(&quot;\n&quot;))) //The method flattenAsObservable((&lt;no type&gt; fileContent) -&gt; {}) is undefined for the type Single&lt;Buffer&gt;
            .map(row -&gt; row.split(&quot;;&quot;))
            .skip(1)
            .map(row -&gt; new JsonObject().put(name, row[0])) // what is name here?
            .flatMap(row -&gt; {
                    createCmd.put(&quot;CMD&quot;,&quot;CMD&quot;)
                    return client.post(host,uri)
                                 .rxSendJson(createCmd)
                                 .map((HttpResponse&lt;Buffer&gt; r) -&gt; {
                                     Long res = r.bodyAsJsonObject()
                                                      .getJsonObject(&quot;result&quot;)
                                                      .getJsonArray(&quot;hits&quot;)
                                                      .getJsonObject(0)
                                                      .getLong(&quot;res&quot;);
                                     return row.put(&quot;res&quot;, res);
                                     }).toObservable();
                    })
                    .doOnComplete(()  -&gt; System.out.println(&quot;On Complete: Completed the operation&quot;))
                    .doOnTerminate(() -&gt; System.out.println(&quot;On Terminate: Terminated the operation&quot;))
        .subscribe(
                content -&gt; System.out.println(&quot;Content: &quot; + content),
                err -&gt; System.out.println(&quot;Cannot read the file: &quot; + err.getMessage())
            );


      System.out.println(&quot;Out of the Vertx Read Stream&quot;);
     }
}

huangapple
  • 本文由 发表于 2020年4月10日 23:04:40
  • 转载请务必保留本文链接:https://java.coder-hub.com/61143112.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定