英文:
Apache Beam pipeline with FlinkRunner does not send ack's on PubSub when reading messages
问题
我有一个使用 PubsubIO 读取消息的流水线,这个流水线发布了很多消息,但不幸的是我没有收到确认。
以下是读取消息的代码部分:
public CampaignMetricStream() {
super(Options.class);
}
@Override
public PCollection<String> source(Pipeline pipeline, Options options) {
return pipeline.apply("pubsub-pull-events", PubsubIO
.readStrings()
.fromSubscription(options.getSubscription()));
}
我相当确定问题不在代码中,而在配置中。
Flink 在一个 k8s 集群上进行了配置:
taskmanager.numberOfTaskSlots: 64
taskmanager.memory.managed.size: 0
jobmanager.memory.process.size: 1g
taskmanager.memory.process.size: 50g
parallelism.default: 2
state.backend: filesystem
state.checkpoints.dir: file:///data/flink/checkpoints
taskmanager.memory.jvm-metaspace.size: 4g
而流水线是使用以下参数部署的:
--runner=FlinkRunner
--flinkMaster=localhost:8081
--checkpointingInterval=30000
--parallelism=20
--numConcurrentCheckpoints=200
--autoBalanceWriteFilesShardingEnabled=true
我有很多未传递的消息,但我找不到解决方法。
英文:
I have a pipeline that reads messages using PubsubIO, this pipeline publishes a lot of messages, but unfortunately I don't get the acknowledgment.
This is the code that reads the messages
public CampaignMetricStream() {
super(Options.class);
}
@Override
public PCollection<String> source(Pipeline pipeline, Options options) {
return pipeline.apply("pubsub-pull-events", PubsubIO
.readStrings()
.fromSubscription(options.getSubscription()));
}
I'm pretty sure that it is not a problem in the code, but in the configuration.
Flink is configured on a k8s cluster
taskmanager.numberOfTaskSlots: 64
taskmanager.memory.managed.size: 0
jobmanager.memory.process.size: 1g
taskmanager.memory.process.size: 50g
parallelism.default: 2
state.backend: filesystem
state.checkpoints.dir: file:///data/flink/checkpoints
taskmanager.memory.jvm-metaspace.size: 4g
And the pipeline is deployed with these parameters:
--runner=FlinkRunner
--flinkMaster=localhost:8081
--checkpointingInterval=30000
--parallelism=20
--numConcurrentCheckpoints=200
--autoBalanceWriteFilesShardingEnabled=true
I have a lot of undelivered messages and I can't find a way to solve it.
专注分享java语言的经验与见解,让所有开发者获益!
评论