How to identify the optimum number of shuffle partitions in Spark


Question


I am running a Spark Structured Streaming job on EMR (it is bounced once a day). After a few hours of execution the application hits an OOM error and gets killed. The following are my configuration and Spark SQL code.
I am new to Spark and need your valuable input.

The EMR cluster has 10 instances, each with 16 cores and 64 GB of memory.

Spark-Submit arguments:

   num_of_executors: 17
   executor_cores: 5
   executor_memory: 19G
   driver_memory: 30G

The job reads input as micro-batches from Kafka at an interval of 30 seconds. The average number of rows read per batch is 90k.

  spark.streaming.kafka.maxRatePerPartition: 4500
  spark.streaming.stopGracefullyOnShutdown: true
  spark.streaming.unpersist: true
  spark.streaming.kafka.consumer.cache.enabled: true
  spark.hadoop.fs.s3.maxRetries: 30 
  spark.sql.shuffle.partitions: 2001
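
For reference, a minimal sketch of how settings like these could be applied when building the session, assuming they are not already passed as --conf flags to spark-submit (the app name is a placeholder):

  // A sketch only; in an EMR deployment these would normally be --conf flags.
  SparkSession spark = SparkSession.builder()
        .appName("streaming-job")                                     // placeholder name
        .config("spark.streaming.kafka.maxRatePerPartition", "4500")
        .config("spark.sql.shuffle.partitions", "2001")
        .getOrCreate();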

Spark SQL aggregation code:

// Group by name and a 30-second event-time window, concatenate departments,
// then Kryo-serialize each result row to bytes.
// (Note: window() takes a duration string such as "30 seconds", not a bare int.)
dataset.groupBy(functions.col(NAME),
            functions.window(functions.col(TIMESTAMP_COLUMN), "30 seconds"))
            .agg(functions.concat_ws(SPLIT, functions.collect_list(DEPARTMENT)).as(DEPS))
            .select(NAME, DEPS)
            .map((row) -> {
              Map<String, Object> map = Maps.newHashMap();
              map.put(NAME, row.getString(0));
              map.put(DEPS, row.getString(1));
              return new KryoMapSerializationService().serialize(map);
            }, Encoders.BINARY());

Some logs from the driver:

20/04/04 13:10:51 INFO TaskSetManager: Finished task 1911.0 in stage 1041.0 (TID 1052055) in 374 ms on <host> (executor 3) (1998/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1925.0 in stage 1041.0 (TID 1052056) in 411 ms on  <host> (executor 3) (1999/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1906.0 in stage 1041.0 (TID 1052054) in 776 ms on  <host> (executor 3) (2000/2001)
20/04/04 13:11:04 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 3.
20/04/04 13:11:04 INFO DAGScheduler: Executor lost: 3 (epoch 522)
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(3,  <host>, 38533, None)
20/04/04 13:11:04 INFO BlockManagerMaster: Removed 3 successfully in removeExecutor
20/04/04 13:11:04 INFO YarnAllocator: Completed container container_1582797414408_1814_01_000004 on host:  <host> (state: COMPLETE, exit status: 143)

And by the way, I am using collectAsList in my foreachBatch code:

  List<Event> list = dataset.select("value")
        .selectExpr("deserialize(value) as rows")
        .select("rows.*")
        .selectExpr(NAME, DEPS)
        .as(Encoders.bean(Event.class))
        .collectAsList();
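
Note that collectAsList() materializes the entire micro-batch on the driver, which can contribute to driver-side memory pressure. A hedged sketch of a per-partition alternative (EventSink is a hypothetical client, not part of the original code):

  dataset.select("value")
        .selectExpr("deserialize(value) as rows")
        .select("rows.*")
        .selectExpr(NAME, DEPS)
        .as(Encoders.bean(Event.class))
        .foreachPartition((ForeachPartitionFunction<Event>) partition -> {
          EventSink sink = new EventSink(); // hypothetical sink, created per partition
          while (partition.hasNext()) {
            sink.send(partition.next());    // rows never pass through the driver
          }
        });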

Answer 1

Score: 0


With these settings, you may be causing your own issues.

   num_of_executors: 17
   executor_cores: 5
   executor_memory: 19G
   driver_memory: 30G

You are basically creating extra containers here that the data then has to be shuffled between. Instead, start off with something like 10 executors, 15 cores, and 60g of memory. If that works, you can then tweak these values to optimize performance. I usually try halving my containers at each step (though I also haven't needed to do this since Spark 2.0).
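
For example, a minimal sketch of that starting point expressed as configuration (these correspond to the --num-executors, --executor-cores, and --executor-memory flags of spark-submit):

  // Sketch of the suggested sizing: fewer, larger containers.
  SparkConf conf = new SparkConf()
        .set("spark.executor.instances", "10")
        .set("spark.executor.cores", "15")
        .set("spark.executor.memory", "60g");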

Let Spark SQL keep the default of 200 for spark.sql.shuffle.partitions. The more you break this up, the more work you make Spark do to plan and track the shuffles. If anything, I'd go with the same parallelism as the number of executors, so in this case just 10; that is how you would tune Hive queries when 2.0 came out. Breaking the job up too finely puts all the scheduling load on the master.
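
For instance, matching the shuffle parallelism to the executor count would just be:

  // Sketch: one shuffle partition per executor, per the advice above.
  spark.conf().set("spark.sql.shuffle.partitions", "10");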

Using Datasets and encoders is also generally not as performant as going with straight DataFrame operations. I have seen great lifts in performance from refactoring this kind of logic into DataFrame operations.
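
As a sketch of that idea applied to the aggregation in the question, assuming a JSON payload is an acceptable stand-in for the custom KryoMapSerializationService (it may not be), the pipeline can stay entirely in DataFrame operations:

  Dataset<Row> result = dataset
        .groupBy(functions.col(NAME),
                 functions.window(functions.col(TIMESTAMP_COLUMN), "30 seconds"))
        .agg(functions.concat_ws(SPLIT, functions.collect_list(DEPARTMENT)).as(DEPS))
        // to_json replaces the map()/Kryo step, so no bean or binary encoder is needed
        .select(functions.to_json(functions.struct(functions.col(NAME), functions.col(DEPS)))
                .cast("binary").as("value"));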

huangapple
  • Posted on 2020-04-04 22:03:48
  • Reprints must retain this link: https://java.coder-hub.com/61029267.html