Understanding Flink JobManager memory
Question
I have a Flink job that uses an NFS file system folder as its source and Kafka as its sink. There are no transformations at this point.
I have used ContinuousFileMonitoringFunction to continuously monitor the folder for events and ContinuousFileReaderOperator to read the data.
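// Watches the folder (PROCESS_CONTINUOUSLY) and emits input splits for new or modified files
ContinuousFileMonitoringFunction<String> monitoringFunction = new ContinuousFileMonitoringFunction<>(
    inputFormat, FileProcessingMode.PROCESS_CONTINUOUSLY, env.getParallelism(), MONITORING_INTERVAL);
// Reads the contents of the splits emitted by the monitoring function
ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(inputFormat);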
The initial size of the folder is ~40 GB, with 3,785,468 files in it (across all subdirectories).
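With roughly 3.8 million files in about 40 GB, the average file is only around 10 to 11 KB, which suggests the file count, more than the data volume, may dominate the cost of any full scan of the folder. One way to check how much of the preparation time goes into simply enumerating the NFS folder is to time a recursive listing outside Flink. The sketch below uses only `java.nio`; it is not Flink's implementation, and `FolderScanTimer` and `listEligibleFiles` are hypothetical names, but it does a similar kind of work to a recursive scan that keeps files modified after a given timestamp.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical diagnostic (not part of Flink): times a recursive listing of a folder,
// approximating the "enumerate nested files and filter by modification time" step.
class FolderScanTimer {

    // Returns all regular files under root whose modification time is strictly
    // greater than minModTimeMillis. Long.MIN_VALUE selects every file.
    static List<Path> listEligibleFiles(Path root, long minModTimeMillis) throws IOException {
        List<Path> eligible = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(root)) {
            paths.forEach(p -> {
                try {
                    BasicFileAttributes attrs = Files.readAttributes(p, BasicFileAttributes.class);
                    if (attrs.isRegularFile() && attrs.lastModifiedTime().toMillis() > minModTimeMillis) {
                        eligible.add(p);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        return eligible;
    }

    public static void main(String[] args) throws IOException {
        // Folder to scan: first argument, or the current directory as a fallback.
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        long start = System.nanoTime();
        List<Path> files = listEligibleFiles(root, Long.MIN_VALUE);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(files.size() + " files listed in " + elapsedMs + " ms");
    }
}
```

Run it on a machine that mounts the same NFS share, passing the monitored folder as the first argument. If the listing alone takes a large share of the 2 hours, memory tuning is unlikely to help much with the preparation stage.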
I have created one JobManager with a 25 GB heap and two TaskManagers with 4 task slots, using the following memory values:
taskmanager.memory.process.size: "26g"
taskmanager.memory.flink.size: "24g"
jobmanager.heap.size: "25g"
taskmanager.memory.jvm-overhead.max: "2g"
taskmanager.memory.task.off-heap.size: "1024M"
taskmanager.memory.task.heap.size: "16g"
taskmanager.memory.managed.fraction: 0.2
taskmanager.memory.network.max: "2g"
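To make the TaskManager values above more concrete, here is a rough worked breakdown, assuming the Flink 1.10+ TaskManager memory model as I understand it: when `taskmanager.memory.flink.size` and `taskmanager.memory.task.heap.size` are both set and managed memory is given only as a fraction, managed memory is that fraction of total Flink memory and network memory gets whatever remains. The framework heap and framework off-heap sizes (128m each) and the metaspace size (256m; 96m in Flink 1.10) are assumed defaults that the question does not set, and `TaskManagerMemoryBreakdown` is a hypothetical helper, not part of Flink. It also shows which components end up in the JVM heap limit (`-Xmx`) and the direct memory limit (`-XX:MaxDirectMemorySize`).

```java
import java.util.Locale;

// Hypothetical helper (not part of Flink): a sketch, assuming the Flink 1.10+
// TaskManager memory model, of how the question's settings would be split up.
class TaskManagerMemoryBreakdown {
    static final long MIB = 1024L * 1024L;
    static final long GIB = 1024L * MIB;

    // Values taken from the question's configuration.
    static final long PROCESS_SIZE = 26 * GIB;      // taskmanager.memory.process.size
    static final long FLINK_SIZE = 24 * GIB;        // taskmanager.memory.flink.size
    static final long TASK_HEAP = 16 * GIB;         // taskmanager.memory.task.heap.size
    static final long TASK_OFF_HEAP = 1024 * MIB;   // taskmanager.memory.task.off-heap.size
    static final double MANAGED_FRACTION = 0.2;     // taskmanager.memory.managed.fraction
    static final long NETWORK_MAX = 2 * GIB;        // taskmanager.memory.network.max

    // Assumed defaults (not set in the question; check them for your Flink version).
    static final long FRAMEWORK_HEAP = 128 * MIB;
    static final long FRAMEWORK_OFF_HEAP = 128 * MIB;
    static final long METASPACE = 256 * MIB;        // 96m in Flink 1.10

    // Managed memory = fraction of total Flink memory, truncated to whole bytes.
    static long managed() {
        return (long) (FLINK_SIZE * MANAGED_FRACTION);
    }

    // Network memory = whatever is left of total Flink memory after the other components.
    static long network() {
        long others = FRAMEWORK_HEAP + FRAMEWORK_OFF_HEAP + TASK_HEAP + TASK_OFF_HEAP + managed();
        if (others > FLINK_SIZE) {
            throw new IllegalStateException("Configured components exceed total Flink memory");
        }
        long network = FLINK_SIZE - others;
        if (network > NETWORK_MAX) {
            throw new IllegalStateException("Derived network memory exceeds network.max");
        }
        return network;
    }

    // JVM overhead = process size minus total Flink memory minus metaspace.
    static long jvmOverhead() {
        return PROCESS_SIZE - FLINK_SIZE - METASPACE;
    }

    // Heap limit (-Xmx): framework heap + task heap.
    static long jvmHeap() {
        return FRAMEWORK_HEAP + TASK_HEAP;
    }

    // Direct memory limit (-XX:MaxDirectMemorySize): framework off-heap + task off-heap + network.
    // Managed memory is allocated as native memory and is not counted here.
    static long maxDirectMemory() {
        return FRAMEWORK_OFF_HEAP + TASK_OFF_HEAP + network();
    }

    static String mib(long bytes) {
        return String.format(Locale.ROOT, "%.1f MiB", bytes / (double) MIB);
    }

    public static void main(String[] args) {
        System.out.println("Managed memory:    " + mib(managed()));
        System.out.println("Network memory:    " + mib(network()));
        System.out.println("JVM heap (-Xmx):   " + mib(jvmHeap()));
        System.out.println("Max direct memory: " + mib(maxDirectMemory()));
        System.out.println("JVM overhead:      " + mib(jvmOverhead()));
    }
}
```

With the numbers above this works out to about 4915 MiB of managed memory, about 1997 MiB of network memory (just under the 2g maximum), a 16512 MiB JVM heap and about 3149 MiB of maximum direct memory. If I read the model correctly, managed memory is mainly consumed by the RocksDB state backend and by batch operators (sorting, hash tables), while direct memory is dominated by network buffers. All of these are TaskManager settings; the JobManager's memory is configured separately (here through `jobmanager.heap.size`).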
When the job starts, the JobManager spends a long time preparing it; this preparation stage takes around 2 hours. Once the job is running, it transfers the files to Kafka without problems.
I am trying to fine-tune the job. Can anyone help me understand what happens during the preparation stage and which part of memory matters during it?
I have been experimenting with the memory parameters, but nothing seems to work; without knowing what each part of memory is used for, I am unable to proceed.
I have gone through the Flink documentation on memory, but it is not clear to me what managed memory and direct memory are used for while the job is running.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#memory-configuration
Could someone help me understand what I should consider when fine-tuning the job?