Time taken for individual transformations in Spark
I am trying to come up with a way to measure the time taken by individual transformations in Spark. I am using the following code to experiment for my POC.
StructType newSchema = df.schema().add("CONCAT", DataTypes.StringType);
LongAccumulator longAccumulator = sparkSession.sparkContext().longAccumulator();

/* mapPartitions transformation */
long start = System.currentTimeMillis();
df = df.mapPartitions(new MapPartitionsFunction<Row, Row>() {
    long startTime = System.currentTimeMillis();

    @Override
    public Iterator<Row> call(Iterator<Row> it) throws Exception {
        // Some logic
        long endTime = System.currentTimeMillis();
        longAccumulator.add(endTime - startTime);
        // Rest of the logic
        return it; // placeholder: the real code returns the transformed rows
    }
}, RowEncoder.apply(newSchema));
// df = df.cache();
// count = df.count();
long end = System.currentTimeMillis();

/* withColumn transformation */
start = System.currentTimeMillis();
df = df.withColumn("new_col", functions.lit(1));
// df = df.cache();
// count = df.count();
end = System.currentTimeMillis();

count = df.count();
df.show();
System.out.println("Map partition time is " + longAccumulator.value() / df.rdd().getNumPartitions());
So, I want to know how much time individual transformations like mapPartitions and withColumn take. We were earlier thinking of using cache/persist followed by count, but on bigger datasets this has performance issues: we apply these transformations a lot and cannot afford a count and cache for every one. The earlier approach is commented out inside the code.
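One thing worth noting: both mapPartitions and withColumn are lazy in Spark, so a System.currentTimeMillis pair wrapped around the transformation call mostly measures query-plan construction, not the actual work, which only runs at an action such as count or show. Java Streams are lazy in the same way, so the effect can be sketched without Spark at all; the class and method names below are made up for illustration:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyTimingDemo {

    // Simulated expensive per-record operation.
    static int slowSquare(int x) {
        try {
            Thread.sleep(2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return x * x;
    }

    public static void main(String[] args) {
        List<Integer> data =
                Stream.iterate(1, i -> i + 1).limit(100).collect(Collectors.toList());

        // "Transformation": building the pipeline does essentially no work.
        long start = System.currentTimeMillis();
        Stream<Integer> mapped = data.stream().map(LazyTimingDemo::slowSquare);
        long defineMillis = System.currentTimeMillis() - start;

        // "Action": the terminal operation is what actually runs slowSquare.
        start = System.currentTimeMillis();
        List<Integer> result = mapped.collect(Collectors.toList());
        long actionMillis = System.currentTimeMillis() - start;

        System.out.println("defining map(): ~" + defineMillis + " ms");
        System.out.println("collect() over " + result.size() + " rows: ~" + actionMillis + " ms");
    }
}
```

Running this shows the definition step completing almost instantly while the terminal operation carries the cost, which is exactly why the start/end pair around withColumn in the snippet above reports a near-zero, meaningless number.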
I have been able to calculate the time for mapPartitions (using an accumulator to sum everything and dividing by the number of partitions), but I am not able to find a way for withColumn. Can someone suggest something? I cannot find any online resource for it. If someone has a better approach for mapPartitions, please suggest that as well.
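One caveat with the accumulator approach as written: a field initializer like `long startTime = System.currentTimeMillis();` runs when the anonymous class is constructed on the driver, so `endTime - startTime` also picks up serialization and scheduling delay. Starting the clock inside call() measures only the partition work. Here is a minimal plain-Java sketch of that pattern (an AtomicLong standing in for Spark's LongAccumulator; the class and method names are hypothetical):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class PartitionTimingSketch {

    // Stand-in for Spark's LongAccumulator: accumulates per-partition elapsed time.
    static final AtomicLong totalMillis = new AtomicLong();

    // Mirrors the MapPartitionsFunction body, but starts the clock inside the
    // method rather than in a field initializer, so only the time spent
    // iterating the partition is counted.
    static int processPartition(Iterator<Integer> rows) {
        long startTime = System.currentTimeMillis();
        int processed = 0;
        while (rows.hasNext()) {
            rows.next(); // per-row logic would go here
            processed++;
        }
        totalMillis.addAndGet(System.currentTimeMillis() - startTime);
        return processed;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3), Arrays.asList(4, 5), Arrays.asList(6));
        int rows = 0;
        for (List<Integer> partition : partitions) {
            rows += processPartition(partition.iterator());
        }
        System.out.println(rows + " rows, average per-partition time: "
                + totalMillis.get() / partitions.size() + " ms");
    }
}
```

Also note that dividing the accumulator total by the partition count gives the average task time, which equals elapsed wall-clock time only if partitions run serially; with parallel tasks the sum over all partitions can be much larger than the wall-clock time of the stage.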