Understanding Flink JobManager memory

Question

I have a Flink job that uses an NFS filesystem folder as its source and Kafka as its sink. No transformations are applied at this point.

I use ContinuousFileMonitoringFunction to continuously monitor the folder for events and
ContinuousFileReaderOperator to read the data.

ContinuousFileMonitoringFunction<String> monitoringFunction = new ContinuousFileMonitoringFunction<>(
    inputFormat, FileProcessingMode.PROCESS_CONTINUOUSLY, env.getParallelism(),
    MONITORING_INTERVAL);

ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(inputFormat);

The folder is initially about 40 GB and contains 3785468 files (counting all subdirectories).
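For reference, the file count above can be reproduced, and the time needed just to enumerate the tree measured, with a small standalone program that does not involve Flink. This is only a sketch: the class name `FileTreeCountSketch` and the default path `.` are placeholders, and it is an assumption, not something confirmed here, that directory enumeration has anything to do with the long preparation time.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

final class FileTreeCountSketch {

    /** Counts regular files under root, descending into all subdirectories. */
    static long countRegularFiles(Path root) throws IOException {
        // Files.walk is lazy; try-with-resources releases the open directory handles.
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile).count();
        }
    }

    public static void main(String[] args) throws IOException {
        // Placeholder default; pass the real NFS mount point as the first argument.
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        long start = System.nanoTime();
        long files = countRegularFiles(root);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(files + " regular files enumerated in " + elapsedMs + " ms");
    }
}
```

Running it against the source folder from the same host as the JobManager would give a baseline for how long a single pass over the tree takes on this NFS mount.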

I have created one JobManager with a 25 GB heap and two TaskManagers with 4 task slots, using the following memory values:

taskmanager.memory.process.size: "26g"
taskmanager.memory.flink.size: "24g"
jobmanager.heap.size: "25g"
taskmanager.memory.jvm-overhead.max: "2g"
taskmanager.memory.task.off-heap.size: "1024M"
taskmanager.memory.task.heap.size: "16g"
taskmanager.memory.managed.fraction: 0.2
taskmanager.memory.network.max: "2g"
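To make these numbers easier to reason about, here is a rough sketch of how the TaskManager values would add up. It assumes the additive model from the Flink 1.10+ memory setup documentation as I understand it: total process memory is total Flink memory plus JVM metaspace and JVM overhead, and total Flink memory is framework heap, task heap, managed memory, framework off-heap, task off-heap, and network memory, with managed memory taken as a fraction of total Flink memory. The class and method names are made up for illustration, and Flink's actual derivation (rounding, defaults, conflict handling) may differ.

```java
final class MemoryBudgetSketch {

    static long gibToMib(long gib) {
        return gib * 1024L;
    }

    /** Managed memory derived from a fraction of total Flink memory (assumed model). */
    static long managedMib(long totalFlinkMib, double fraction) {
        return (long) (totalFlinkMib * fraction);
    }

    /** What remains of total Flink memory for framework heap, framework off-heap and network. */
    static long remainderMib(long totalFlinkMib, long taskHeapMib, long taskOffHeapMib, long managedMib) {
        return totalFlinkMib - taskHeapMib - taskOffHeapMib - managedMib;
    }

    /** Memory outside total Flink memory, i.e. JVM metaspace plus JVM overhead. */
    static long outsideFlinkMib(long processMib, long totalFlinkMib) {
        return processMib - totalFlinkMib;
    }

    public static void main(String[] args) {
        long processMib = gibToMib(26);   // taskmanager.memory.process.size
        long flinkMib = gibToMib(24);     // taskmanager.memory.flink.size
        long taskHeapMib = gibToMib(16);  // taskmanager.memory.task.heap.size
        long taskOffHeapMib = 1024L;      // taskmanager.memory.task.off-heap.size
        double managedFraction = 0.2;     // taskmanager.memory.managed.fraction

        long managed = managedMib(flinkMib, managedFraction);
        System.out.println("managed memory (MiB): " + managed);
        System.out.println("left for framework + network (MiB): "
                + remainderMib(flinkMib, taskHeapMib, taskOffHeapMib, managed));
        System.out.println("metaspace + JVM overhead (MiB): "
                + outsideFlinkMib(processMib, flinkMib));
    }
}
```

Under these assumptions the configuration yields roughly 4915 MiB of managed memory, 2253 MiB left for framework and network memory, and 2048 MiB for metaspace plus JVM overhead. All of these are TaskManager settings; the only JobManager setting in the list is jobmanager.heap.size.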

When the job is started, the JobManager spends a long time, around 2 hours, preparing it. Once the job is running, it transfers the files to Kafka without problems.

I am trying to fine-tune the job. Can anyone help me understand what happens during the preparation stage and which part of memory matters during it?

I have tried changing the memory parameters, but nothing seems to help, and without knowing which memory is used for what I cannot make progress.

I have gone through the Flink documentation on memory, but it is not clear to me what managed memory and DirectMemory are used for while the job is being processed.

https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#memory-configuration

Could someone help me understand what I should consider when fine-tuning the job?

huangapple
  • Published on April 8, 2020 at 22:21:16