Apache Beam KinesisIO Java - 在运动流中使用数据离开的地方答案

作者: 分类: 编程代码 时间:1970-01-01

Apache Beam KinesisIO Java - 在运动流中使用数据离开的地方答案

Apache Beam KinesisIO Java - Consume the data in a kinesis stream from where it leftApache Beam KinesisIO Java - 在运动流中使用数据离开的地方

首先我想说的是,这对 Beam 世界来说是全新的。我正在处理一项以 Apache Beam 为重点的任务,我的主要数据源是 Kinesis 流。 在那里,当我使用流数据时,我注意到当我重新启动程序(我的消费者应用程序)时会出现相同的数据集。这是我的代码,

    String awsStreamName = KinesisStream.getProperty("stream.name");
    String awsAccessKey = KinesisStream.getProperty("access.key");
    String awsSecretKey = KinesisStream.getProperty("secret.key");
    String awsRegion = KinesisStream.getProperty("aws.region");
    Regions region = Regions.fromName(awsRegion);

    return KinesisIO.read()
            .withStreamName(awsStreamName)
            .withInitialPositionInStream(InitialPositionInStream.LATEST)
            .withAWSClientsProvider(awsAccessKey, awsSecretKey, region);

我想要的只是,我需要从我离开的地方开始读取数据。如果有人也可以提供一些资源,我将不胜感激。

我也发现了一个类似的问题,但对我没有帮助 - Apache Beam KinesisIO Java processing pipeline - application state, error handling & fault-tolerance?

【问题讨论】:

标签: java apache-beam amazon-kinesis apache-beam-io


【解决方案1】:

Beam 中的UnboundedSources 例如KinesisIO.read() 支持使用CheckpointMarks 进行检查点,以便在重新启动应用程序后从最新的检查点恢复。

这些检查点必须持久化到持久存储中。但是,具体如何完成取决于您使用的 Beam runner,例如Dataflow、Apache Flink 或 Apache Spark。

我建议阅读您各自运行时关于检查点的文档并检查相应 Beam runner 的管道选项。

例如,对于 Apache Flink,您必须通过 checkpointingInterval (FlinkPipelineOptions) 和另外的 configure checkpointing in Flink 启用检查点。

【讨论】:

    【解决方案2】:

    要从流中的不同位置开始,您可以使用以下任何一种:

    • .withInitialPositionInStream
    • .withInitialTimestampInStream

    【讨论】: