Time taken for individual transformations in Spark


Question

I am trying to come up with a way to measure the time taken by individual transformations in Spark. I am using the following code to experiment for my POC.

        StructType newSchema = df.schema().add("CONCAT", DataTypes.StringType);
        LongAccumulator longAccumulator = sparkSession.sparkContext().longAccumulator();

        /* mapPartitions transformation */

        long start = System.currentTimeMillis();

        df = df.mapPartitions(new MapPartitionsFunction<Row, Row>() {
            @Override
            public Iterator<Row> call(Iterator<Row> it) throws Exception {
                // start the clock inside call() so it measures executor-side work,
                // not the moment the function object was constructed on the driver
                long startTime = System.currentTimeMillis();
                // Some logic
                long endTime = System.currentTimeMillis();
                longAccumulator.add(endTime - startTime);
                // Rest of the logic (must end by returning the transformed Iterator<Row>)
            }
        }, RowEncoder.apply(newSchema));

//        df = df.cache();
//        count = df.count();
        long end = System.currentTimeMillis();

        /* withColumn transformation */

        start = System.currentTimeMillis();
        df = df.withColumn("new_col", functions.lit(1));
//        df = df.cache();
//        count = df.count();
        end = System.currentTimeMillis();

        long count = df.count();
        df.show();
        System.out.println("Map partition time is " + longAccumulator.value() / df.rdd().getNumPartitions());

So, I want to know how much time individual transformations like mapPartitions and withColumn take. We were earlier thinking of using cache/persist followed by count, but on bigger datasets that has performance issues: we run these transformations a lot and cannot afford a count and cache for every one of them. The earlier approach is the commented-out code above.
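
For context, here is a minimal sketch of that earlier cache-then-count pattern (assuming the same df and functions objects as in the snippet above): cache() pins the step's output so later steps don't recompute it, and count() forces execution, so the wall clock brackets just that one transformation.

        long t0 = System.currentTimeMillis();
        df = df.withColumn("new_col", functions.lit(1)).cache();
        long rows = df.count();   // the action forces the cached step to actually execute
        long t1 = System.currentTimeMillis();
        System.out.println("withColumn wall time (ms): " + (t1 - t0) + " over " + rows + " rows");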

我已经能够计算 mapPartition 的时间(使用累加器将所有时间相加,然后除以分区数),但我无法找到 with_column 的方法。有人能提供建议吗?我找不到任何在线资源。如果有人对 mapPartition 有更好的方法,请也给予建议。
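
One possible direction for withColumn (a sketch, not from the original post): withColumn only builds a lazy plan, so its per-row cost can be surfaced the same way as in the mapPartitions case, by routing the column expression through a UDF that brackets the computation with timestamps and feeds an accumulator. The UDF name timedLit and the trivial computation inside it are placeholders for illustration; an action is still needed to trigger execution.

        // Sketch only: "timedLit" and the computation inside it are hypothetical.
        LongAccumulator withColumnNanos = sparkSession.sparkContext().longAccumulator("withColumnNanos");

        sparkSession.udf().register("timedLit",
                (org.apache.spark.sql.api.java.UDF1<Integer, Integer>) ignored -> {
                    long t0 = System.nanoTime();
                    int result = 1;                 // the real column computation would go here
                    withColumnNanos.add(System.nanoTime() - t0);
                    return result;
                }, DataTypes.IntegerType);

        df = df.withColumn("new_col", functions.callUDF("timedLit", functions.lit(0)));
        df.count();                                 // an action is still required to run the UDF
        System.out.println("withColumn time (ms): " + withColumnNanos.value() / 1_000_000);

As for mapPartitions: the iterator it returns is consumed lazily, so a timestamp taken inside call() before the rows are pulled can under-measure. A hedged alternative is to wrap the iterator so the accumulator is charged as each row is actually processed:

        // Sketch only: charge the accumulator per row, at the moment the row is consumed.
        df = df.mapPartitions((MapPartitionsFunction<Row, Row>) it -> new Iterator<Row>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public Row next() {
                long t0 = System.nanoTime();
                Row out = it.next();        // the per-row transformation logic goes here
                longAccumulator.add(System.nanoTime() - t0);
                return out;
            }
        }, RowEncoder.apply(newSchema));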


huangapple
  • Posted on 2020-05-05 00:03:52
  • Source: https://java.coder-hub.com/61596518.html