
Is there a better way to generate 5 million csv files quickly

Question

I would like to create 5 million CSV files, but after waiting for almost 3 hours the program is still running. Can somebody give me some advice on how to speed up the file generation?

After these 5 million files are generated, I have to upload them to an S3 bucket.

It would be better if someone knows how to generate these files directly on AWS, so we could move them into the S3 bucket and avoid the network-speed issue. (I have just started learning AWS; there is a lot to learn.)

The following is my code.

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

public class ParallelCsvGenerate implements Runnable {
    private static AtomicLong baseID = new AtomicLong(8160123456L);
    private static ThreadLocalRandom random = ThreadLocalRandom.current();
    private static String filePath = "C:\\5millionfiles\\";
    private static List<String> headList = null;
    private static String csvHeader = null;

    public ParallelCsvGenerate() {
        headList = generateHeadList();
        csvHeader = String.join(",", headList);
    }

    @Override
    public void run() {
        for (int i = 0; i < 1000000; i++) {
            generateCSV();
        }
    }

    private void generateCSV() {
        StringBuilder builder = new StringBuilder();
        builder.append(csvHeader).append(System.lineSeparator());
        for (int i = 0; i < headList.size(); i++) {
            if (i < headList.size() - 1) {
                builder.append(i % 2 == 0 ? generateRandomInteger() : generateRandomStr()).append(",");
            } else {
                builder.append(i % 2 == 0 ? generateRandomInteger() : generateRandomStr());
            }
        }

        String fileName = String.valueOf(baseID.addAndGet(1));
        File csvFile = new File(filePath + fileName + ".csv");
        FileWriter fileWriter = null;
        try {
            fileWriter = new FileWriter(csvFile);
            fileWriter.write(builder.toString());
            fileWriter.flush();
        } catch (Exception e) {
            System.err.println(e);
        } finally {
            try {
                if (fileWriter != null) {
                    fileWriter.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private static List<String> generateHeadList() {
        List<String> headList = new ArrayList<>(20);
        String baseFieldName = "Field";
        for (int i = 1; i <= 20; i++) {
            headList.add(baseFieldName + i);
        }
        return headList;
    }

    /**
     * generate a number in the range 0-50000
     */
    private Integer generateRandomInteger() {
        return random.nextInt(0, 50000);
    }

    /**
     * generate a string of length 5-8
     */
    private String generateRandomStr() {
        int strLength = random.nextInt(5, 9); // upper bound is exclusive
        String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
        int length = str.length();
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < strLength; i++) {
            builder.append(str.charAt(random.nextInt(length)));
        }
        return builder.toString();
    }
}

Main

ParallelCsvGenerate generate = new ParallelCsvGenerate();

Thread a = new Thread(generate, "A");
Thread b = new Thread(generate, "B");
Thread c = new Thread(generate, "C");
Thread d = new Thread(generate, "D");
Thread e = new Thread(generate, "E");

// start() runs each thread in parallel; calling run() instead would
// execute everything sequentially on the calling thread.
a.start();
b.start();
c.start();
d.start();
e.start();

Thanks for your advice, everyone. I refactored the code and generated 3.8 million files in 2.8 hours, which is much better.
Refactored code:

public class ParallelCsvGenerate implements Callable<Integer> {
    private static String filePath = "C:\\5millionfiles\\";
    private static String[] header = new String[]{
            "FIELD1", "FIELD2", "FIELD3", "FIELD4", "FIELD5",
            "FIELD6", "FIELD7", "FIELD8", "FIELD9", "FIELD10",
            "FIELD11", "FIELD12", "FIELD13", "FIELD14", "FIELD15",
            "FIELD16", "FIELD17", "FIELD18", "FIELD19", "FIELD20"
    };
    private String fileName;

    public ParallelCsvGenerate(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public Integer call() throws Exception {
        try {
            generateCSV();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return 0;
    }

    private void generateCSV() throws IOException {
        CSVWriter writer = new CSVWriter(new FileWriter(filePath + fileName + ".csv"), CSVWriter.DEFAULT_SEPARATOR, CSVWriter.NO_QUOTE_CHARACTER);
        String[] content = new String[]{
                RandomGenerator.generateRandomInteger(),
                RandomGenerator.generateRandomStr(),
                RandomGenerator.generateRandomInteger(),
                RandomGenerator.generateRandomStr(),
                RandomGenerator.generateRandomInteger(),
                RandomGenerator.generateRandomStr(),
                RandomGenerator.generateRandomInteger(),
                RandomGenerator.generateRandomStr(),
                RandomGenerator.generateRandomInteger(),
                RandomGenerator.generateRandomStr(),
                RandomGenerator.generateRandomInteger(),
                RandomGenerator.generateRandomStr(),
                RandomGenerator.generateRandomInteger(),
                RandomGenerator.generateRandomStr(),
                RandomGenerator.generateRandomInteger(),
                RandomGenerator.generateRandomStr(),
                RandomGenerator.generateRandomInteger(),
                RandomGenerator.generateRandomStr(),
                RandomGenerator.generateRandomInteger(),
                RandomGenerator.generateRandomStr()
        };
        writer.writeNext(header);
        writer.writeNext(content);
        writer.close();
    }
}

Main

public static void main(String[] args) {
    System.out.println("Start generate");
    long start = System.currentTimeMillis();
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 8,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
    List<ParallelCsvGenerate> taskList = new ArrayList<>(3800000);
    for (int i = 0; i < 3800000; i++) {
        taskList.add(new ParallelCsvGenerate(String.valueOf(i)));
    }
    try {
        List<Future<Integer>> futures = threadPoolExecutor.invokeAll(taskList);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    threadPoolExecutor.shutdown(); // let the JVM exit once all tasks are done
    System.out.println("Success");
    long end = System.currentTimeMillis();
    System.out.println("Using time: " + (end - start));
}

Answer 1

Score: 0
  1. You could write directly into the file (without building the whole file in one StringBuilder first). I think this is the biggest time and memory bottleneck here: builder.toString().

  2. You could generate each file in parallel.

  3. (Little tweaks:) Omit the if's inside the loop.

     if (i < headList.size() - 1) is not needed when you use a slightly cleverer loop plus one extra iteration.

     The i % 2 == 0 check can be eliminated by a better iteration (i += 2) and a bit more work inside the loop (i -> int, i + 1 -> string).

  4. If applicable, prefer append(char) to append(String). (Better append(',') than append(",")!)

...
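Tips 3 and 4 can be sketched roughly as follows. This is a minimal, self-contained illustration, not the answerer's actual code; the class and method names are made up. Striding the loop by two removes the parity check, and the separator is appended as a char.

```java
import java.util.concurrent.ThreadLocalRandom;

public class CsvRowSketch {
    static final String ALPHABET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";

    // Build one data row: even columns get an integer, odd columns a string.
    // Striding by 2 (tip 3) eliminates the i % 2 check, and append(',') (tip 4)
    // avoids handling a one-character String for every separator.
    static String buildRow(int fields) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        StringBuilder row = new StringBuilder(fields * 8);
        for (int i = 0; i < fields; i += 2) {
            if (i > 0) {
                row.append(',');
            }
            row.append(rnd.nextInt(0, 50000))   // column i: random integer
               .append(',')
               .append(randomStr(rnd));         // column i + 1: random string
        }
        return row.toString();
    }

    static String randomStr(ThreadLocalRandom rnd) {
        int len = rnd.nextInt(5, 9); // length 5-8, upper bound exclusive
        StringBuilder sb = new StringBuilder(len);
        for (int i = 0; i < len; i++) {
            sb.append(ALPHABET.charAt(rnd.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String row = buildRow(20);
        System.out.println(row.split(",", -1).length); // 20 columns
    }
}
```

The row string would then be written straight to a Writer (tip 1) instead of being accumulated into one large builder per file.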

Answer 2

Score: -1

You can use the Fork/Join framework (Java 7 and above) to run your process in parallel and make use of multiple CPU cores. Here is an example:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class ForkJoinAdd extends RecursiveTask<Long> {

    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long threshold = 10_000;

    public ForkJoinAdd(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinAdd(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {

        int length = end - start;
        if (length <= threshold) {
            return add();
        }

        ForkJoinAdd firstTask = new ForkJoinAdd(numbers, start, start + length / 2);
        firstTask.fork(); //start asynchronously

        ForkJoinAdd secondTask = new ForkJoinAdd(numbers, start + length / 2, end);

        Long secondTaskResult = secondTask.compute();
        Long firstTaskResult = firstTask.join();

        return firstTaskResult + secondTaskResult;

    }

    private long add() {
        long result = 0;
        for (int i = start; i < end; i++) {
            result += numbers[i];
        }
        return result;
    }

    public static long startForkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinAdd(numbers);
        return new ForkJoinPool().invoke(task);
    }

}

Use this example as a starting point. If you want to read more, "Guide to the Fork/Join Framework in Java" (Baeldung) and "Fork/Join" (The Java™ Tutorials) can help you understand it better and design your app. Good luck.
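The summation example shows the pattern but does not touch files. Applied to this question, the same RecursiveAction idea would split the file-index range recursively and write the CSVs in the leaf tasks. A rough sketch under that assumption follows; the class name, threshold, and the fixed row content are all illustrative, and the files go to a temp directory rather than C:\5millionfiles\.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinCsvWrite extends RecursiveAction {
    static final int THRESHOLD = 250; // leaf size; tune to amortize task overhead

    private final Path dir;
    private final int start;
    private final int end;

    ForkJoinCsvWrite(Path dir, int start, int end) {
        this.dir = dir;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                writeOne(i);
            }
            return;
        }
        int mid = (start + end) >>> 1;
        // Fork both halves and wait for them, as in the summation example.
        invokeAll(new ForkJoinCsvWrite(dir, start, mid),
                  new ForkJoinCsvWrite(dir, mid, end));
    }

    private void writeOne(int i) {
        try {
            // Fixed content keeps the sketch short; real code would generate rows.
            Files.writeString(dir.resolve(i + ".csv"), "FIELD1,FIELD2\n1,abc\n");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("forkjoin-csv");
        new ForkJoinPool().invoke(new ForkJoinCsvWrite(dir, 0, 2000));
        try (var files = Files.list(dir)) {
            System.out.println(files.count());
        }
    }
}
```

Whether this actually beats a plain fixed-size thread pool here is questionable, since the workload is I/O-bound rather than CPU-bound; Fork/Join shines when there is real computation to split.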

Answer 3

Score: -1
  • Remove the for (int i = 0; i < 1000000; i++) loop from the run method (leave a single generateCSV() call).
  • Create 5 million ParallelCsvGenerate objects.
  • Submit them to a ThreadPoolExecutor.

Converted main:

public static void main(String[] args) {
    ThreadPoolExecutor ex = (ThreadPoolExecutor) Executors.newFixedThreadPool(8);
    for (int i = 0; i < 5000000; i++) {
        ParallelCsvGenerate generate = new ParallelCsvGenerate();
        ex.submit(generate);
    }
    ex.shutdown();
}

It takes roughly 5 minutes to complete on my laptop (4 physical cores with hyperthreading, SSD drive).

EDIT:

I've replaced FileWriter with AsynchronousFileChannel using the following code:

Path file = Paths.get(filePath + fileName + ".csv");
try (AsynchronousFileChannel asyncFile = AsynchronousFileChannel.open(file,
        StandardOpenOption.WRITE,
        StandardOpenOption.CREATE)) {

    asyncFile.write(ByteBuffer.wrap(builder.toString().getBytes()), 0);
} catch (IOException e) {
    e.printStackTrace();
}

to achieve 30% speedup.
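One caveat about that snippet: AsynchronousFileChannel.write returns a Future, and try-with-resources closes the channel as soon as the block ends, so a write that is still pending can complete with an AsynchronousCloseException. Below is a minimal sketch that waits for completion before the channel is closed; the method and file names are illustrative, and it writes to a temp file rather than the question's path.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

public class AsyncWriteSketch {
    static void writeFully(Path file, String content) throws Exception {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        try (AsynchronousFileChannel ch = AsynchronousFileChannel.open(
                file, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            long pos = 0;
            while (buf.hasRemaining()) {
                Future<Integer> pending = ch.write(buf, pos);
                pos += pending.get(); // block until this chunk has been written
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("async-demo", ".csv");
        writeFully(file, "FIELD1,FIELD2\n1,abc\n");
        System.out.println(Files.readString(file).startsWith("FIELD1")); // true
    }
}
```

Waiting on the Future serializes each write, of course; the concurrency in this approach comes from many files being written by many tasks at once, not from overlapping writes within one file.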

I believe that the main bottleneck is the hard drive and filesystem itself. Not much more can be achieved here.

huangapple
  • Published on 2020-04-08 22:20:28
  • Original link: https://java.coder-hub.com/61102975.html