Apache Beam pipeline with FlinkRunner does not send ack's on PubSub when reading messages

Question

I have a pipeline that reads messages using PubsubIO. A lot of messages are published, but unfortunately they are never acknowledged.
This is the code that reads the messages:

    public CampaignMetricStream() {
        super(Options.class);
    }

    @Override
    public PCollection<String> source(Pipeline pipeline, Options options) {
        return pipeline.apply("pubsub-pull-events", PubsubIO
                .readStrings()
                .fromSubscription(options.getSubscription()));
    }

I'm pretty sure the problem is not in the code but in the configuration.

Flink is configured on a k8s cluster as follows:

    taskmanager.numberOfTaskSlots: 64
    taskmanager.memory.managed.size: 0
    jobmanager.memory.process.size: 1g
    taskmanager.memory.process.size: 50g
    parallelism.default: 2
    state.backend: filesystem
    state.checkpoints.dir: file:///data/flink/checkpoints
    taskmanager.memory.jvm-metaspace.size: 4g

And the pipeline is deployed with these parameters:

    --runner=FlinkRunner
    --flinkMaster=localhost:8081
    --checkpointingInterval=30000
    --parallelism=20
    --numConcurrentCheckpoints=200
    --autoBalanceWriteFilesShardingEnabled=true
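
As a quick sanity check on these values, here is a minimal sketch that parses the flags and compares the requested parallelism against the slots in the Flink configuration. `PipelineArgsCheck`, `parseArgs` and `fitsInSlots` are hypothetical names made up for illustration; they are not part of Beam or Flink. The sketch assumes a single TaskManager and Flink's default slot sharing, where a job needs at least as many slots as its highest parallelism.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of Beam or Flink: sanity-checks the deployment flags. */
public class PipelineArgsCheck {

    /** Parses "--key=value" flags into an ordered map; any other token is ignored. */
    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> parsed = new LinkedHashMap<>();
        for (String arg : args) {
            String trimmed = arg.trim();
            int eq = trimmed.indexOf('=');
            if (trimmed.startsWith("--") && eq > 2) {
                parsed.put(trimmed.substring(2, eq), trimmed.substring(eq + 1));
            }
        }
        return parsed;
    }

    /** True when the requested parallelism does not exceed the total number of task slots. */
    static boolean fitsInSlots(int parallelism, int slotsPerTaskManager, int taskManagers) {
        return parallelism <= (long) slotsPerTaskManager * taskManagers;
    }

    public static void main(String[] args) {
        String[] flags = {
            "--runner=FlinkRunner",
            "--flinkMaster=localhost:8081",
            "--checkpointingInterval=30000",
            "--parallelism=20",
            "--numConcurrentCheckpoints=200",
            "--autoBalanceWriteFilesShardingEnabled=true"
        };
        Map<String, String> opts = parseArgs(flags);

        int parallelism = Integer.parseInt(opts.get("parallelism"));
        long intervalMs = Long.parseLong(opts.get("checkpointingInterval"));

        // 64 is taskmanager.numberOfTaskSlots from the Flink config above;
        // the TaskManager count of 1 is an assumption.
        System.out.println("parallelism fits in slots: " + fitsInSlots(parallelism, 64, 1));
        System.out.println("checkpoint interval (s): " + intervalMs / 1000);
    }
}
```

Note that `--parallelism=20` on the pipeline takes the place of `parallelism.default: 2` for this job, so the slot check uses 20 rather than 2.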

The subscription now has a lot of undelivered messages, and I can't find a way to fix this.

huangapple
  • Posted on July 28, 2020, 21:35:14